Capstone Project 1: Building a Misinformation Detection Pipeline
Project Overview
This project guides you through the complete lifecycle of a machine learning system designed to classify news articles as reliable or potentially unreliable. You will collect data, engineer features, train and evaluate multiple models of increasing sophistication, and reflect critically on what your system can and cannot do. By the end, you will have a working end-to-end pipeline, a thorough evaluation of its performance and limitations, and a nuanced understanding of both the technical and ethical dimensions of automated misinformation detection.
Automated detection of misinformation is one of the most active areas of applied NLP research. The systems you build in this project represent a simplified but structurally faithful version of approaches used in academic research and in the trust-and-safety operations of major platforms. Understanding how these systems work — their design choices, their failure modes, and their ethical implications — is valuable regardless of whether you intend to work as a data scientist. Journalists, policy analysts, and civil society advocates who understand these systems are better equipped to evaluate their outputs, critique their deployment, and advocate for appropriate safeguards.
Learning Objectives
By completing this project, you will be able to:
- Design and implement a data collection and labeling pipeline for a text classification task
- Explain the trade-offs between different feature representations for text (bag-of-words, TF-IDF, dense embeddings)
- Train, evaluate, and compare multiple classification models using appropriate metrics
- Perform rigorous error analysis that goes beyond aggregate metrics to understand systematic failure patterns
- Explain the documented limitations of NLP-based misinformation detection systems
- Articulate the ethical implications of deploying automated content classifiers at scale
- Communicate technical findings to a non-technical audience in a written report
Phase 1: Data Collection
1.1 Choosing Your Dataset
For this project, you will work with a combination of existing labeled datasets and original data collection. Two strong options for your labeled dataset foundation:
Option A — LIAR Dataset: Contains 12,836 statements from PolitiFact, labeled across six veracity categories (pants-on-fire, false, barely-true, half-true, mostly-true, true). Rich metadata including speaker, context, and subject.
Option B — FakeNewsNet: Combines PolitiFact and GossipCop data with article text, social engagement data, and credibility labels. Requires more preprocessing but offers richer features.
We recommend using LIAR as your primary dataset and collecting a supplementary set of 200–500 articles from RSS feeds of clearly labeled reliable sources (Associated Press, Reuters, major newspaper fact sections) and clearly labeled unreliable sources (sites on the Media Bias/Fact Check "Questionable Sources" list with "Very Low" or "Low" factual reporting ratings).
1.2 Data Collection Pipeline
The following code scaffold implements the RSS collection pipeline.
"""
capstone01/data_collection.py
Data collection utilities for the misinformation detection pipeline.
"""
import feedparser
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
import logging
import json
from pathlib import Path
from datetime import datetime
from typing import Optional
import hashlib
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Reliable news sources (RSS feeds)
RELIABLE_SOURCES = {
"ap_news": "https://feeds.apnews.com/rss/topnews",
"reuters_world": "https://feeds.reuters.com/reuters/worldNews",
"npr_news": "https://feeds.npr.org/1001/rss.xml",
"bbc_world": "http://feeds.bbci.co.uk/news/world/rss.xml",
"guardian_world": "https://www.theguardian.com/world/rss",
}
# Unreliable sources — these are sites rated as having very low factual
# reporting by independent media-rating organizations. Include only
# publicly accessible RSS feeds.
UNRELIABLE_SOURCES = {
# Add sources identified from Media Bias/Fact Check "Questionable Sources"
# Replace with actual RSS feeds you have identified and verified
"source_a": "https://example-questionable-source-a.com/rss",
"source_b": "https://example-questionable-source-b.com/rss",
}
HEADERS = {
"User-Agent": (
"Mozilla/5.0 (compatible; MisinformationResearchBot/1.0; "
"Academic research project)"
)
}
def fetch_rss_feed(url: str, source_name: str) -> list[dict]:
"""
Parse an RSS feed and return a list of article metadata dicts.
Args:
url: RSS feed URL
source_name: Human-readable source identifier
Returns:
List of dicts with keys: title, url, published, source, source_name
"""
articles = []
try:
feed = feedparser.parse(url)
for entry in feed.entries:
article = {
"title": entry.get("title", ""),
"url": entry.get("link", ""),
"published": entry.get("published", ""),
"source": url,
"source_name": source_name,
"article_id": hashlib.md5(
entry.get("link", "").encode()
).hexdigest(),
}
articles.append(article)
logger.info(f"Fetched {len(articles)} articles from {source_name}")
except Exception as e:
logger.error(f"Error fetching {source_name}: {e}")
return articles
def fetch_article_text(url: str, timeout: int = 10) -> Optional[str]:
"""
Fetch and extract the main text content from a news article URL.
Args:
url: Article URL
timeout: Request timeout in seconds
Returns:
Extracted article text, or None if extraction fails
"""
try:
response = requests.get(url, headers=HEADERS, timeout=timeout)
response.raise_for_status()
soup = BeautifulSoup(response.content, "html.parser")
# Remove script, style, and navigation elements
for tag in soup(["script", "style", "nav", "header", "footer",
"aside", "advertisement"]):
tag.decompose()
# Try common article container selectors
article_selectors = [
"article",
'[role="main"]',
".article-body",
".story-body",
".post-content",
"main",
]
text = None
for selector in article_selectors:
container = soup.select_one(selector)
if container:
paragraphs = container.find_all("p")
text = " ".join(p.get_text(strip=True) for p in paragraphs)
if len(text) > 200:
break
if not text:
# Fallback: get all paragraphs
paragraphs = soup.find_all("p")
text = " ".join(p.get_text(strip=True) for p in paragraphs)
return text if len(text) > 100 else None
except Exception as e:
logger.warning(f"Failed to fetch {url}: {e}")
return None
def collect_articles(
sources: dict,
label: int,
output_path: Path,
delay: float = 1.5
) -> pd.DataFrame:
"""
Collect articles from multiple RSS feeds, fetch full text, and save.
Args:
sources: Dict mapping source_name -> rss_url
label: Binary label (1 = reliable, 0 = unreliable)
output_path: Path to save collected data
delay: Seconds to wait between requests (be a polite scraper)
Returns:
DataFrame of collected articles
"""
all_articles = []
for source_name, rss_url in sources.items():
feed_articles = fetch_rss_feed(rss_url, source_name)
for article in feed_articles:
time.sleep(delay) # Rate limiting — important for ethical scraping
text = fetch_article_text(article["url"])
if text:
article["text"] = text
article["label"] = label
article["collected_at"] = datetime.utcnow().isoformat()
all_articles.append(article)
logger.info(
f"Collected: {article['title'][:60]}..."
)
df = pd.DataFrame(all_articles)
output_path.mkdir(parents=True, exist_ok=True)
output_file = output_path / f"articles_label_{label}.csv"
df.to_csv(output_file, index=False)
logger.info(f"Saved {len(df)} articles to {output_file}")
return df
def load_liar_dataset(data_dir: Path) -> pd.DataFrame:
"""
Load and preprocess the LIAR dataset.
Download from: https://www.cs.ucsb.edu/~william/data/liar_dataset.zip
Args:
data_dir: Directory containing train.tsv, valid.tsv, test.tsv
Returns:
Combined DataFrame with binary labels
"""
columns = [
"id", "label", "statement", "subject", "speaker",
"speaker_job", "state", "party", "barely_true_count",
"false_count", "half_true_count", "mostly_true_count",
"pants_on_fire_count", "context"
]
# Map 6-way labels to binary (reliable vs. unreliable)
label_map = {
"pants-fire": 0,
"false": 0,
"barely-true": 0,
"half-true": 1, # Debatable — adjust based on your research question
"mostly-true": 1,
"true": 1,
}
dfs = []
for split in ["train", "valid", "test"]:
filepath = data_dir / f"{split}.tsv"
if filepath.exists():
df = pd.read_csv(filepath, sep="\t", header=None, names=columns)
df["split"] = split
df["binary_label"] = df["label"].map(label_map)
df = df.dropna(subset=["binary_label"])
dfs.append(df)
combined = pd.concat(dfs, ignore_index=True)
logger.info(
f"Loaded LIAR dataset: {len(combined)} examples, "
f"label distribution:\n{combined['binary_label'].value_counts()}"
)
return combined
def merge_datasets(
liar_df: pd.DataFrame,
scraped_df: pd.DataFrame
) -> pd.DataFrame:
"""
Merge LIAR and scraped article datasets into a unified format.
"""
# Normalize LIAR to unified schema
liar_unified = pd.DataFrame({
"text": liar_df["statement"],
"label": liar_df["binary_label"],
"source": "liar_dataset",
"source_name": liar_df["speaker"],
"split": liar_df["split"],
})
# Normalize scraped data
scraped_unified = pd.DataFrame({
"text": scraped_df["text"],
"label": scraped_df["label"],
"source": scraped_df["source"],
"source_name": scraped_df["source_name"],
"split": "scraped",
})
combined = pd.concat([liar_unified, scraped_unified], ignore_index=True)
combined = combined.dropna(subset=["text", "label"])
combined["text"] = combined["text"].astype(str)
combined["label"] = combined["label"].astype(int)
logger.info(f"Merged dataset: {len(combined)} total examples")
return combined
if __name__ == "__main__":
output_dir = Path("data/raw")
# Collect from reliable sources
reliable_df = collect_articles(RELIABLE_SOURCES, label=1, output_path=output_dir)
# Collect from unreliable sources
unreliable_df = collect_articles(UNRELIABLE_SOURCES, label=0, output_path=output_dir)
# Load LIAR dataset
liar_df = load_liar_dataset(Path("data/liar_dataset"))
# Merge everything
scraped_combined = pd.concat([reliable_df, unreliable_df], ignore_index=True)
final_df = merge_datasets(liar_df, scraped_combined)
final_df.to_csv(output_dir / "combined_dataset.csv", index=False)
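The six-way-to-binary collapse in load_liar_dataset can be sanity-checked on a toy frame. The unmapped label below is hypothetical, standing in for any malformed row:

```python
import pandas as pd

# Same mapping as load_liar_dataset
label_map = {
    "pants-fire": 0, "false": 0, "barely-true": 0,
    "half-true": 1, "mostly-true": 1, "true": 1,
}

toy = pd.DataFrame({"label": ["true", "pants-fire", "half-true", "not-a-label"]})
toy["binary_label"] = toy["label"].map(label_map)

# Unmapped labels become NaN; dropna removes them, mirroring the loader
toy = toy.dropna(subset=["binary_label"])
print(toy["binary_label"].astype(int).tolist())  # [1, 0, 1]
```

Note that map leaves float values behind once NaN has entered the column, which is why the loader's downstream consumers should cast binary_label to int before training.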
1.3 Ethical Considerations for Data Collection
Before running your data collection pipeline, review these requirements:
- Respect robots.txt files. The requests library does not do this automatically.
- Read each site's terms of service. Many news sites prohibit automated scraping.
- Rate-limit your requests (the 1.5-second delay in the code is a minimum).
- Do not collect personally identifying information beyond what is necessary for your research question.
- Document every source you collect from, including the date of collection.
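The first requirement can be automated with the standard library's urllib.robotparser. A minimal sketch, using an illustrative robots.txt body rather than a live fetch:

```python
from urllib.robotparser import RobotFileParser

def allowed_to_fetch(url: str, user_agent: str, robots_txt: str) -> bool:
    """Check a URL against an already-downloaded robots.txt body."""
    parser = RobotFileParser()
    parser.parse(robots_txt.splitlines())
    return parser.can_fetch(user_agent, url)

# Hypothetical policy: everything allowed except /private/
robots = "User-agent: *\nDisallow: /private/\n"
print(allowed_to_fetch("https://example.com/rss", "*", robots))           # True
print(allowed_to_fetch("https://example.com/private/page", "*", robots))  # False
```

In the pipeline, fetch each domain's /robots.txt once, cache it, and consult it before every call to fetch_article_text.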
Phase 2: Feature Engineering
Feature engineering transforms raw text into numerical representations that machine learning models can process. This phase implements three complementary approaches: bag-of-words/TF-IDF representations, sentiment and stylometric features, and source metadata features.
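As a quick contrast between the first two representations before the full module: raw counts weight repeated terms linearly, while TF-IDF discounts terms that appear in many documents. A toy sketch (the corpus is invented):

```python
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer

corpus = [
    "shocking report shocking claims",
    "officials released the report",
    "the report suggests modest changes",
]
counts = CountVectorizer().fit_transform(corpus)

vec = TfidfVectorizer()
tfidf = vec.fit_transform(corpus)

# Same tokenizer, same vocabulary, same shape; only the weighting differs
print(counts.shape == tfidf.shape)  # True

# "report" appears in every document, so within the first document TF-IDF
# gives it a lower weight than the rarer (but repeated) "shocking"
row0 = tfidf.toarray()[0]
print(row0[vec.vocabulary_["shocking"]] > row0[vec.vocabulary_["report"]])  # True
```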
"""
capstone01/feature_engineering.py
Feature extraction for the misinformation detection pipeline.
"""
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.preprocessing import StandardScaler
import re
import string
from typing import Tuple
import logging
from textblob import TextBlob
import scipy.sparse as sp
logger = logging.getLogger(__name__)
class TextPreprocessor:
"""Clean and normalize raw article text."""
def __init__(
self,
remove_urls: bool = True,
lowercase: bool = True,
remove_punctuation: bool = False, # Keep for stylometric features
):
self.remove_urls = remove_urls
self.lowercase = lowercase
self.remove_punctuation = remove_punctuation
def clean(self, text: str) -> str:
if not isinstance(text, str):
return ""
if self.remove_urls:
text = re.sub(r"http\S+|www\S+", " URL ", text)
# Normalize whitespace
text = re.sub(r"\s+", " ", text).strip()
if self.lowercase:
text = text.lower()
if self.remove_punctuation:
text = text.translate(str.maketrans("", "", string.punctuation))
return text
def fit_transform(self, texts: pd.Series) -> pd.Series:
return texts.apply(self.clean)
class TFIDFFeatureExtractor:
"""Extract TF-IDF features from text."""
def __init__(
self,
max_features: int = 50000,
ngram_range: Tuple[int, int] = (1, 2),
min_df: int = 3,
sublinear_tf: bool = True,
):
self.vectorizer = TfidfVectorizer(
max_features=max_features,
ngram_range=ngram_range,
min_df=min_df,
sublinear_tf=sublinear_tf,
strip_accents="unicode",
analyzer="word",
token_pattern=r"(?u)\b[a-zA-Z]{2,}\b",
)
def fit(self, texts: pd.Series) -> "TFIDFFeatureExtractor":
self.vectorizer.fit(texts)
return self
def transform(self, texts: pd.Series):
return self.vectorizer.transform(texts)
def fit_transform(self, texts: pd.Series):
return self.vectorizer.fit_transform(texts)
def get_feature_names(self) -> list:
return self.vectorizer.get_feature_names_out().tolist()
class StylometricFeatureExtractor:
"""
Extract stylometric features — writing style characteristics that
research has shown to differ systematically between reliable and
unreliable content.
"""
EXCLAMATION_RE = re.compile(r"!")
QUESTION_RE = re.compile(r"\?")
CAPS_WORD_RE = re.compile(r"\b[A-Z]{2,}\b")
QUOTE_RE = re.compile(r'"[^"]*"')
NUMBER_RE = re.compile(r"\b\d+\.?\d*\b")
ELLIPSIS_RE = re.compile(r"\.\.\.")
# Hedging language (associated with reliable/scientific writing)
HEDGE_WORDS = {
"approximately", "roughly", "about", "around", "nearly", "almost",
"suggest", "indicate", "appear", "seem", "may", "might", "could",
"possibly", "perhaps", "likely", "probably", "generally", "often",
"tend", "evidence", "research", "study", "studies", "according"
}
# Certainty/absolute language (sometimes associated with unreliable content)
CERTAINTY_WORDS = {
"always", "never", "everyone", "nobody", "all", "none", "definitely",
"certainly", "absolutely", "guaranteed", "proven", "fact", "truth",
"clearly", "obviously", "undeniably", "undoubtedly", "100%"
}
# Clickbait/sensationalist words
CLICKBAIT_WORDS = {
"shocking", "bombshell", "explosive", "breaking", "exposed",
"secret", "hidden", "leaked", "they don't want you to know",
"doctors hate", "this one weird", "unbelievable", "mind-blowing",
"you won't believe"
}
def extract(self, text: str) -> dict:
"""Extract stylometric features from a single text."""
if not text or not isinstance(text, str):
return self._empty_features()
words = text.split()
sentences = re.split(r"[.!?]+", text)
sentences = [s.strip() for s in sentences if s.strip()]
word_count = len(words)
sentence_count = max(len(sentences), 1)
char_count = len(text)
# Word-level features
unique_words = set(w.lower() for w in words)
avg_word_length = (
np.mean([len(w) for w in words]) if words else 0
)
type_token_ratio = len(unique_words) / max(word_count, 1)
# Sentence-level features
avg_sentence_length = word_count / sentence_count
# Punctuation and typography features
exclamation_count = len(self.EXCLAMATION_RE.findall(text))
question_count = len(self.QUESTION_RE.findall(text))
caps_word_count = len(self.CAPS_WORD_RE.findall(text))
quote_count = len(self.QUOTE_RE.findall(text))
number_count = len(self.NUMBER_RE.findall(text))
ellipsis_count = len(self.ELLIPSIS_RE.findall(text))
# Lexical category features (normalized by word count)
text_lower = text.lower()
words_lower = [w.lower() for w in words]
words_lower_set = set(words_lower)
hedge_ratio = len(self.HEDGE_WORDS & words_lower_set) / max(word_count, 1)
certainty_ratio = len(self.CERTAINTY_WORDS & words_lower_set) / max(word_count, 1)
clickbait_count = sum(
1 for phrase in self.CLICKBAIT_WORDS if phrase in text_lower
)
# Sentiment features via TextBlob
try:
blob = TextBlob(text[:5000]) # Limit for performance
sentiment_polarity = blob.sentiment.polarity
sentiment_subjectivity = blob.sentiment.subjectivity
except Exception:
sentiment_polarity = 0.0
sentiment_subjectivity = 0.0
return {
"word_count": word_count,
"sentence_count": sentence_count,
"char_count": char_count,
"avg_word_length": avg_word_length,
"avg_sentence_length": avg_sentence_length,
"type_token_ratio": type_token_ratio,
"exclamation_per_sentence": exclamation_count / sentence_count,
"question_per_sentence": question_count / sentence_count,
"caps_words_per_100": (caps_word_count / max(word_count, 1)) * 100,
"quote_density": quote_count / max(word_count, 1),
"number_density": number_count / max(word_count, 1),
"ellipsis_count": ellipsis_count,
"hedge_ratio": hedge_ratio,
"certainty_ratio": certainty_ratio,
"clickbait_count": clickbait_count,
"sentiment_polarity": sentiment_polarity,
"sentiment_subjectivity": sentiment_subjectivity,
}
def _empty_features(self) -> dict:
return {k: 0.0 for k in [
"word_count", "sentence_count", "char_count",
"avg_word_length", "avg_sentence_length", "type_token_ratio",
"exclamation_per_sentence", "question_per_sentence",
"caps_words_per_100", "quote_density", "number_density",
"ellipsis_count", "hedge_ratio", "certainty_ratio",
"clickbait_count", "sentiment_polarity", "sentiment_subjectivity"
]}
def transform(self, texts: pd.Series) -> pd.DataFrame:
features = texts.apply(self.extract)
return pd.DataFrame(features.tolist())
class SourceMetadataExtractor:
"""
Extract features derived from article source metadata.
These features are strong predictors but raise ethical concerns
about source-level blacklisting (discussed in Phase 5).
"""
def __init__(self, source_ratings: Optional[dict] = None):
"""
Args:
source_ratings: Dict mapping domain -> credibility score (0-1).
If None, domain-level features are still extracted
but no external ratings are applied.
"""
self.source_ratings = source_ratings or {}
def extract_domain(self, url: str) -> str:
"""Extract the base domain from a URL."""
if not url or not isinstance(url, str):
return "unknown"
try:
from urllib.parse import urlparse
parsed = urlparse(url)
domain = parsed.netloc.lower()
# Remove www. prefix
if domain.startswith("www."):
domain = domain[4:]
return domain
except Exception:
return "unknown"
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Extract source metadata features from a DataFrame with
'source' (URL) and 'source_name' columns.
"""
features = pd.DataFrame()
if "source" in df.columns:
features["domain"] = df["source"].apply(self.extract_domain)
if self.source_ratings:
features["source_credibility_score"] = (
features["domain"].map(self.source_ratings).fillna(0.5)
)
else:
features["domain"] = "unknown"
features["source_credibility_score"] = 0.5
# Has byline (approximated from source_name being non-generic)
if "source_name" in df.columns:
features["has_named_source"] = (
df["source_name"].notna()
& (df["source_name"].str.len() > 2)
& (~df["source_name"].isin(["unknown", "N/A", ""]))
).astype(int)
else:
features["has_named_source"] = 0
return features
def build_feature_matrix(
df: pd.DataFrame,
tfidf_extractor: TFIDFFeatureExtractor,
fit_tfidf: bool = True,
) -> Tuple[sp.csr_matrix, np.ndarray]:
"""
Build the complete feature matrix combining TF-IDF, stylometric,
and source metadata features.
Args:
df: DataFrame with 'text', 'source', 'source_name' columns
tfidf_extractor: TFIDFFeatureExtractor instance
fit_tfidf: Whether to fit the TF-IDF vectorizer (True for train set)
Returns:
Tuple of (feature_matrix, labels)
"""
preprocessor = TextPreprocessor()
clean_texts = preprocessor.fit_transform(df["text"])
# TF-IDF features (sparse)
if fit_tfidf:
tfidf_features = tfidf_extractor.fit_transform(clean_texts)
else:
tfidf_features = tfidf_extractor.transform(clean_texts)
# Stylometric features (dense)
stylo_extractor = StylometricFeatureExtractor()
stylo_features = stylo_extractor.transform(df["text"])
# Scale stylometric features
scaler = StandardScaler()
if fit_tfidf:
stylo_scaled = scaler.fit_transform(stylo_features)
else:
stylo_scaled = scaler.transform(stylo_features)
stylo_sparse = sp.csr_matrix(stylo_scaled)
# Source metadata features
meta_extractor = SourceMetadataExtractor()
meta_features = meta_extractor.transform(df)
# Use only numeric columns
meta_numeric = meta_features.select_dtypes(include=[np.number])
meta_sparse = sp.csr_matrix(meta_numeric.values)
# Combine all features
feature_matrix = sp.hstack([tfidf_features, stylo_sparse, meta_sparse])
labels = df["label"].values
logger.info(
f"Feature matrix shape: {feature_matrix.shape}, "
f"Label distribution: {np.bincount(labels)}"
)
return feature_matrix, labels
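The punctuation and lexical ratios above reduce to plain string operations. A standalone check of two of them on an invented headline (TextBlob omitted):

```python
import re

text = "SHOCKING!!! You won't believe this. Experts suggest caution."

sentences = [s for s in re.split(r"[.!?]+", text) if s.strip()]

# Same regexes as StylometricFeatureExtractor
exclamation_per_sentence = len(re.findall(r"!", text)) / max(len(sentences), 1)
caps_word_count = len(re.findall(r"\b[A-Z]{2,}\b", text))

print(len(sentences), exclamation_per_sentence, caps_word_count)  # 3 1.0 1
```

Hand-verifying a few features like this on short strings is a cheap way to catch regex mistakes before they silently skew an entire feature column.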
Phase 3: Model Development
This phase trains three models of increasing complexity: a logistic regression baseline, a random forest ensemble, and a fine-tuned BERT classifier.
"""
capstone01/models.py
Model training and management for the misinformation detection pipeline.
"""
import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split, StratifiedKFold, cross_validate
import scipy.sparse as sp
import logging
import json
import pickle
from pathlib import Path
from typing import Optional
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification,
TrainingArguments,
Trainer,
)
import evaluate
logger = logging.getLogger(__name__)
# ============================================================
# Model 1: Logistic Regression Baseline
# ============================================================
class LogisticRegressionClassifier:
"""
L2-regularized logistic regression on TF-IDF + stylometric features.
This is the baseline model — simple, interpretable, and surprisingly competitive.
"""
def __init__(
self,
C: float = 1.0,
max_iter: int = 1000,
class_weight: str = "balanced",
random_state: int = 42,
):
self.model = LogisticRegression(
C=C,
max_iter=max_iter,
class_weight=class_weight,
random_state=random_state,
solver="lbfgs",
n_jobs=-1,
)
self.is_fitted = False
def fit(self, X, y):
self.model.fit(X, y)
self.is_fitted = True
return self
def predict(self, X):
return self.model.predict(X)
def predict_proba(self, X):
return self.model.predict_proba(X)
def get_top_features(
self,
feature_names: list,
n: int = 20
) -> dict:
"""
Return the most informative features for each class.
This is a key interpretability tool for logistic regression.
"""
if not self.is_fitted:
raise RuntimeError("Model must be fitted first")
coef = self.model.coef_[0]
top_reliable_indices = np.argsort(coef)[-n:][::-1]
top_unreliable_indices = np.argsort(coef)[:n]
return {
"top_reliable_features": [
(feature_names[i], float(coef[i]))
for i in top_reliable_indices
if i < len(feature_names)
],
"top_unreliable_features": [
(feature_names[i], float(coef[i]))
for i in top_unreliable_indices
if i < len(feature_names)
],
}
def save(self, path: Path):
with open(path, "wb") as f:
pickle.dump(self.model, f)
@classmethod
def load(cls, path: Path) -> "LogisticRegressionClassifier":
instance = cls()
with open(path, "rb") as f:
instance.model = pickle.load(f)
instance.is_fitted = True
return instance
# ============================================================
# Model 2: Random Forest
# ============================================================
class RandomForestMisinformationClassifier:
"""
Random forest classifier. Less interpretable than logistic regression
but often captures non-linear feature interactions.
"""
def __init__(
self,
n_estimators: int = 200,
max_depth: Optional[int] = None,
min_samples_leaf: int = 5,
class_weight: str = "balanced",
random_state: int = 42,
n_jobs: int = -1,
):
self.model = RandomForestClassifier(
n_estimators=n_estimators,
max_depth=max_depth,
min_samples_leaf=min_samples_leaf,
class_weight=class_weight,
random_state=random_state,
n_jobs=n_jobs,
)
self.is_fitted = False
def fit(self, X, y):
# Random forest requires dense input
if sp.issparse(X):
X = X.toarray()
self.model.fit(X, y)
self.is_fitted = True
return self
def predict(self, X):
if sp.issparse(X):
X = X.toarray()
return self.model.predict(X)
def predict_proba(self, X):
if sp.issparse(X):
X = X.toarray()
return self.model.predict_proba(X)
def get_feature_importances(
self,
feature_names: list,
n: int = 30
) -> list:
importances = self.model.feature_importances_
indices = np.argsort(importances)[-n:][::-1]
return [
(feature_names[i], float(importances[i]))
for i in indices
if i < len(feature_names)
]
def save(self, path: Path):
with open(path, "wb") as f:
pickle.dump(self.model, f)
# ============================================================
# Model 3: Fine-tuned BERT
# ============================================================
class NewsDataset(Dataset):
"""PyTorch Dataset for tokenized news articles."""
def __init__(
self,
texts: list,
labels: list,
tokenizer,
max_length: int = 512,
):
self.texts = texts
self.labels = labels
self.tokenizer = tokenizer
self.max_length = max_length
def __len__(self):
return len(self.texts)
def __getitem__(self, idx):
encoding = self.tokenizer(
self.texts[idx],
truncation=True,
max_length=self.max_length,
padding="max_length",
return_tensors="pt",
)
return {
"input_ids": encoding["input_ids"].squeeze(),
"attention_mask": encoding["attention_mask"].squeeze(),
"labels": torch.tensor(self.labels[idx], dtype=torch.long),
}
def compute_metrics(eval_pred):
"""Compute metrics for HuggingFace Trainer."""
accuracy_metric = evaluate.load("accuracy")
f1_metric = evaluate.load("f1")
logits, labels = eval_pred
predictions = np.argmax(logits, axis=-1)
accuracy = accuracy_metric.compute(
predictions=predictions, references=labels
)
f1 = f1_metric.compute(
predictions=predictions, references=labels, average="weighted"
)
return {**accuracy, **f1}
class BERTMisinformationClassifier:
"""
Fine-tuned BERT classifier for misinformation detection.
Uses a pretrained bert-base-uncased model with a classification head.
For production use, consider distilbert-base-uncased for lower latency.
"""
def __init__(
self,
model_name: str = "bert-base-uncased",
num_labels: int = 2,
output_dir: str = "models/bert_classifier",
learning_rate: float = 2e-5,
num_epochs: int = 3,
batch_size: int = 16,
warmup_ratio: float = 0.1,
weight_decay: float = 0.01,
):
self.model_name = model_name
self.num_labels = num_labels
self.output_dir = output_dir
self.learning_rate = learning_rate
self.num_epochs = num_epochs
self.batch_size = batch_size
self.warmup_ratio = warmup_ratio
self.weight_decay = weight_decay
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModelForSequenceClassification.from_pretrained(
model_name,
num_labels=num_labels,
)
def fine_tune(
self,
train_texts: list,
train_labels: list,
val_texts: list,
val_labels: list,
):
"""Fine-tune BERT on the training data."""
train_dataset = NewsDataset(
train_texts, train_labels, self.tokenizer
)
val_dataset = NewsDataset(val_texts, val_labels, self.tokenizer)
training_args = TrainingArguments(
output_dir=self.output_dir,
num_train_epochs=self.num_epochs,
per_device_train_batch_size=self.batch_size,
per_device_eval_batch_size=self.batch_size,
learning_rate=self.learning_rate,
warmup_ratio=self.warmup_ratio,
weight_decay=self.weight_decay,
evaluation_strategy="epoch",
save_strategy="epoch",
load_best_model_at_end=True,
metric_for_best_model="f1",
report_to="none", # Disable W&B logging for student use
logging_dir=f"{self.output_dir}/logs",
logging_steps=50,
)
trainer = Trainer(
model=self.model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=val_dataset,
compute_metrics=compute_metrics,
)
logger.info("Starting BERT fine-tuning...")
trainer.train()
logger.info("Fine-tuning complete.")
self.trainer = trainer
return trainer
def predict(self, texts: list) -> np.ndarray:
"""Generate predictions for a list of texts."""
self.model.eval()
all_preds = []
dataset = NewsDataset(
texts,
[0] * len(texts), # Dummy labels
self.tokenizer
)
loader = DataLoader(dataset, batch_size=32, shuffle=False)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.model.to(device)
with torch.no_grad():
for batch in loader:
input_ids = batch["input_ids"].to(device)
attention_mask = batch["attention_mask"].to(device)
outputs = self.model(
input_ids=input_ids,
attention_mask=attention_mask
)
preds = torch.argmax(outputs.logits, dim=-1)
all_preds.extend(preds.cpu().numpy())
return np.array(all_preds)
def predict_proba(self, texts: list) -> np.ndarray:
"""Generate probability estimates."""
self.model.eval()
all_probs = []
dataset = NewsDataset(
texts,
[0] * len(texts),
self.tokenizer
)
loader = DataLoader(dataset, batch_size=32, shuffle=False)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.model.to(device)
with torch.no_grad():
for batch in loader:
input_ids = batch["input_ids"].to(device)
attention_mask = batch["attention_mask"].to(device)
outputs = self.model(
input_ids=input_ids,
attention_mask=attention_mask
)
probs = torch.softmax(outputs.logits, dim=-1)
all_probs.extend(probs.cpu().numpy())
return np.array(all_probs)
Phase 4: Evaluation
"""
capstone01/evaluation.py
Comprehensive evaluation of misinformation detection models.
"""
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import seaborn as sns
from sklearn.metrics import (
classification_report,
confusion_matrix,
roc_auc_score,
roc_curve,
precision_recall_curve,
average_precision_score,
ConfusionMatrixDisplay,
)
from sklearn.model_selection import StratifiedKFold, cross_validate
import logging
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
plt.style.use("seaborn-v0_8-whitegrid")
FIGURE_DIR = Path("figures")
FIGURE_DIR.mkdir(exist_ok=True)
def evaluate_classifier(
y_true: np.ndarray,
y_pred: np.ndarray,
y_prob: Optional[np.ndarray],
model_name: str,
) -> dict:
"""
Comprehensive evaluation of a binary classifier.
Args:
y_true: True labels
y_pred: Predicted labels
y_prob: Predicted probabilities for positive class (optional)
model_name: Name for display purposes
Returns:
Dict of evaluation metrics
"""
report = classification_report(
y_true, y_pred,
target_names=["Unreliable", "Reliable"],
output_dict=True
)
metrics = {
"model_name": model_name,
"accuracy": report["accuracy"],
"precision_reliable": report["Reliable"]["precision"],
"recall_reliable": report["Reliable"]["recall"],
"f1_reliable": report["Reliable"]["f1-score"],
"precision_unreliable": report["Unreliable"]["precision"],
"recall_unreliable": report["Unreliable"]["recall"],
"f1_unreliable": report["Unreliable"]["f1-score"],
"macro_f1": report["macro avg"]["f1-score"],
"weighted_f1": report["weighted avg"]["f1-score"],
}
if y_prob is not None:
metrics["roc_auc"] = roc_auc_score(y_true, y_prob)
metrics["avg_precision"] = average_precision_score(y_true, y_prob)
logger.info(f"\n{'='*60}")
logger.info(f"Evaluation Results: {model_name}")
logger.info(f"{'='*60}")
logger.info(classification_report(
y_true, y_pred, target_names=["Unreliable", "Reliable"]
))
if y_prob is not None:
logger.info(f"ROC-AUC: {metrics['roc_auc']:.4f}")
return metrics
def plot_confusion_matrix(
y_true: np.ndarray,
y_pred: np.ndarray,
model_name: str,
save: bool = True,
):
"""Plot and optionally save a confusion matrix."""
fig, ax = plt.subplots(figsize=(6, 5))
cm = confusion_matrix(y_true, y_pred)
disp = ConfusionMatrixDisplay(
confusion_matrix=cm,
display_labels=["Unreliable", "Reliable"]
)
disp.plot(ax=ax, colorbar=False, cmap="Blues")
ax.set_title(f"Confusion Matrix: {model_name}", fontsize=13, fontweight="bold")
plt.tight_layout()
if save:
path = FIGURE_DIR / f"confusion_matrix_{model_name.lower().replace(' ', '_')}.png"
plt.savefig(path, dpi=150, bbox_inches="tight")
logger.info(f"Saved confusion matrix to {path}")
plt.show()
def plot_roc_curves(
models_data: list[dict],
save: bool = True,
):
"""
Plot ROC curves for multiple models on a single figure.
Args:
models_data: List of dicts with keys 'name', 'y_true', 'y_prob'
"""
fig, ax = plt.subplots(figsize=(8, 6))
colors = ["#2196F3", "#F44336", "#4CAF50", "#FF9800"]
for i, data in enumerate(models_data):
fpr, tpr, _ = roc_curve(data["y_true"], data["y_prob"])
auc = roc_auc_score(data["y_true"], data["y_prob"])
ax.plot(
fpr, tpr,
color=colors[i % len(colors)],
lw=2,
label=f"{data['name']} (AUC = {auc:.3f})"
)
ax.plot([0, 1], [0, 1], "k--", lw=1, label="Random classifier")
ax.set_xlim([0.0, 1.0])
ax.set_ylim([0.0, 1.05])
ax.set_xlabel("False Positive Rate", fontsize=12)
ax.set_ylabel("True Positive Rate", fontsize=12)
ax.set_title("ROC Curves: Model Comparison", fontsize=14, fontweight="bold")
ax.legend(loc="lower right", fontsize=10)
plt.tight_layout()
if save:
path = FIGURE_DIR / "roc_curves_comparison.png"
plt.savefig(path, dpi=150, bbox_inches="tight")
logger.info(f"Saved ROC curves to {path}")
plt.show()
def error_analysis(
df: pd.DataFrame,
y_true: np.ndarray,
y_pred: np.ndarray,
y_prob: Optional[np.ndarray] = None,
n_examples: int = 20,
) -> dict:
"""
Perform structured error analysis on misclassified examples.
Args:
df: DataFrame with 'text' and other columns
y_true: True labels
y_pred: Predicted labels
y_prob: Predicted probabilities (optional)
n_examples: Number of error examples to examine
Returns:
Dict containing false positive and false negative examples
"""
results = {}
# False positives: predicted reliable (1), actually unreliable (0)
fp_mask = (y_pred == 1) & (y_true == 0)
fp_indices = np.where(fp_mask)[0]
# False negatives: predicted unreliable (0), actually reliable (1)
fn_mask = (y_pred == 0) & (y_true == 1)
fn_indices = np.where(fn_mask)[0]
logger.info(f"\nError Analysis Summary:")
logger.info(f" False Positives (unreliable predicted as reliable): {fp_mask.sum()}")
logger.info(f" False Negatives (reliable predicted as unreliable): {fn_mask.sum()}")
logger.info(f" FP Rate: {fp_mask.sum() / max((y_true == 0).sum(), 1):.3f}")
logger.info(f" FN Rate: {fn_mask.sum() / max((y_true == 1).sum(), 1):.3f}")
# Sample worst errors by confidence (most confident wrong predictions)
if y_prob is not None:
if len(fp_indices) > 0:
fp_probs = y_prob[fp_indices]
fp_worst = fp_indices[np.argsort(fp_probs)[-n_examples:][::-1]]
results["false_positives"] = df.iloc[fp_worst][["text"]].copy()
results["false_positives"]["confidence"] = y_prob[fp_worst]
if len(fn_indices) > 0:
fn_probs = 1 - y_prob[fn_indices] # Confidence in wrong prediction
fn_worst = fn_indices[np.argsort(fn_probs)[-n_examples:][::-1]]
results["false_negatives"] = df.iloc[fn_worst][["text"]].copy()
results["false_negatives"]["confidence"] = fn_probs[np.argsort(fn_probs)[-n_examples:][::-1]]
return results
def cross_validate_model(
model,
X,
y: np.ndarray,
cv: int = 5,
    scoring: Optional[list] = None,
) -> pd.DataFrame:
"""
Perform stratified k-fold cross-validation.
Returns a DataFrame of per-fold scores.
"""
if scoring is None:
scoring = ["accuracy", "f1_weighted", "roc_auc"]
skf = StratifiedKFold(n_splits=cv, shuffle=True, random_state=42)
cv_results = cross_validate(
model.model,
X,
y,
cv=skf,
scoring=scoring,
return_train_score=True,
n_jobs=-1,
)
results_df = pd.DataFrame(cv_results)
logger.info(f"\nCross-validation results ({cv} folds):")
for metric in scoring:
test_key = f"test_{metric}"
if test_key in results_df.columns:
scores = results_df[test_key]
logger.info(
f" {metric}: {scores.mean():.4f} (+/- {scores.std():.4f})"
)
return results_df
def compare_models(metrics_list: list[dict]) -> pd.DataFrame:
"""Create a comparison table from a list of evaluation metric dicts."""
comparison = pd.DataFrame(metrics_list)
comparison = comparison.set_index("model_name")
display_cols = [
"accuracy", "macro_f1", "weighted_f1",
"roc_auc", "precision_reliable", "recall_reliable",
"precision_unreliable", "recall_unreliable"
]
display_cols = [c for c in display_cols if c in comparison.columns]
logger.info("\nModel Comparison Table:")
logger.info(comparison[display_cols].to_string())
return comparison[display_cols]
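The helpers above assume the project's wrapper classes and logging setup, which are defined elsewhere in the codebase. The same evaluate-then-compare pattern can be sketched in a self-contained form with synthetic data and plain scikit-learn estimators (the `make_metrics` helper below is illustrative, not part of the project code):

```python
# Self-contained sketch of the evaluate-and-compare pattern, using
# synthetic data in place of the project's text features.
import pandas as pd
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, roc_auc_score
from sklearn.model_selection import train_test_split

X, y = make_classification(n_samples=500, n_features=20, random_state=42)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, stratify=y, random_state=42)

def make_metrics(name, model):
    """Fit a model and collect the same summary metrics used above."""
    model.fit(X_tr, y_tr)
    pred = model.predict(X_te)
    prob = model.predict_proba(X_te)[:, 1]
    report = classification_report(y_te, pred, output_dict=True)
    return {
        "model_name": name,
        "accuracy": report["accuracy"],
        "macro_f1": report["macro avg"]["f1-score"],
        "roc_auc": roc_auc_score(y_te, prob),
    }

metrics_list = [
    make_metrics("Logistic Regression", LogisticRegression(max_iter=1000)),
    make_metrics("Random Forest", RandomForestClassifier(random_state=42)),
]
comparison = pd.DataFrame(metrics_list).set_index("model_name")
print(comparison)
```

Feeding each model's metric dict into a single DataFrame, as `compare_models` does, keeps the comparison honest: every model is scored on the same held-out split with the same metrics.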
Phase 5: Deployment Considerations
5.1 API Wrapper
"""
capstone01/api.py
Simple FastAPI wrapper for the misinformation detection pipeline.
For demonstration and educational purposes only.
"""
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import numpy as np
from pathlib import Path
import logging
logger = logging.getLogger(__name__)
app = FastAPI(
title="Misinformation Detection API",
description=(
"An educational API for demonstrating misinformation detection. "
"NOT suitable for production content moderation without extensive "
"additional validation and human review processes."
),
version="0.1.0",
)
class ArticleRequest(BaseModel):
text: str
source_url: str = ""
source_name: str = ""
class DetectionResult(BaseModel):
text_preview: str
predicted_label: str
confidence: float
reliability_probability: float
warning: str
feature_highlights: dict
# Global model objects (loaded at startup)
tfidf_extractor = None
lr_classifier = None
@app.on_event("startup")
async def load_models():
"""Load models at API startup."""
global tfidf_extractor, lr_classifier
# In a real deployment, load from serialized model files
# tfidf_extractor = TFIDFFeatureExtractor.load(Path("models/tfidf.pkl"))
# lr_classifier = LogisticRegressionClassifier.load(Path("models/lr.pkl"))
logger.info("Models loaded at startup")
@app.post("/detect", response_model=DetectionResult)
async def detect_misinformation(request: ArticleRequest):
"""
Classify an article as reliable or potentially unreliable.
IMPORTANT LIMITATIONS (read before using):
1. This model was trained on specific datasets and may not generalize.
2. High confidence does not mean the model is correct.
3. Results should ALWAYS be reviewed by a human before acting on them.
4. The model reflects biases present in its training data.
5. Satire, opinion, and creative writing may be misclassified.
"""
if not request.text or len(request.text) < 50:
raise HTTPException(
status_code=400,
detail="Text must be at least 50 characters."
)
WARNING_TEXT = (
"This prediction is generated by an automated system and has known "
"limitations. Do not use this result as the sole basis for any "
"consequential decision about content or its producers."
)
# Placeholder for actual model inference
# In a complete implementation, run the feature extraction and prediction pipeline
result = DetectionResult(
        text_preview=request.text[:200] + ("..." if len(request.text) > 200 else ""),
predicted_label="reliable", # Placeholder
confidence=0.0, # Placeholder
reliability_probability=0.0, # Placeholder
warning=WARNING_TEXT,
feature_highlights={},
)
return result
@app.get("/model-info")
async def model_info():
"""Return information about the deployed model."""
return {
"model_type": "Logistic Regression on TF-IDF + Stylometric Features",
"training_data": "LIAR dataset + supplementary scraped articles",
"known_limitations": [
"Performance degrades on topics not well-represented in training data",
"Cannot verify factual claims — classifies writing style, not truth",
"Unreliable sources that adopt reliable stylistic conventions may evade detection",
"Reliable sources that use informal or emotionally expressive language may be flagged",
"Model reflects demographic and linguistic biases in training data",
],
"recommended_use": "Research, education, and human-assisted review only",
"not_recommended_for": (
"Automated content removal, account suspension, or any consequential "
"action without human review"
),
}
5.2 Ethical Safeguards Discussion
Your written report must include a section addressing the following ethical dimensions:
Accuracy asymmetry: A model with 85% overall accuracy will still misclassify 15% of articles — in a large-scale deployment, that represents millions of pieces of content. False positives (reliable content flagged as unreliable) and false negatives (unreliable content allowed through) have different costs in different contexts. Who bears the costs of each error type?
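A back-of-the-envelope calculation makes the asymmetry concrete. The volume, prevalence, and error-rate figures below are illustrative assumptions, not platform statistics:

```python
# Illustrative error arithmetic: a seemingly strong classifier still
# produces enormous absolute error counts at platform scale, and when
# unreliable content is rare, most flags land on reliable content.
daily_items = 100_000_000       # assumed daily content volume
prevalence = 0.05               # assumed fraction of unreliable content
recall_unreliable = 0.80        # fraction of unreliable items caught
fp_rate = 0.10                  # fraction of reliable items wrongly flagged

unreliable = daily_items * prevalence
reliable = daily_items - unreliable

false_negatives = unreliable * (1 - recall_unreliable)  # slips through
false_positives = reliable * fp_rate                    # wrongly flagged

print(f"Unreliable items missed per day:       {false_negatives:,.0f}")
print(f"Reliable items wrongly flagged per day: {false_positives:,.0f}")
```

Under these assumptions, wrongly flagged reliable content outnumbers missed unreliable content by nearly ten to one, which is why the question of who bears each error's cost cannot be answered from the accuracy figure alone.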
Training data bias: Your training data reflects the specific sources you chose to label as reliable or unreliable. Consider: Does your data include sources from countries other than the US? Does it include sources that serve minority language communities? Does it include sources with non-mainstream political perspectives that are nonetheless factually accurate? How might biases in your training data manifest in your model's behavior?
The interpretability imperative: When a model flags content as potentially unreliable, what explanation does it provide? Logistic regression coefficients offer some interpretability; BERT provides very little without additional attribution techniques. The less interpretable a model, the harder it is to identify and correct systematic errors.
Adversarial adaptation: Publishing a classifier's feature weights allows sophisticated actors to adapt their content to evade detection. Academic transparency and detection effectiveness are in tension.
Deliverables Checklist
- [ ] Data collection script and documented dataset (minimum 1,000 examples, preferably 5,000+)
- [ ] Feature engineering notebook with visualization of feature distributions by label
- [ ] Trained logistic regression model with interpretability analysis (top features per class)
- [ ] Trained random forest model with feature importance analysis
- [ ] Fine-tuned BERT classifier (or justification for substituting DistilBERT if compute-constrained)
- [ ] Evaluation notebook with confusion matrices, ROC curves, and cross-validation results for all models
- [ ] Error analysis section examining at least 20 false positives and 20 false negatives
- [ ] Written report (3,000–5,000 words) covering methodology, results, limitations, and ethical analysis
- [ ] API wrapper code with documented ethical constraints
- [ ] Reproducibility: requirements.txt, random seeds set, data loading documented
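For the reproducibility item, a minimal seed-setting helper is worth including early in the pipeline. This is a sketch: the `torch` branch is guarded because not every environment will have it installed.

```python
# Minimal reproducibility helper: seed every source of randomness
# the pipeline touches before any data splitting or training.
import os
import random

import numpy as np

def set_seed(seed: int = 42) -> None:
    random.seed(seed)
    np.random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    try:
        import torch
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    except ImportError:
        pass  # torch not installed; stdlib/NumPy seeding still applies

set_seed(42)
a = np.random.rand(3)
set_seed(42)
b = np.random.rand(3)
print(np.allclose(a, b))  # identical draws after reseeding
```

Note that seeding alone does not guarantee bit-identical results across machines or library versions; documenting exact dependency versions in requirements.txt is the other half of the reproducibility story.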
Grading Rubric
| Criterion | Description | Points |
|---|---|---|
| 1. Data Quality | Dataset is appropriately sized, balanced, documented, and collected ethically. Sources are clearly identified with labeling rationale explained. | 0–10 |
| 2. Feature Engineering | At least three distinct feature types implemented. Feature distributions analyzed and visualized. Preprocessing decisions documented with rationale. | 0–10 |
| 3. Baseline Model | Logistic regression implemented correctly. Interpretability analysis (top features) performed and discussed. | 0–10 |
| 4. Advanced Models | Random forest and BERT (or equivalent transformer) implemented and trained. Hyperparameter choices documented with justification. | 0–10 |
| 5. Evaluation Rigor | Cross-validation performed. Multiple metrics reported (accuracy, F1, AUC). Confusion matrices presented. Results interpreted correctly. | 0–10 |
| 6. Error Analysis | Systematic analysis of false positives and false negatives. Patterns identified and explained. Implications for deployment discussed. | 0–10 |
| 7. Model Comparison | Models compared on consistent evaluation sets. Performance differences explained with reference to model architecture and features. | 0–10 |
| 8. Ethical Analysis | Substantive discussion of bias, false positive/negative trade-offs, adversarial adaptation, and deployment safeguards. Engages with relevant literature. | 0–10 |
| 9. Code Quality | Code is readable, documented, modular, and reproducible. Dependencies specified. Random seeds set. Data loading clearly documented. | 0–10 |
| 10. Written Report | Report is clear, well-organized, and demonstrates genuine engagement with the technical and ethical material. Findings communicated to a non-technical audience effectively. | 0–10 |
Total: 100 points
Grade thresholds: A (90–100), B (80–89), C (70–79), D (60–69), F (below 60)
Getting Started: Environment Setup
# Create and activate virtual environment
python -m venv capstone01-env
source capstone01-env/bin/activate # On Windows: capstone01-env\Scripts\activate
# Install dependencies
pip install pandas numpy scikit-learn scipy matplotlib seaborn
pip install feedparser requests beautifulsoup4 textblob
pip install torch torchvision torchaudio # See pytorch.org for CUDA variants
pip install transformers datasets evaluate accelerate
pip install fastapi uvicorn pydantic
pip install jupyter notebook ipykernel
pip install nltk
# Download LIAR dataset
# wget https://www.cs.ucsb.edu/~william/data/liar_dataset.zip
# unzip liar_dataset.zip -d data/liar_dataset/
# Verify CUDA availability (optional but speeds up BERT fine-tuning significantly)
python -c "import torch; print(f'CUDA available: {torch.cuda.is_available()}')"
Note: Fine-tuning BERT requires either a GPU or significant patience. DistilBERT (distilbert-base-uncased) trains approximately 60% faster with modest performance reduction and is an acceptable substitute for students without GPU access.