Capstone Project 2: Network Analysis of a Real Information Operation
Project Overview
This project applies social network analysis to real, publicly available data released by social media platforms as part of their transparency initiatives. Since 2018, Twitter (now X), Meta, and other platforms have released datasets documenting the accounts and content associated with state-linked information operations they have identified and removed. These datasets are among the most valuable resources available for studying how coordinated influence campaigns are structured and how they operate — and they are freely downloadable and have been analyzed extensively by researchers around the world.
You will select one of these released datasets, load and process the data, construct multiple types of networks representing different aspects of the operation, apply community detection and temporal analysis methods, develop an attribution assessment, and produce a structured intelligence assessment report.
This project mirrors the actual workflow of researchers at universities, think tanks, and nongovernmental organizations who study influence operations for a living. It is also methodologically adjacent to the work done by platform trust-and-safety teams, national intelligence agencies, and investigative journalism organizations. The analytical skills you develop — network construction, community detection, temporal analysis, linguistic analysis, and structured analytic writing — transfer across all of these professional contexts.
Learning Objectives
By completing this project, you will be able to:
- Locate, download, and load platform transparency data for research purposes
- Construct and visualize account networks, retweet networks, and content similarity networks from raw social media data
- Apply community detection algorithms and interpret the resulting clusters in substantive terms
- Identify temporal signatures of coordinated activity including synchronized posting patterns and amplification cascades
- Perform basic linguistic analysis of content to identify thematic focus, target audiences, and stylistic markers
- Apply a structured analytic methodology to develop an attribution assessment under uncertainty
- Write a professional intelligence assessment report that clearly distinguishes findings from judgments and judgments from speculation
- Reflect critically on the ethical and legal dimensions of influence operation research
Phase 1: Obtaining and Loading Platform Transparency Data
1.1 Available Datasets
The following organizations publish transparency data from removed information operations:
Stanford Internet Observatory / Hoover Institution: The IO Archive at ioarchive.com aggregates datasets from Twitter, Meta, and other platforms. This is the recommended starting point.
Twitter/X Transparency Center: transparency.twitter.com/en/reports/information-operations.html — datasets include account metadata, tweet content, and media files.
Meta Transparency Center: Meta has released data on coordinated inauthentic behavior (CIB) operations in their Threat Reports.
DFRLab (Digital Forensic Research Lab): Publishes detailed case studies with associated data.
For this project, we recommend selecting one of the following well-documented Twitter datasets, which are manageable in size and extensively analyzed in the academic literature (providing validation material for your analysis):
- IRA (Internet Research Agency, Russia) — 2016–2018: The foundational dataset. Three million tweets from approximately 3,000 accounts.
- Iran — 2018: Approximately 770 accounts, 1.1 million tweets, focused on US political discourse and regional topics.
- China — Xinjiang (2019–2020): Approximately 23,000 accounts focused on Xinjiang/Hong Kong topics.
1.2 Data Loading Pipeline
"""
capstone02/data_loader.py
Load and preprocess platform transparency data for network analysis.
"""
import pandas as pd
import numpy as np
import json
import re
import logging
from pathlib import Path
from typing import Optional
from datetime import datetime
import zipfile
import requests
logger = logging.getLogger(__name__)
class TransparencyDataLoader:
"""
Load and standardize platform transparency datasets from Twitter/Meta.
Different releases have slightly different schemas; this class normalizes them.
"""
# Common timestamp formats in Twitter transparency data
TIMESTAMP_FORMATS = [
"%Y-%m-%d %H:%M:%S",
"%Y-%m-%dT%H:%M:%S.%fZ",
"%Y-%m-%dT%H:%M:%SZ",
"%m/%d/%Y %H:%M",
]
def __init__(self, data_dir: Path):
self.data_dir = Path(data_dir)
self.data_dir.mkdir(parents=True, exist_ok=True)
def load_twitter_accounts(self, filepath: Path) -> pd.DataFrame:
"""
Load account-level metadata from a Twitter transparency CSV.
Standard columns in Twitter transparency account files:
userid, user_display_name, user_screen_name, user_reported_location,
user_profile_description, user_profile_url, follower_count,
following_count, account_creation_date, account_language,
tweet_count, retweet_count, reply_count, account_type,
alt_external_id
"""
df = pd.read_csv(filepath, low_memory=False)
logger.info(
f"Loaded {len(df)} accounts from {filepath.name}. "
f"Columns: {list(df.columns)}"
)
# Standardize column names across different dataset releases
rename_map = {
"userid": "account_id",
"user_screen_name": "username",
"user_display_name": "display_name",
"user_reported_location": "location",
"user_profile_description": "bio",
"follower_count": "followers",
"following_count": "following",
"account_creation_date": "created_at",
"account_language": "lang",
"tweet_count": "tweets",
}
df = df.rename(columns={k: v for k, v in rename_map.items() if k in df.columns})
df["account_id"] = df["account_id"].astype(str)
# Parse timestamps
if "created_at" in df.columns:
            # infer_datetime_format was removed in pandas 2.x; inference is now the default
            df["created_at"] = pd.to_datetime(df["created_at"], errors="coerce")
return df
def load_twitter_tweets(
self,
filepath: Path,
chunksize: int = 100000,
) -> pd.DataFrame:
"""
Load tweet-level data from a Twitter transparency CSV.
Large files are loaded in chunks.
Standard tweet columns:
tweetid, userid, tweet_text, tweet_time, retweet_tweetid,
retweet_userid, in_reply_to_tweetid, quoted_tweet_tweetid,
is_retweet, hashtags, urls, user_mentions
"""
chunks = []
for chunk in pd.read_csv(
filepath,
chunksize=chunksize,
low_memory=False
):
chunks.append(chunk)
df = pd.concat(chunks, ignore_index=True)
logger.info(f"Loaded {len(df)} tweets from {filepath.name}")
# Standardize
rename_map = {
"tweetid": "tweet_id",
"userid": "account_id",
"tweet_text": "text",
"tweet_time": "timestamp",
"retweet_tweetid": "retweeted_tweet_id",
"retweet_userid": "retweeted_account_id",
"user_mentions": "mentions",
}
df = df.rename(columns={k: v for k, v in rename_map.items() if k in df.columns})
# Parse timestamps
if "timestamp" in df.columns:
            # infer_datetime_format was removed in pandas 2.x; inference is now the default
            df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
# Parse is_retweet
        if "is_retweet" in df.columns:
            # Values appear as "true"/"false" strings or booleans depending on release
            df["is_retweet"] = (
                df["is_retweet"].astype(str).str.lower()
                .map({"true": True, "false": False})
                .fillna(False)
            )
return df
def parse_hashtags(self, hashtag_str: str) -> list:
"""Parse hashtag field (stored as JSON array or comma-separated)."""
if not hashtag_str or pd.isna(hashtag_str):
return []
if isinstance(hashtag_str, str):
# Try JSON first
try:
parsed = json.loads(hashtag_str)
if isinstance(parsed, list):
return [h.lower().strip("#") for h in parsed if h]
except (json.JSONDecodeError, TypeError):
pass
# Fall back to comma-separated
return [h.lower().strip().strip("#")
for h in hashtag_str.split(",") if h.strip()]
return []
def parse_mentions(self, mention_str: str) -> list:
"""Parse user mentions from tweet data."""
if not mention_str or pd.isna(mention_str):
return []
return re.findall(r"@(\w+)", str(mention_str).lower())
def get_dataset_summary(
self,
accounts_df: pd.DataFrame,
tweets_df: pd.DataFrame,
) -> dict:
"""Generate a summary statistics report for the loaded dataset."""
summary = {
"total_accounts": len(accounts_df),
"total_tweets": len(tweets_df),
"retweet_count": tweets_df["is_retweet"].sum() if "is_retweet" in tweets_df.columns else "N/A",
"date_range": {
"earliest": str(tweets_df["timestamp"].min()) if "timestamp" in tweets_df.columns else "N/A",
"latest": str(tweets_df["timestamp"].max()) if "timestamp" in tweets_df.columns else "N/A",
},
"languages": accounts_df["lang"].value_counts().head(10).to_dict() if "lang" in accounts_df.columns else {},
"median_followers": accounts_df["followers"].median() if "followers" in accounts_df.columns else "N/A",
"account_creation_by_year": (
accounts_df["created_at"].dt.year.value_counts().sort_index().to_dict()
if "created_at" in accounts_df.columns else {}
),
}
logger.info(f"Dataset summary: {json.dumps(summary, indent=2, default=str)}")
return summary
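As a quick sanity check before moving on, the same summary logic can be exercised on a toy DataFrame. The records below are synthetic and exist only for illustration; the column names follow the standardized schema produced by the loader above:

```python
import pandas as pd

# Hypothetical miniature dataset standing in for a loaded transparency CSV
tweets = pd.DataFrame({
    "tweet_id": ["1", "2", "3", "4"],
    "account_id": ["a1", "a1", "a2", "a3"],
    "timestamp": pd.to_datetime([
        "2017-01-05 12:00:00", "2017-01-05 12:03:00",
        "2017-02-10 08:30:00", "2017-02-10 08:31:00",
    ]),
    "is_retweet": [False, True, True, False],
})

# Mirror the fields computed in get_dataset_summary on a scale you can verify by hand
summary = {
    "total_tweets": len(tweets),
    "total_accounts": tweets["account_id"].nunique(),
    "retweet_share": round(tweets["is_retweet"].mean(), 2),
    "date_range": (str(tweets["timestamp"].min().date()),
                   str(tweets["timestamp"].max().date())),
}
print(summary)
```

Running your real loader against a dataset whose published statistics you can cross-check (e.g., the account and tweet counts reported in the platform's release notes) is the equivalent sanity check at full scale.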
Phase 2: Network Construction
"""
capstone02/network_construction.py
Build multiple network representations of the information operation.
"""
import pandas as pd
import numpy as np
import networkx as nx
import re
import logging
from pathlib import Path
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import scipy.sparse as sp
logger = logging.getLogger(__name__)
class AccountNetworkBuilder:
"""
Build account-level networks based on interaction patterns.
Nodes are accounts; edges represent retweet, reply, or mention relationships.
"""
def build_retweet_network(self, tweets_df: pd.DataFrame) -> nx.DiGraph:
"""
Build a directed retweet network.
Edge (A -> B) means account A retweeted account B.
"""
G = nx.DiGraph()
retweets = tweets_df[
tweets_df["is_retweet"] == True
][["account_id", "retweeted_account_id"]].dropna()
for _, row in retweets.iterrows():
src = str(row["account_id"])
tgt = str(row["retweeted_account_id"])
if src and tgt and src != tgt:
if G.has_edge(src, tgt):
G[src][tgt]["weight"] += 1
else:
G.add_edge(src, tgt, weight=1)
logger.info(
f"Retweet network: {G.number_of_nodes()} nodes, "
f"{G.number_of_edges()} edges"
)
return G
def build_mention_network(self, tweets_df: pd.DataFrame) -> nx.DiGraph:
"""
Build a directed mention network.
Edge (A -> B) means account A mentioned @B in a tweet.
"""
G = nx.DiGraph()
if "mentions" not in tweets_df.columns:
logger.warning("No 'mentions' column found")
return G
for _, row in tweets_df.iterrows():
src = str(row["account_id"])
mentions = row.get("mentions", "")
if not mentions or pd.isna(mentions):
continue
mentioned_users = re.findall(r"@(\w+)", str(mentions).lower())
for tgt in mentioned_users:
if tgt and tgt != src.lower():
if G.has_edge(src, tgt):
G[src][tgt]["weight"] += 1
else:
G.add_edge(src, tgt, weight=1)
logger.info(
f"Mention network: {G.number_of_nodes()} nodes, "
f"{G.number_of_edges()} edges"
)
return G
def add_account_attributes(
self,
G: nx.Graph,
accounts_df: pd.DataFrame,
) -> nx.Graph:
"""Add account metadata as node attributes."""
attr_cols = [
"username", "display_name", "followers", "following",
"tweets", "lang", "created_at", "location", "bio"
]
available_cols = [c for c in attr_cols if c in accounts_df.columns]
for _, row in accounts_df.iterrows():
node_id = str(row["account_id"])
if G.has_node(node_id):
attrs = {
col: row[col] for col in available_cols
if not pd.isna(row.get(col, np.nan))
}
G.nodes[node_id].update(attrs)
return G
class HashtagNetworkBuilder:
"""
Build a hashtag co-occurrence network.
Nodes are hashtags; edges represent co-occurrence in the same tweet.
This reveals topical focus and coordination around specific discourse themes.
"""
def build(
self,
tweets_df: pd.DataFrame,
min_edge_weight: int = 5,
min_hashtag_freq: int = 10,
) -> nx.Graph:
"""
Build undirected hashtag co-occurrence network.
Args:
tweets_df: DataFrame with 'hashtags' column
min_edge_weight: Minimum co-occurrences to include an edge
min_hashtag_freq: Minimum total occurrences to include a hashtag
"""
G = nx.Graph()
hashtag_counts = {}
co_occurrence_counts = {}
from data_loader import TransparencyDataLoader
loader = TransparencyDataLoader(Path("."))
for _, row in tweets_df.iterrows():
hashtags = loader.parse_hashtags(str(row.get("hashtags", "")))
hashtags = list(set(hashtags)) # Remove within-tweet duplicates
for ht in hashtags:
hashtag_counts[ht] = hashtag_counts.get(ht, 0) + 1
for i, ht1 in enumerate(hashtags):
for ht2 in hashtags[i+1:]:
key = tuple(sorted([ht1, ht2]))
co_occurrence_counts[key] = co_occurrence_counts.get(key, 0) + 1
# Build network
frequent_hashtags = {
ht for ht, count in hashtag_counts.items()
if count >= min_hashtag_freq
}
for (ht1, ht2), weight in co_occurrence_counts.items():
if (ht1 in frequent_hashtags and ht2 in frequent_hashtags
and weight >= min_edge_weight):
G.add_edge(ht1, ht2, weight=weight)
# Add frequency as node attribute
for ht, count in hashtag_counts.items():
if G.has_node(ht):
G.nodes[ht]["frequency"] = count
logger.info(
f"Hashtag network: {G.number_of_nodes()} nodes, "
f"{G.number_of_edges()} edges"
)
return G
class ContentSimilarityNetworkBuilder:
"""
Build an account network based on content similarity.
Nodes are accounts; edge weight is cosine similarity between
their TF-IDF text representations. High similarity suggests
possible shared authorship or coordinated messaging.
"""
def __init__(
self,
max_features: int = 10000,
similarity_threshold: float = 0.7,
sample_tweets_per_account: int = 100,
):
self.max_features = max_features
self.similarity_threshold = similarity_threshold
self.sample_tweets_per_account = sample_tweets_per_account
def build(
self,
tweets_df: pd.DataFrame,
min_account_tweets: int = 10,
) -> nx.Graph:
"""Build content similarity network."""
# Aggregate tweets per account
account_texts = {}
for account_id, group in tweets_df.groupby("account_id"):
if len(group) >= min_account_tweets:
sample = group["text"].dropna().head(self.sample_tweets_per_account)
account_texts[str(account_id)] = " ".join(sample.tolist())
if len(account_texts) < 2:
logger.warning("Insufficient accounts for similarity network")
return nx.Graph()
account_ids = list(account_texts.keys())
texts = [account_texts[aid] for aid in account_ids]
vectorizer = TfidfVectorizer(
max_features=self.max_features,
min_df=2,
strip_accents="unicode",
)
tfidf_matrix = vectorizer.fit_transform(texts)
# Compute pairwise cosine similarity (in chunks for memory efficiency)
G = nx.Graph()
G.add_nodes_from(account_ids)
chunk_size = 500
for i in range(0, len(account_ids), chunk_size):
chunk_matrix = tfidf_matrix[i:i+chunk_size]
similarities = cosine_similarity(chunk_matrix, tfidf_matrix)
for j_local, similarities_row in enumerate(similarities):
j_global = i + j_local
for k, sim in enumerate(similarities_row):
if k <= j_global:
continue
if sim >= self.similarity_threshold:
G.add_edge(
account_ids[j_global],
account_ids[k],
weight=float(sim),
similarity=float(sim),
)
logger.info(
f"Content similarity network: {G.number_of_nodes()} nodes, "
f"{G.number_of_edges()} edges "
f"(threshold: {self.similarity_threshold})"
)
return G
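The per-row loop in `build_retweet_network` is easy to read but slow on multi-million-tweet datasets; a vectorized `groupby` aggregation produces the same weighted edges in one pass. A minimal sketch on invented records:

```python
import pandas as pd
import networkx as nx

# Toy retweet records; in the real pipeline these come from the standardized
# tweets DataFrame produced by TransparencyDataLoader
retweets = pd.DataFrame({
    "account_id": ["a", "a", "b", "c", "a"],
    "retweeted_account_id": ["b", "b", "c", "b", "c"],
})

# Count repeated (source, target) pairs once, instead of incrementing
# edge weights row by row
edges = (
    retweets.groupby(["account_id", "retweeted_account_id"])
    .size()
    .reset_index(name="weight")
)
G = nx.DiGraph()
G.add_weighted_edges_from(edges.itertuples(index=False, name=None))
print(G["a"]["b"]["weight"])  # a retweeted b twice
```

Aggregating first also makes the edge list easy to inspect or threshold (e.g., dropping weight-1 edges) before the graph is built.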
Phase 3: Community Detection
"""
capstone02/community_detection.py
Identify coordinated clusters within information operation networks.
"""
import networkx as nx
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import logging
from pathlib import Path
from typing import Optional
from community import community_louvain # python-louvain
from collections import defaultdict
logger = logging.getLogger(__name__)
FIGURE_DIR = Path("figures")
FIGURE_DIR.mkdir(exist_ok=True)
def detect_communities_louvain(
G: nx.Graph,
resolution: float = 1.0,
random_state: int = 42,
) -> dict:
"""
Apply the Louvain algorithm for community detection.
Returns a dict mapping node_id -> community_id.
"""
if G.number_of_edges() == 0:
logger.warning("Graph has no edges — community detection skipped")
return {}
# Convert to undirected if directed
G_undirected = G.to_undirected() if G.is_directed() else G
partition = community_louvain.best_partition(
G_undirected,
resolution=resolution,
random_state=random_state,
)
community_sizes = defaultdict(int)
for community_id in partition.values():
community_sizes[community_id] += 1
n_communities = len(set(partition.values()))
logger.info(
f"Louvain detected {n_communities} communities. "
f"Top 5 sizes: {sorted(community_sizes.values(), reverse=True)[:5]}"
)
modularity = community_louvain.modularity(partition, G_undirected)
logger.info(f"Modularity: {modularity:.4f}")
return partition
def characterize_communities(
G: nx.Graph,
partition: dict,
accounts_df: pd.DataFrame,
tweets_df: pd.DataFrame,
top_n_communities: int = 10,
) -> pd.DataFrame:
"""
Generate a descriptive profile for each detected community.
"""
community_profiles = []
communities = defaultdict(list)
for node, comm_id in partition.items():
communities[comm_id].append(node)
# Focus on largest communities
sorted_communities = sorted(
communities.items(),
key=lambda x: len(x[1]),
reverse=True
)[:top_n_communities]
for comm_id, members in sorted_communities:
member_set = set(str(m) for m in members)
member_tweets = tweets_df[
tweets_df["account_id"].astype(str).isin(member_set)
]
# Account attributes for this community
member_accounts = accounts_df[
accounts_df["account_id"].astype(str).isin(member_set)
]
# Top hashtags in community
all_hashtags = []
for ht_str in member_tweets.get("hashtags", pd.Series()):
all_hashtags.extend([h for h in str(ht_str).split(",") if h.strip()])
from collections import Counter
top_hashtags = [
ht for ht, _ in Counter(all_hashtags).most_common(10)
if ht and ht != "nan"
]
# Subgraph metrics
subgraph = G.subgraph(members)
try:
density = nx.density(subgraph)
except Exception:
density = 0.0
profile = {
"community_id": comm_id,
"size": len(members),
"tweet_count": len(member_tweets),
            "median_followers": member_accounts.get("followers", pd.Series([0])).median(),
"dominant_language": (
member_accounts["lang"].value_counts().index[0]
if "lang" in member_accounts.columns and len(member_accounts) > 0
else "unknown"
),
"top_hashtags": top_hashtags[:5],
"subgraph_density": round(density, 4),
"avg_tweets_per_account": len(member_tweets) / max(len(members), 1),
}
community_profiles.append(profile)
logger.info(f"Community {comm_id}: {profile}")
return pd.DataFrame(community_profiles)
def visualize_network(
G: nx.Graph,
partition: dict,
title: str,
max_nodes: int = 2000,
save_path: Optional[Path] = None,
):
"""
Visualize a network with community coloring.
For large networks, samples a subset of nodes for readability.
"""
# Sample if necessary
if G.number_of_nodes() > max_nodes:
# Keep highest-degree nodes
degrees = dict(G.degree())
top_nodes = sorted(degrees, key=degrees.get, reverse=True)[:max_nodes]
G = G.subgraph(top_nodes)
partition = {k: v for k, v in partition.items() if k in top_nodes}
logger.info(f"Sampled top {max_nodes} nodes by degree for visualization")
fig, ax = plt.subplots(1, 1, figsize=(14, 12))
# Layout
pos = nx.spring_layout(G, k=0.3, seed=42, iterations=50)
# Color nodes by community
communities = sorted(set(partition.values()))
    # cm.get_cmap was removed in Matplotlib 3.9; plt.get_cmap remains supported
    cmap = plt.get_cmap("tab20", max(len(communities), 1))
color_map = {c: cmap(i) for i, c in enumerate(communities)}
node_colors = [
color_map.get(partition.get(n, -1), (0.5, 0.5, 0.5, 1.0))
for n in G.nodes()
]
# Size nodes by degree
degrees = dict(G.degree())
node_sizes = [max(10, min(300, degrees.get(n, 1) * 5)) for n in G.nodes()]
nx.draw_networkx_nodes(
G, pos,
node_color=node_colors,
node_size=node_sizes,
alpha=0.85,
ax=ax,
)
nx.draw_networkx_edges(
G, pos,
alpha=0.15,
width=0.5,
edge_color="gray",
ax=ax,
)
ax.set_title(title, fontsize=15, fontweight="bold")
ax.axis("off")
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=150, bbox_inches="tight")
logger.info(f"Saved network visualization to {save_path}")
plt.show()
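If `python-louvain` is not available in your environment, recent NetworkX releases (2.8 and later) ship an equivalent Louvain implementation. A sketch on the Zachary karate club benchmark graph, converting the result into the node-to-community dict that `characterize_communities` and `visualize_network` expect:

```python
import networkx as nx

# Benchmark graph with known community structure
G = nx.karate_club_graph()

# louvain_communities returns a list of node sets rather than a dict
communities = nx.community.louvain_communities(G, seed=42)

# Convert to the node -> community_id mapping used throughout this project
partition = {node: cid for cid, comm in enumerate(communities) for node in comm}
print(len(communities), "communities, modularity",
      round(nx.community.modularity(G, communities), 3))
```

Whichever implementation you use, fix the random seed and report the modularity score alongside the community count so your results are reproducible.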
Phase 4: Temporal Analysis
"""
capstone02/temporal_analysis.py
Analyze temporal patterns of coordinated posting behavior.
"""
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import seaborn as sns
from pathlib import Path
import logging
from scipy import stats
from scipy.signal import find_peaks
logger = logging.getLogger(__name__)
FIGURE_DIR = Path("figures")
class TemporalAnalyzer:
"""
Analyze temporal patterns in information operation data.
Key insight: coordinated operations often show distinctive temporal
signatures — synchronized bursts of activity, unusual hour-of-day
distributions, and correlations between account activity patterns.
"""
def plot_activity_timeline(
self,
tweets_df: pd.DataFrame,
partition: dict = None,
freq: str = "D",
title: str = "Posting Activity Over Time",
) -> pd.DataFrame:
"""
Plot total posting volume over time, optionally broken down by community.
Args:
tweets_df: DataFrame with 'timestamp' column
partition: Optional dict mapping account_id -> community_id
            freq: Pandas frequency string ('D'=daily, 'h'=hourly, 'W'=weekly)
"""
df = tweets_df.copy()
df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
df = df.dropna(subset=["timestamp"])
fig, axes = plt.subplots(2, 1, figsize=(14, 10))
# Total activity
daily_counts = df.set_index("timestamp").resample(freq).size()
axes[0].fill_between(
daily_counts.index, daily_counts.values,
alpha=0.6, color="#2196F3"
)
axes[0].set_title(title, fontsize=13, fontweight="bold")
axes[0].set_ylabel(f"Tweets per {freq}")
axes[0].xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m"))
# Identify activity peaks
peaks, properties = find_peaks(
daily_counts.values,
height=np.percentile(daily_counts.values, 80),
distance=3,
)
if len(peaks) > 0:
peak_dates = daily_counts.index[peaks]
peak_values = daily_counts.values[peaks]
axes[0].scatter(peak_dates, peak_values, color="red", s=50, zorder=5)
for peak_date, peak_val in zip(peak_dates[:5], peak_values[:5]):
axes[0].annotate(
str(peak_date.date()),
(peak_date, peak_val),
textcoords="offset points",
xytext=(0, 8),
fontsize=8,
ha="center",
)
# Hour-of-day distribution
df["hour"] = df["timestamp"].dt.hour
hour_dist = df.groupby("hour").size()
axes[1].bar(hour_dist.index, hour_dist.values, color="#4CAF50", alpha=0.7)
axes[1].set_xlabel("Hour of Day (UTC)")
axes[1].set_ylabel("Tweet Count")
        axes[1].set_title("Hour-of-Day Distribution (May Indicate Operator Time Zone)", fontsize=12)
axes[1].set_xticks(range(0, 24))
plt.tight_layout()
path = FIGURE_DIR / "activity_timeline.png"
plt.savefig(path, dpi=150, bbox_inches="tight")
plt.show()
return daily_counts
def detect_coordination_windows(
self,
tweets_df: pd.DataFrame,
window_minutes: int = 5,
min_accounts: int = 10,
) -> pd.DataFrame:
"""
Identify time windows where many accounts posted simultaneously.
Highly synchronized posting is a strong signal of coordination.
Args:
window_minutes: Size of the time window to examine
min_accounts: Minimum number of distinct accounts posting in window
"""
df = tweets_df.copy()
df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
df = df.dropna(subset=["timestamp"]).sort_values("timestamp")
coordination_windows = []
# Sliding window approach
window = pd.Timedelta(minutes=window_minutes)
sampled = df.sample(min(len(df), 10000), random_state=42) # Sample for performance
for idx, row in sampled.iterrows():
t0 = row["timestamp"]
t1 = t0 + window
window_tweets = df[
(df["timestamp"] >= t0) &
(df["timestamp"] < t1)
]
unique_accounts = window_tweets["account_id"].nunique()
if unique_accounts >= min_accounts:
coordination_windows.append({
"window_start": t0,
"window_end": t1,
"tweet_count": len(window_tweets),
"unique_accounts": unique_accounts,
"tweets_per_second": len(window_tweets) / (window_minutes * 60),
})
result = pd.DataFrame(coordination_windows)
if len(result) > 0:
result = result.drop_duplicates(subset=["window_start"]).sort_values(
"unique_accounts", ascending=False
)
logger.info(
f"Found {len(result)} coordination windows with "
f"≥{min_accounts} simultaneous accounts. "
f"Peak: {result['unique_accounts'].max()} accounts in {window_minutes}min"
)
return result
def compute_inter_account_correlation(
self,
tweets_df: pd.DataFrame,
sample_accounts: int = 50,
        freq: str = "h",
) -> pd.DataFrame:
"""
Compute pairwise temporal correlation between account activity series.
High correlation signals coordinated or automated behavior.
"""
df = tweets_df.copy()
        df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
        df = df.dropna(subset=["timestamp"])
        # Sample accounts by activity level
        top_accounts = (
            df["account_id"].value_counts().head(sample_accounts).index.tolist()
        )
        series = {}
        for account_id in top_accounts:
            account_tweets = df[df["account_id"] == account_id]
            series[str(account_id)] = (
                account_tweets.set_index("timestamp").resample(freq).size()
            )
        # Build from a dict so the time indexes are unioned; assigning columns
        # one by one would drop periods outside the first account's range
        activity_matrix = pd.DataFrame(series).fillna(0)
correlation_matrix = activity_matrix.corr()
# Visualize
fig, ax = plt.subplots(figsize=(12, 10))
sns.heatmap(
correlation_matrix,
ax=ax,
cmap="RdYlGn",
vmin=-1, vmax=1,
xticklabels=False,
yticklabels=False,
cbar_kws={"label": "Pearson Correlation"},
)
ax.set_title(
f"Inter-Account Activity Correlation (top {sample_accounts} accounts, {freq} bins)",
fontsize=12, fontweight="bold"
)
plt.tight_layout()
plt.savefig(FIGURE_DIR / "account_correlation_heatmap.png", dpi=150, bbox_inches="tight")
plt.show()
return correlation_matrix
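`detect_coordination_windows` slides a window from each sampled tweet; a cheaper first pass is to bucket tweets into fixed time bins with `dt.floor`. Fixed bins can split a burst that straddles a boundary, so treat this as a screening step rather than a replacement. A self-contained example on synthetic data:

```python
import pandas as pd

# Synthetic burst: 12 distinct accounts posting within seconds of each other,
# plus scattered background activity
burst = pd.DataFrame({
    "account_id": [f"acct_{i}" for i in range(12)],
    "timestamp": pd.to_datetime("2018-03-01 14:02:00")
                 + pd.to_timedelta(range(12), unit="s"),
})
background = pd.DataFrame({
    "account_id": ["x", "y"],
    "timestamp": pd.to_datetime(["2018-03-01 09:00:00", "2018-03-02 17:00:00"]),
})
tweets = pd.concat([burst, background], ignore_index=True)

# One pass: count distinct accounts per fixed 5-minute bin
windows = (
    tweets.assign(window=tweets["timestamp"].dt.floor("5min"))
    .groupby("window")["account_id"]
    .nunique()
)
flagged = windows[windows >= 10]
print(flagged)
```

Bins flagged here can then be re-examined with the sliding-window method to pin down exact start times and participating accounts.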
Phase 5: Attribution Analysis
"""
capstone02/attribution.py
Linguistic and signal-based attribution analysis.
"""
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import logging
import re
from collections import Counter
from pathlib import Path
from langdetect import detect_langs, DetectorFactory
DetectorFactory.seed = 42  # langdetect is nondeterministic without a fixed seed
logger = logging.getLogger(__name__)
class AttributionAnalyzer:
"""
Analyze linguistic, topical, and behavioral signals for attribution.
IMPORTANT EPISTEMOLOGICAL NOTE:
Attribution analysis in information operations is inherently uncertain.
Skilled operators deliberately obscure their origins through VPNs,
language mixing, and use of local proxies. Attribution assessments
should be expressed as confidence levels, not certainties.
The appropriate language is "consistent with," "suggests," and
"indicates with low/medium/high confidence" — not "proves" or "confirms."
"""
def detect_language_distribution(
self, texts: pd.Series, sample_size: int = 1000
) -> dict:
"""
Detect language distribution across a sample of texts.
Non-native language errors and mixed-language content can
provide attribution signals.
"""
        clean = texts.dropna()
        # Use the post-dropna length so sampling never requests more rows than exist
        sample = clean.sample(min(sample_size, len(clean)), random_state=42)
language_counts = Counter()
errors = 0
for text in sample:
try:
langs = detect_langs(str(text)[:500])
for lang in langs:
if lang.prob > 0.7:
language_counts[lang.lang] += lang.prob
except Exception:
errors += 1
        total = sum(language_counts.values()) or 1.0  # guard against all-failed detection
language_distribution = {
lang: count / total
for lang, count in language_counts.most_common(10)
}
logger.info(f"Language distribution: {language_distribution}")
        logger.info(f"Detection errors: {errors}/{len(sample)}")
return language_distribution
def analyze_timezone_signals(
self, tweets_df: pd.DataFrame
) -> dict:
"""
Infer likely operator timezone from hour-of-day posting distribution.
Most human operators show reduced activity during local night hours
(typically 0-6 AM local time). Bot accounts often lack this pattern.
"""
df = tweets_df.copy()
df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
df = df.dropna(subset=["timestamp"])
df["hour_utc"] = df["timestamp"].dt.hour
hour_dist = df.groupby("hour_utc").size()
total = hour_dist.sum()
hour_pct = (hour_dist / total * 100).to_dict()
# Find the "quiet hours" — lowest 6 consecutive hours
hourly = [hour_pct.get(h, 0) for h in range(24)]
min_sum = float("inf")
quiet_start = 0
for start in range(24):
window_sum = sum(hourly[(start + i) % 24] for i in range(6))
if window_sum < min_sum:
min_sum = window_sum
quiet_start = start
# Infer timezone offset from quiet hours
# Assuming local quiet hours are 1-7 AM
local_quiet_center = 4 # 4 AM local
utc_quiet_center = (quiet_start + 3) % 24
estimated_utc_offset = local_quiet_center - utc_quiet_center
if estimated_utc_offset > 12:
estimated_utc_offset -= 24
elif estimated_utc_offset < -12:
estimated_utc_offset += 24
result = {
"hour_distribution_pct": hour_pct,
"inferred_quiet_hours_utc": f"{quiet_start:02d}:00 - {(quiet_start+6)%24:02d}:00",
"estimated_utc_offset": estimated_utc_offset,
"confidence": "low",
"caveat": (
"Timezone inference is unreliable for bot-operated accounts "
"and for accounts using VPNs or operated across multiple timezones."
),
}
logger.info(f"Timezone analysis: {result}")
return result
def extract_topical_focus(
self,
tweets_df: pd.DataFrame,
n_topics: int = 10,
) -> dict:
"""Identify the main topical foci of the operation using hashtag and keyword analysis."""
from sklearn.feature_extraction.text import CountVectorizer
texts = tweets_df["text"].dropna().tolist()
# Top hashtags
all_hashtags = []
for text in texts:
hashtags = re.findall(r"#(\w+)", text.lower())
all_hashtags.extend(hashtags)
top_hashtags = Counter(all_hashtags).most_common(20)
# Top keywords (excluding stopwords)
vectorizer = CountVectorizer(
max_features=500,
stop_words="english",
min_df=5,
ngram_range=(1, 2),
)
try:
counts = vectorizer.fit_transform(texts[:50000])
word_counts = dict(zip(
vectorizer.get_feature_names_out(),
np.asarray(counts.sum(axis=0)).flatten()
))
top_keywords = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)[:50]
except Exception as e:
top_keywords = []
logger.warning(f"Keyword extraction failed: {e}")
return {
"top_hashtags": top_hashtags,
"top_keywords": top_keywords[:20],
}
def generate_attribution_assessment(
self,
linguistic_signals: dict,
temporal_signals: dict,
topical_signals: dict,
network_signals: dict,
) -> str:
"""
Generate a structured attribution assessment using the
Analysis of Competing Hypotheses (ACH) methodology.
Returns a formatted assessment string.
"""
assessment = """
ATTRIBUTION ASSESSMENT
======================
Classification: UNCLASSIFIED // FOR ACADEMIC RESEARCH
Executive Summary
-----------------
[Provide 2-3 sentence summary of key attribution judgments]
Linguistic Signals
------------------
Dominant language(s): {languages}
Confidence in language attribution: [LOW/MEDIUM/HIGH]
Supporting evidence: [Describe specific linguistic features]
Counter-evidence/uncertainties: [What could explain these features alternatively?]
Temporal Signals
----------------
Inferred operator timezone: UTC {offset}
Consistency with {timezone_region}: [LOW/MEDIUM/HIGH]
Bot vs. human operation indicators: [Describe]
Topical Focus
-------------
Primary target audiences: [Based on top hashtags and keywords]
Key narratives: [Describe 2-3 dominant narratives]
Geographic focus: [Based on topical signals]
Network Structure Signals
-------------------------
[Describe key network features that support or complicate attribution]
Overall Attribution Judgment
----------------------------
Consistent with [ACTOR] operation: [LOW/MEDIUM/HIGH] confidence
Key uncertainties:
1. [Uncertainty 1]
2. [Uncertainty 2]
3. [Uncertainty 3]
IMPORTANT CAVEAT: This assessment is based solely on open-source data
released by the platform and does not draw on classified or non-public
information. Attribution of information operations is inherently uncertain;
skilled operators deliberately obscure origins.
""".format(
languages=str(linguistic_signals.get("language_distribution", {})),
offset=temporal_signals.get("estimated_utc_offset", "unknown"),
timezone_region="[identified region]",
)
return assessment
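The quiet-hours arithmetic in `analyze_timezone_signals` is worth verifying on synthetic data where the answer is known. Here activity is suppressed from 22:00 to 04:00 UTC, which under the assumed 1-7 AM local quiet-hour window should recover an offset of UTC+3:

```python
import numpy as np

# Synthetic hour-of-UTC activity levels: quiet 22:00-04:00 UTC, busy otherwise
hourly = np.ones(24) * 5.0
for h in [22, 23, 0, 1, 2, 3]:
    hourly[h] = 0.5

# Same search as analyze_timezone_signals: lowest 6-consecutive-hour window
window_sums = [sum(hourly[(s + i) % 24] for i in range(6)) for s in range(24)]
quiet_start = int(np.argmin(window_sums))

# Center the window, compare against an assumed 4 AM local quiet-hour center,
# and wrap the result into the [-12, 12) offset range
utc_quiet_center = (quiet_start + 3) % 24
offset = 4 - utc_quiet_center
offset = (offset + 12) % 24 - 12
print(f"quiet window starts {quiet_start:02d}:00 UTC, estimated offset UTC{offset:+d}")
```

A recovered UTC+3 is consistent with, for example, Moscow time, but remember the caveat in the code: this signal is easily distorted by bots, shift work, and multi-timezone operations, so it supports at most a low-confidence judgment on its own.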
Phase 6: Reporting — Writing an Intelligence Assessment
Your final deliverable for this project is a professional intelligence assessment report modeled on the format used by academic research organizations, investigative journalism outlets, and government agencies that study influence operations. The report should be 2,500–4,000 words and must follow this structure:
Report Structure
1. Executive Summary (200–300 words): Key findings stated as confidence-weighted judgments. Enumerate the operation's scale, apparent origin, primary target audiences, and main narratives.
2. Dataset and Methodology (300–400 words): Which dataset you used and why; how you loaded, cleaned, and analyzed it; what methods you applied and their known limitations.
3. Operational Scale and Scope (400–600 words): Account count, tweet count, date range, languages, geographic signals. Include the summary statistics table from Phase 1. Visualizations: activity timeline, account creation curve.
4. Network Structure Analysis (400–600 words): Describe the retweet/mention network structure and interpret the community structure from Phase 3. Include visualizations.
5. Coordinated Behavior Evidence (400–600 words): Temporal coordination signals from Phase 4, content similarity patterns, and specific examples of coordination.
6. Topical and Linguistic Analysis (400–600 words): Main narratives, hashtag analysis, language distribution, and attribution signals.
7. Attribution Assessment (400–500 words): Stated as confidence-weighted judgments. Distinguish high-confidence findings from lower-confidence inferences.
8. Limitations and Uncertainties (200–300 words): What the data cannot tell you, and alternative explanations for key findings.
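The headline statistics required for Section 3 reduce to a few pandas aggregations. The sketch below assumes the column names used in Twitter's transparency releases (`userid`, `tweet_time`, `tweet_language`); verify them against your chosen dataset's data dictionary and adjust as needed:

```python
import pandas as pd

def summarize_operation(df):
    """Compute headline statistics for the Operational Scale and Scope
    section. Column names assume Twitter's transparency-release schema;
    adjust for other platforms' datasets."""
    times = pd.to_datetime(df["tweet_time"])
    return {
        "accounts": df["userid"].nunique(),
        "tweets": len(df),
        "first_tweet": times.min(),
        "last_tweet": times.max(),
        "top_languages": df["tweet_language"].value_counts().head(5).to_dict(),
    }
```

Rendering the resulting dictionary as a table (e.g., `pd.Series(stats).to_frame("Value")`) gives you the summary statistics table the report structure calls for.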
Deliverables Checklist
- [ ] Data acquisition documentation (which dataset, download date, version)
- [ ] Data loading and preprocessing notebook with summary statistics
- [ ] Network construction notebook (retweet network, hashtag network, content similarity network)
- [ ] Community detection results with community profiles table
- [ ] Temporal analysis notebook with activity timeline, hour-of-day distribution, coordination windows
- [ ] Attribution analysis notebook with linguistic and timezone signals
- [ ] Final intelligence assessment report (2,500–4,000 words, following the structure above)
- [ ] All figures saved at publication quality (150+ DPI)
- [ ] Reproducibility documentation: requirements.txt, random seeds, data loading instructions
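The last two checklist items (publication-quality figures, recorded random seeds) can be enforced with two small helpers; the helper names and the choice of seed are illustrative, not required by the assignment:

```python
import random
import numpy as np
import matplotlib
matplotlib.use("Agg")  # headless backend for batch figure export
import matplotlib.pyplot as plt

SEED = 42  # record the value you actually use in your reproducibility docs

def set_seeds(seed=SEED):
    """Seed every RNG the analysis touches so results are repeatable."""
    random.seed(seed)
    np.random.seed(seed)

def save_figure(fig, path, dpi=150):
    """Save at publication quality (the rubric asks for 150+ DPI)."""
    fig.savefig(path, dpi=dpi, bbox_inches="tight")
```

Note that community detection algorithms such as Louvain are themselves stochastic, so pass the same seed (e.g., `random_state` in `community.best_partition`) there as well if you want identical partitions across runs.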
Grading Rubric
| Criterion | Description | Points |
|---|---|---|
| 1. Data Acquisition and Documentation | Dataset properly obtained and documented. Preprocessing steps clearly described and justified. Summary statistics provided. | 0–10 |
| 2. Network Construction | At least two distinct network types constructed correctly. Network properties (size, density, degree distribution) computed and interpreted. | 0–10 |
| 3. Community Detection | Community detection algorithm correctly applied. Communities characterized substantively, not just numerically. Interpretation grounded in domain knowledge. | 0–10 |
| 4. Temporal Analysis | Activity timeline produced with peak identification. Hour-of-day analysis performed. Coordination windows analysis attempted. | 0–10 |
| 5. Attribution Analysis | Linguistic signals analyzed. Timezone inference attempted and appropriately qualified. Topical focus identified. | 0–10 |
| 6. Evidence Quality | Claims are supported by specific data and visualizations. Evidence is presented completely, including evidence that complicates simple narratives. | 0–10 |
| 7. Epistemic Calibration | Findings distinguished from inferences. Uncertainty acknowledged appropriately. Confidence levels qualified. | 0–10 |
| 8. Report Quality | Report follows the specified structure. Executive summary is accurate and concise. Prose is clear and professional. | 0–10 |
| 9. Critical Reflection | Ethical considerations of influence operation research addressed. Limitations of the methods discussed honestly. | 0–10 |
| 10. Code Quality | Code is readable, documented, and reproducible. Visualizations are clear and labeled. All dependencies specified. | 0–10 |
Total: 100 points
Environment Setup
pip install pandas numpy networkx matplotlib seaborn scipy
pip install python-louvain   # community detection (imported as "community")
pip install scikit-learn     # vectorization and similarity measures
pip install langdetect       # language identification
pip install jupyter notebook
Note on data size: The IRA dataset contains approximately 3 million tweets. If memory is constrained, work with a random 20% sample, clearly documenting this in your methodology section. The Iran 2018 dataset (~1.1 million tweets) is more manageable for most student computing environments.