Capstone Project 1: Building a Misinformation Detection Pipeline

Project Overview

This project guides you through the complete lifecycle of a machine learning system designed to classify news articles as reliable or potentially unreliable. You will collect data, engineer features, train and evaluate multiple models of increasing sophistication, and reflect critically on what your system can and cannot do. By the end, you will have a working end-to-end pipeline, a thorough evaluation of its performance and limitations, and a nuanced understanding of both the technical and ethical dimensions of automated misinformation detection.

Automated detection of misinformation is one of the most active areas of applied NLP research. The systems you build in this project represent a simplified but structurally faithful version of approaches used in academic research and in the trust-and-safety operations of major platforms. Understanding how these systems work — their design choices, their failure modes, and their ethical implications — is valuable regardless of whether you intend to work as a data scientist. Journalists, policy analysts, and civil society advocates who understand these systems are better equipped to evaluate their outputs, critique their deployment, and advocate for appropriate safeguards.

Learning Objectives

By completing this project, you will be able to:

  1. Design and implement a data collection and labeling pipeline for a text classification task
  2. Explain the trade-offs between different feature representations for text (bag-of-words, TF-IDF, dense embeddings)
  3. Train, evaluate, and compare multiple classification models using appropriate metrics
  4. Perform rigorous error analysis that goes beyond aggregate metrics to understand systematic failure patterns
  5. Explain the documented limitations of NLP-based misinformation detection systems
  6. Articulate the ethical implications of deploying automated content classifiers at scale
  7. Communicate technical findings to a non-technical audience in a written report

Phase 1: Data Collection

1.1 Choosing Your Dataset

For this project, you will work with a combination of existing labeled datasets and original data collection. Two strong options for your labeled dataset foundation:

Option A — LIAR Dataset: Contains 12,836 statements from PolitiFact, labeled across six veracity categories (pants-on-fire, false, barely-true, half-true, mostly-true, true). Rich metadata including speaker, context, and subject.

Option B — FakeNewsNet: Combines PolitiFact and GossipCop data with article text, social engagement data, and credibility labels. Requires more preprocessing but offers richer features.

We recommend using LIAR as your primary dataset and collecting a supplementary set of 200–500 articles from RSS feeds of clearly labeled reliable sources (Associated Press, Reuters, major newspaper fact sections) and clearly labeled unreliable sources (sites on the Media Bias/Fact Check "Questionable Sources" list with "Very Low" or "Low" factual reporting ratings).

1.2 Data Collection Pipeline

The following code scaffold implements the RSS collection pipeline.

"""
capstone01/data_collection.py
Data collection utilities for the misinformation detection pipeline.
"""

import feedparser
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
import logging
import json
from pathlib import Path
from datetime import datetime
from typing import Optional
import hashlib

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Reliable news sources (RSS feeds)
RELIABLE_SOURCES = {
    "ap_news": "https://feeds.apnews.com/rss/topnews",
    "reuters_world": "https://feeds.reuters.com/reuters/worldNews",
    "npr_news": "https://feeds.npr.org/1001/rss.xml",
    "bbc_world": "http://feeds.bbci.co.uk/news/world/rss.xml",
    "guardian_world": "https://www.theguardian.com/world/rss",
}

# Unreliable sources — these are sites rated as having very low factual
# reporting by independent media-rating organizations. Include only
# publicly accessible RSS feeds.
UNRELIABLE_SOURCES = {
    # Add sources identified from Media Bias/Fact Check "Questionable Sources"
    # Replace with actual RSS feeds you have identified and verified
    "source_a": "https://example-questionable-source-a.com/rss",
    "source_b": "https://example-questionable-source-b.com/rss",
}

HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (compatible; MisinformationResearchBot/1.0; "
        "Academic research project)"
    )
}


def fetch_rss_feed(url: str, source_name: str) -> list[dict]:
    """
    Parse an RSS feed and return a list of article metadata dicts.

    Args:
        url: RSS feed URL
        source_name: Human-readable source identifier

    Returns:
        List of dicts with keys: title, url, published, source, source_name
    """
    articles = []
    try:
        feed = feedparser.parse(url)
        for entry in feed.entries:
            article = {
                "title": entry.get("title", ""),
                "url": entry.get("link", ""),
                "published": entry.get("published", ""),
                "source": url,
                "source_name": source_name,
                "article_id": hashlib.md5(
                    entry.get("link", "").encode()
                ).hexdigest(),
            }
            articles.append(article)
        logger.info(f"Fetched {len(articles)} articles from {source_name}")
    except Exception as e:
        logger.error(f"Error fetching {source_name}: {e}")
    return articles


def fetch_article_text(url: str, timeout: int = 10) -> Optional[str]:
    """
    Fetch and extract the main text content from a news article URL.

    Args:
        url: Article URL
        timeout: Request timeout in seconds

    Returns:
        Extracted article text, or None if extraction fails
    """
    try:
        response = requests.get(url, headers=HEADERS, timeout=timeout)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, "html.parser")

        # Remove script, style, and page-chrome elements before extracting text
        for tag in soup(["script", "style", "nav", "header",
                         "footer", "aside"]):
            tag.decompose()

        # Try common article container selectors
        article_selectors = [
            "article",
            '[role="main"]',
            ".article-body",
            ".story-body",
            ".post-content",
            "main",
        ]

        text = None
        for selector in article_selectors:
            container = soup.select_one(selector)
            if container:
                paragraphs = container.find_all("p")
                text = " ".join(p.get_text(strip=True) for p in paragraphs)
                if len(text) > 200:
                    break

        if not text:
            # Fallback: get all paragraphs
            paragraphs = soup.find_all("p")
            text = " ".join(p.get_text(strip=True) for p in paragraphs)

        return text if len(text) > 100 else None

    except Exception as e:
        logger.warning(f"Failed to fetch {url}: {e}")
        return None


def collect_articles(
    sources: dict,
    label: int,
    output_path: Path,
    delay: float = 1.5
) -> pd.DataFrame:
    """
    Collect articles from multiple RSS feeds, fetch full text, and save.

    Args:
        sources: Dict mapping source_name -> rss_url
        label: Binary label (1 = reliable, 0 = unreliable)
        output_path: Path to save collected data
        delay: Seconds to wait between requests (be a polite scraper)

    Returns:
        DataFrame of collected articles
    """
    all_articles = []

    for source_name, rss_url in sources.items():
        feed_articles = fetch_rss_feed(rss_url, source_name)

        for article in feed_articles:
            time.sleep(delay)  # Rate limiting — important for ethical scraping
            text = fetch_article_text(article["url"])

            if text:
                article["text"] = text
                article["label"] = label
                article["collected_at"] = datetime.utcnow().isoformat()
                all_articles.append(article)
                logger.info(
                    f"Collected: {article['title'][:60]}..."
                )

    df = pd.DataFrame(all_articles)
    output_path.mkdir(parents=True, exist_ok=True)
    output_file = output_path / f"articles_label_{label}.csv"
    df.to_csv(output_file, index=False)
    logger.info(f"Saved {len(df)} articles to {output_file}")
    return df


def load_liar_dataset(data_dir: Path) -> pd.DataFrame:
    """
    Load and preprocess the LIAR dataset.
    Download from: https://www.cs.ucsb.edu/~william/data/liar_dataset.zip

    Args:
        data_dir: Directory containing train.tsv, valid.tsv, test.tsv

    Returns:
        Combined DataFrame with binary labels
    """
    columns = [
        "id", "label", "statement", "subject", "speaker",
        "speaker_job", "state", "party", "barely_true_count",
        "false_count", "half_true_count", "mostly_true_count",
        "pants_on_fire_count", "context"
    ]

    # Map 6-way labels to binary (reliable vs. unreliable)
    label_map = {
        "pants-fire": 0,
        "false": 0,
        "barely-true": 0,
        "half-true": 1,  # Debatable — adjust based on your research question
        "mostly-true": 1,
        "true": 1,
    }

    dfs = []
    for split in ["train", "valid", "test"]:
        filepath = data_dir / f"{split}.tsv"
        if filepath.exists():
            df = pd.read_csv(filepath, sep="\t", header=None, names=columns)
            df["split"] = split
            df["binary_label"] = df["label"].map(label_map)
            df = df.dropna(subset=["binary_label"])
            dfs.append(df)

    combined = pd.concat(dfs, ignore_index=True)
    logger.info(
        f"Loaded LIAR dataset: {len(combined)} examples, "
        f"label distribution:\n{combined['binary_label'].value_counts()}"
    )
    return combined


def merge_datasets(
    liar_df: pd.DataFrame,
    scraped_df: pd.DataFrame
) -> pd.DataFrame:
    """
    Merge LIAR and scraped article datasets into a unified format.
    """
    # Normalize LIAR to unified schema
    liar_unified = pd.DataFrame({
        "text": liar_df["statement"],
        "label": liar_df["binary_label"],
        "source": "liar_dataset",
        "source_name": liar_df["speaker"],
        "split": liar_df["split"],
    })

    # Normalize scraped data
    scraped_unified = pd.DataFrame({
        "text": scraped_df["text"],
        "label": scraped_df["label"],
        "source": scraped_df["source"],
        "source_name": scraped_df["source_name"],
        "split": "scraped",
    })

    combined = pd.concat([liar_unified, scraped_unified], ignore_index=True)
    combined = combined.dropna(subset=["text", "label"])
    combined["text"] = combined["text"].astype(str)
    combined["label"] = combined["label"].astype(int)

    logger.info(f"Merged dataset: {len(combined)} total examples")
    return combined


if __name__ == "__main__":
    output_dir = Path("data/raw")

    # Collect from reliable sources
    reliable_df = collect_articles(RELIABLE_SOURCES, label=1, output_path=output_dir)

    # Collect from unreliable sources
    unreliable_df = collect_articles(UNRELIABLE_SOURCES, label=0, output_path=output_dir)

    # Load LIAR dataset
    liar_df = load_liar_dataset(Path("data/liar_dataset"))

    # Merge everything
    scraped_combined = pd.concat([reliable_df, unreliable_df], ignore_index=True)
    final_df = merge_datasets(liar_df, scraped_combined)
    final_df.to_csv(output_dir / "combined_dataset.csv", index=False)

1.3 Ethical Considerations for Data Collection

Before running your data collection pipeline, review these requirements:

  • Respect robots.txt files. The requests library does not do this automatically.
  • Read each site's terms of service. Many news sites prohibit automated scraping.
  • Rate-limit your requests (the 1.5-second delay in the code is a minimum).
  • Do not collect personally identifying information beyond what is necessary for your research question.
  • Document every source you collect from, including the date of collection.
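A minimal robots.txt check can be built from the standard library's urllib.robotparser. The helper below is a sketch, not part of the scaffold above; `robots_allows` and `is_allowed` are illustrative names:

```python
from urllib import robotparser
from urllib.parse import urlparse


def robots_allows(robots_txt: str, url: str, user_agent: str = "*") -> bool:
    """Check a robots.txt body against a candidate article URL."""
    rp = robotparser.RobotFileParser()
    rp.parse(robots_txt.splitlines())
    return rp.can_fetch(user_agent, url)


def is_allowed(url: str, user_agent: str = "MisinformationResearchBot") -> bool:
    """Fetch a site's robots.txt and check whether `url` may be crawled."""
    parsed = urlparse(url)
    rp = robotparser.RobotFileParser()
    rp.set_url(f"{parsed.scheme}://{parsed.netloc}/robots.txt")
    try:
        rp.read()  # Fetches the robots.txt file over the network
    except OSError:
        return False  # Unreachable robots.txt: err on the side of not fetching
    return rp.can_fetch(user_agent, url)
```

A natural integration point is inside `collect_articles`: check each article URL before calling `fetch_article_text` and skip any URL the site's robots.txt rejects.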

Phase 2: Feature Engineering

Feature engineering transforms raw text into numerical representations that machine learning models can process. This phase implements three complementary approaches: bag-of-words/TF-IDF representations, sentiment and stylometric features, and source metadata features.
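Before diving into the full module, a toy example shows what the TF-IDF representation looks like in practice. The two headlines are invented for illustration:

```python
from sklearn.feature_extraction.text import TfidfVectorizer

# Two invented headlines contrasting sensational and hedged styles
docs = [
    "shocking secret exposed in bombshell report",
    "officials say the study suggests modest effects",
]

vectorizer = TfidfVectorizer()
X = vectorizer.fit_transform(docs)

print(X.shape)  # one row per document, one column per vocabulary term
print(sorted(vectorizer.vocabulary_))
```

Each cell in the resulting sparse matrix is a weight that increases with the term's frequency in that document and decreases with its frequency across the corpus, so distinctive words dominate the representation.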

"""
capstone01/feature_engineering.py
Feature extraction for the misinformation detection pipeline.
"""

import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.preprocessing import StandardScaler
import re
import string
from typing import Optional, Tuple
import logging
from textblob import TextBlob
import scipy.sparse as sp

logger = logging.getLogger(__name__)


class TextPreprocessor:
    """Clean and normalize raw article text."""

    def __init__(
        self,
        remove_urls: bool = True,
        lowercase: bool = True,
        remove_punctuation: bool = False,  # Keep for stylometric features
    ):
        self.remove_urls = remove_urls
        self.lowercase = lowercase
        self.remove_punctuation = remove_punctuation

    def clean(self, text: str) -> str:
        if not isinstance(text, str):
            return ""

        if self.remove_urls:
            text = re.sub(r"http\S+|www\S+", " URL ", text)

        # Normalize whitespace
        text = re.sub(r"\s+", " ", text).strip()

        if self.lowercase:
            text = text.lower()

        if self.remove_punctuation:
            text = text.translate(str.maketrans("", "", string.punctuation))

        return text

    def fit_transform(self, texts: pd.Series) -> pd.Series:
        return texts.apply(self.clean)


class TFIDFFeatureExtractor:
    """Extract TF-IDF features from text."""

    def __init__(
        self,
        max_features: int = 50000,
        ngram_range: Tuple[int, int] = (1, 2),
        min_df: int = 3,
        sublinear_tf: bool = True,
    ):
        self.vectorizer = TfidfVectorizer(
            max_features=max_features,
            ngram_range=ngram_range,
            min_df=min_df,
            sublinear_tf=sublinear_tf,
            strip_accents="unicode",
            analyzer="word",
            token_pattern=r"(?u)\b[a-zA-Z]{2,}\b",
        )

    def fit(self, texts: pd.Series) -> "TFIDFFeatureExtractor":
        self.vectorizer.fit(texts)
        return self

    def transform(self, texts: pd.Series):
        return self.vectorizer.transform(texts)

    def fit_transform(self, texts: pd.Series):
        return self.vectorizer.fit_transform(texts)

    def get_feature_names(self) -> list:
        return self.vectorizer.get_feature_names_out().tolist()


class StylometricFeatureExtractor:
    """
    Extract stylometric features — writing style characteristics that
    research has shown to differ systematically between reliable and
    unreliable content.
    """

    EXCLAMATION_RE = re.compile(r"!")
    QUESTION_RE = re.compile(r"\?")
    CAPS_WORD_RE = re.compile(r"\b[A-Z]{2,}\b")
    QUOTE_RE = re.compile(r'"[^"]*"')
    NUMBER_RE = re.compile(r"\b\d+\.?\d*\b")
    ELLIPSIS_RE = re.compile(r"\.\.\.")

    # Hedging language (associated with reliable/scientific writing)
    HEDGE_WORDS = {
        "approximately", "roughly", "about", "around", "nearly", "almost",
        "suggest", "indicate", "appear", "seem", "may", "might", "could",
        "possibly", "perhaps", "likely", "probably", "generally", "often",
        "tend", "evidence", "research", "study", "studies", "according"
    }

    # Certainty/absolute language (sometimes associated with unreliable content)
    CERTAINTY_WORDS = {
        "always", "never", "everyone", "nobody", "all", "none", "definitely",
        "certainly", "absolutely", "guaranteed", "proven", "fact", "truth",
        "clearly", "obviously", "undeniably", "undoubtedly", "100%"
    }

    # Clickbait/sensationalist words
    CLICKBAIT_WORDS = {
        "shocking", "bombshell", "explosive", "breaking", "exposed",
        "secret", "hidden", "leaked", "they don't want you to know",
        "doctors hate", "this one weird", "unbelievable", "mind-blowing",
        "you won't believe"
    }

    def extract(self, text: str) -> dict:
        """Extract stylometric features from a single text."""
        if not text or not isinstance(text, str):
            return self._empty_features()

        words = text.split()
        sentences = re.split(r"[.!?]+", text)
        sentences = [s.strip() for s in sentences if s.strip()]

        word_count = len(words)
        sentence_count = max(len(sentences), 1)
        char_count = len(text)

        # Word-level features
        unique_words = set(w.lower() for w in words)
        avg_word_length = (
            np.mean([len(w) for w in words]) if words else 0
        )
        type_token_ratio = len(unique_words) / max(word_count, 1)

        # Sentence-level features
        avg_sentence_length = word_count / sentence_count

        # Punctuation and typography features
        exclamation_count = len(self.EXCLAMATION_RE.findall(text))
        question_count = len(self.QUESTION_RE.findall(text))
        caps_word_count = len(self.CAPS_WORD_RE.findall(text))
        quote_count = len(self.QUOTE_RE.findall(text))
        number_count = len(self.NUMBER_RE.findall(text))
        ellipsis_count = len(self.ELLIPSIS_RE.findall(text))

        # Lexical category features (normalized by word count)
        text_lower = text.lower()
        words_lower = [w.lower() for w in words]

        # Count token occurrences (not just distinct types) so that repeated
        # hedging or certainty words contribute proportionally to the ratio
        hedge_ratio = sum(
            1 for w in words_lower if w in self.HEDGE_WORDS
        ) / max(word_count, 1)
        certainty_ratio = sum(
            1 for w in words_lower if w in self.CERTAINTY_WORDS
        ) / max(word_count, 1)
        clickbait_count = sum(
            1 for phrase in self.CLICKBAIT_WORDS if phrase in text_lower
        )

        # Sentiment features via TextBlob
        try:
            blob = TextBlob(text[:5000])  # Limit for performance
            sentiment_polarity = blob.sentiment.polarity
            sentiment_subjectivity = blob.sentiment.subjectivity
        except Exception:
            sentiment_polarity = 0.0
            sentiment_subjectivity = 0.0

        return {
            "word_count": word_count,
            "sentence_count": sentence_count,
            "char_count": char_count,
            "avg_word_length": avg_word_length,
            "avg_sentence_length": avg_sentence_length,
            "type_token_ratio": type_token_ratio,
            "exclamation_per_sentence": exclamation_count / sentence_count,
            "question_per_sentence": question_count / sentence_count,
            "caps_words_per_100": (caps_word_count / max(word_count, 1)) * 100,
            "quote_density": quote_count / max(word_count, 1),
            "number_density": number_count / max(word_count, 1),
            "ellipsis_count": ellipsis_count,
            "hedge_ratio": hedge_ratio,
            "certainty_ratio": certainty_ratio,
            "clickbait_count": clickbait_count,
            "sentiment_polarity": sentiment_polarity,
            "sentiment_subjectivity": sentiment_subjectivity,
        }

    def _empty_features(self) -> dict:
        return {k: 0.0 for k in [
            "word_count", "sentence_count", "char_count",
            "avg_word_length", "avg_sentence_length", "type_token_ratio",
            "exclamation_per_sentence", "question_per_sentence",
            "caps_words_per_100", "quote_density", "number_density",
            "ellipsis_count", "hedge_ratio", "certainty_ratio",
            "clickbait_count", "sentiment_polarity", "sentiment_subjectivity"
        ]}

    def transform(self, texts: pd.Series) -> pd.DataFrame:
        features = texts.apply(self.extract)
        return pd.DataFrame(features.tolist())


class SourceMetadataExtractor:
    """
    Extract features derived from article source metadata.
    These features are strong predictors but raise ethical concerns
    about source-level blacklisting (discussed in Phase 5).
    """

    def __init__(self, source_ratings: Optional[dict] = None):
        """
        Args:
            source_ratings: Dict mapping domain -> credibility score (0-1).
                           If None, domain-level features are still extracted
                           but no external ratings are applied.
        """
        self.source_ratings = source_ratings or {}

    def extract_domain(self, url: str) -> str:
        """Extract the base domain from a URL."""
        if not url or not isinstance(url, str):
            return "unknown"
        try:
            from urllib.parse import urlparse
            parsed = urlparse(url)
            domain = parsed.netloc.lower()
            # Remove www. prefix
            if domain.startswith("www."):
                domain = domain[4:]
            return domain
        except Exception:
            return "unknown"

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Extract source metadata features from a DataFrame with
        'source' (URL) and 'source_name' columns.
        """
        features = pd.DataFrame()

        if "source" in df.columns:
            features["domain"] = df["source"].apply(self.extract_domain)
            if self.source_ratings:
                features["source_credibility_score"] = (
                    features["domain"].map(self.source_ratings).fillna(0.5)
                )
        else:
            features["domain"] = "unknown"
            features["source_credibility_score"] = 0.5

        # Has byline (approximated from source_name being non-generic)
        if "source_name" in df.columns:
            features["has_named_source"] = (
                df["source_name"].notna()
                & (df["source_name"].str.len() > 2)
                & (~df["source_name"].isin(["unknown", "N/A", ""]))
            ).astype(int)
        else:
            features["has_named_source"] = 0

        return features


def build_feature_matrix(
    df: pd.DataFrame,
    tfidf_extractor: TFIDFFeatureExtractor,
    fit_tfidf: bool = True,
) -> Tuple[sp.csr_matrix, np.ndarray]:
    """
    Build the complete feature matrix combining TF-IDF, stylometric,
    and source metadata features.

    Args:
        df: DataFrame with 'text', 'source', 'source_name' columns
        tfidf_extractor: TFIDFFeatureExtractor instance
        fit_tfidf: Whether to fit the TF-IDF vectorizer (True for train set)

    Returns:
        Tuple of (feature_matrix, labels)
    """
    preprocessor = TextPreprocessor()
    clean_texts = preprocessor.fit_transform(df["text"])

    # TF-IDF features (sparse)
    if fit_tfidf:
        tfidf_features = tfidf_extractor.fit_transform(clean_texts)
    else:
        tfidf_features = tfidf_extractor.transform(clean_texts)

    # Stylometric features (dense)
    stylo_extractor = StylometricFeatureExtractor()
    stylo_features = stylo_extractor.transform(df["text"])

    # Scale stylometric features
    # NOTE: a fresh scaler is fitted on every call for simplicity. For a
    # leak-free pipeline, fit StandardScaler on the training set only and
    # reuse that fitted instance when transforming validation/test data
    # (calling .transform() on an unfitted scaler raises an error).
    scaler = StandardScaler()
    stylo_scaled = scaler.fit_transform(stylo_features)

    stylo_sparse = sp.csr_matrix(stylo_scaled)

    # Source metadata features
    meta_extractor = SourceMetadataExtractor()
    meta_features = meta_extractor.transform(df)
    # Use only numeric columns
    meta_numeric = meta_features.select_dtypes(include=[np.number])
    meta_sparse = sp.csr_matrix(meta_numeric.values)

    # Combine all features
    feature_matrix = sp.hstack([tfidf_features, stylo_sparse, meta_sparse])

    labels = df["label"].values

    logger.info(
        f"Feature matrix shape: {feature_matrix.shape}, "
        f"Label distribution: {np.bincount(labels)}"
    )

    return feature_matrix, labels

Phase 3: Model Development

This phase trains three models of increasing complexity: a logistic regression baseline, a random forest ensemble, and a fine-tuned BERT classifier.
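As a sketch of the train/evaluate loop these three models share, the harness below fits a logistic regression baseline on synthetic data; `make_classification` stands in for the Phase 2 `build_feature_matrix` output purely for illustration:

```python
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split

# Synthetic stand-in for the (feature_matrix, labels) pair from Phase 2
X, y = make_classification(n_samples=500, n_features=20, random_state=42)

# A stratified split preserves the label balance in both partitions
X_tr, X_te, y_tr, y_te = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

baseline = LogisticRegression(max_iter=1000, class_weight="balanced")
baseline.fit(X_tr, y_tr)
f1 = f1_score(y_te, baseline.predict(X_te))
print(f"baseline F1: {f1:.3f}")
```

Swapping in the random forest or BERT classifier below changes only the fit/predict calls; the split and metric stay the same, which is what makes the comparison fair.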

"""
capstone01/models.py
Model training and management for the misinformation detection pipeline.
"""

import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split, StratifiedKFold, cross_validate
import scipy.sparse as sp
import logging
import json
import pickle
from pathlib import Path
from typing import Optional
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer,
)
import evaluate

logger = logging.getLogger(__name__)


# ============================================================
# Model 1: Logistic Regression Baseline
# ============================================================

class LogisticRegressionClassifier:
    """
    L2-regularized logistic regression on TF-IDF + stylometric features.
    This is the baseline model — simple, interpretable, and surprisingly competitive.
    """

    def __init__(
        self,
        C: float = 1.0,
        max_iter: int = 1000,
        class_weight: str = "balanced",
        random_state: int = 42,
    ):
        self.model = LogisticRegression(
            C=C,
            max_iter=max_iter,
            class_weight=class_weight,
            random_state=random_state,
            solver="lbfgs",
            n_jobs=-1,
        )
        self.is_fitted = False

    def fit(self, X, y):
        self.model.fit(X, y)
        self.is_fitted = True
        return self

    def predict(self, X):
        return self.model.predict(X)

    def predict_proba(self, X):
        return self.model.predict_proba(X)

    def get_top_features(
        self,
        feature_names: list,
        n: int = 20
    ) -> dict:
        """
        Return the most informative features for each class.
        This is a key interpretability tool for logistic regression.
        """
        if not self.is_fitted:
            raise RuntimeError("Model must be fitted first")

        coef = self.model.coef_[0]
        top_reliable_indices = np.argsort(coef)[-n:][::-1]
        top_unreliable_indices = np.argsort(coef)[:n]

        return {
            "top_reliable_features": [
                (feature_names[i], float(coef[i]))
                for i in top_reliable_indices
                if i < len(feature_names)
            ],
            "top_unreliable_features": [
                (feature_names[i], float(coef[i]))
                for i in top_unreliable_indices
                if i < len(feature_names)
            ],
        }

    def save(self, path: Path):
        with open(path, "wb") as f:
            pickle.dump(self.model, f)

    @classmethod
    def load(cls, path: Path) -> "LogisticRegressionClassifier":
        instance = cls()
        with open(path, "rb") as f:
            instance.model = pickle.load(f)
        instance.is_fitted = True
        return instance


# ============================================================
# Model 2: Random Forest
# ============================================================

class RandomForestMisinformationClassifier:
    """
    Random forest classifier. Less interpretable than logistic regression
    but often captures non-linear feature interactions.
    """

    def __init__(
        self,
        n_estimators: int = 200,
        max_depth: Optional[int] = None,
        min_samples_leaf: int = 5,
        class_weight: str = "balanced",
        random_state: int = 42,
        n_jobs: int = -1,
    ):
        self.model = RandomForestClassifier(
            n_estimators=n_estimators,
            max_depth=max_depth,
            min_samples_leaf=min_samples_leaf,
            class_weight=class_weight,
            random_state=random_state,
            n_jobs=n_jobs,
        )
        self.is_fitted = False

    def fit(self, X, y):
        # scikit-learn's RandomForestClassifier accepts sparse CSR/CSC input
        # directly; densifying a large TF-IDF matrix with .toarray() can
        # easily exhaust memory, so the sparse matrix is passed as-is.
        self.model.fit(X, y)
        self.is_fitted = True
        return self

    def predict(self, X):
        return self.model.predict(X)

    def predict_proba(self, X):
        return self.model.predict_proba(X)

    def get_feature_importances(
        self,
        feature_names: list,
        n: int = 30
    ) -> list:
        importances = self.model.feature_importances_
        indices = np.argsort(importances)[-n:][::-1]
        return [
            (feature_names[i], float(importances[i]))
            for i in indices
            if i < len(feature_names)
        ]

    def save(self, path: Path):
        with open(path, "wb") as f:
            pickle.dump(self.model, f)


# ============================================================
# Model 3: Fine-tuned BERT
# ============================================================

class NewsDataset(Dataset):
    """PyTorch Dataset for tokenized news articles."""

    def __init__(
        self,
        texts: list,
        labels: list,
        tokenizer,
        max_length: int = 512,
    ):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        encoding = self.tokenizer(
            self.texts[idx],
            truncation=True,
            max_length=self.max_length,
            padding="max_length",
            return_tensors="pt",
        )
        return {
            "input_ids": encoding["input_ids"].squeeze(),
            "attention_mask": encoding["attention_mask"].squeeze(),
            "labels": torch.tensor(self.labels[idx], dtype=torch.long),
        }


def compute_metrics(eval_pred):
    """Compute metrics for HuggingFace Trainer."""
    accuracy_metric = evaluate.load("accuracy")
    f1_metric = evaluate.load("f1")

    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)

    accuracy = accuracy_metric.compute(
        predictions=predictions, references=labels
    )
    f1 = f1_metric.compute(
        predictions=predictions, references=labels, average="weighted"
    )
    return {**accuracy, **f1}


class BERTMisinformationClassifier:
    """
    Fine-tuned BERT classifier for misinformation detection.

    Uses a pretrained bert-base-uncased model with a classification head.
    For production use, consider distilbert-base-uncased for lower latency.
    """

    def __init__(
        self,
        model_name: str = "bert-base-uncased",
        num_labels: int = 2,
        output_dir: str = "models/bert_classifier",
        learning_rate: float = 2e-5,
        num_epochs: int = 3,
        batch_size: int = 16,
        warmup_ratio: float = 0.1,
        weight_decay: float = 0.01,
    ):
        self.model_name = model_name
        self.num_labels = num_labels
        self.output_dir = output_dir
        self.learning_rate = learning_rate
        self.num_epochs = num_epochs
        self.batch_size = batch_size
        self.warmup_ratio = warmup_ratio
        self.weight_decay = weight_decay

        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(
            model_name,
            num_labels=num_labels,
        )

    def fine_tune(
        self,
        train_texts: list,
        train_labels: list,
        val_texts: list,
        val_labels: list,
    ):
        """Fine-tune BERT on the training data."""

        train_dataset = NewsDataset(
            train_texts, train_labels, self.tokenizer
        )
        val_dataset = NewsDataset(val_texts, val_labels, self.tokenizer)

        training_args = TrainingArguments(
            output_dir=self.output_dir,
            num_train_epochs=self.num_epochs,
            per_device_train_batch_size=self.batch_size,
            per_device_eval_batch_size=self.batch_size,
            learning_rate=self.learning_rate,
            warmup_ratio=self.warmup_ratio,
            weight_decay=self.weight_decay,
            evaluation_strategy="epoch",  # renamed to "eval_strategy" in newer transformers releases
            save_strategy="epoch",
            load_best_model_at_end=True,
            metric_for_best_model="f1",
            report_to="none",  # Disable W&B logging for student use
            logging_dir=f"{self.output_dir}/logs",
            logging_steps=50,
        )

        trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
            compute_metrics=compute_metrics,
        )

        logger.info("Starting BERT fine-tuning...")
        trainer.train()
        logger.info("Fine-tuning complete.")
        self.trainer = trainer
        return trainer

    def predict(self, texts: list) -> np.ndarray:
        """Generate predictions for a list of texts."""
        self.model.eval()
        all_preds = []

        dataset = NewsDataset(
            texts,
            [0] * len(texts),  # Dummy labels
            self.tokenizer
        )
        loader = DataLoader(dataset, batch_size=32, shuffle=False)

        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model.to(device)

        with torch.no_grad():
            for batch in loader:
                input_ids = batch["input_ids"].to(device)
                attention_mask = batch["attention_mask"].to(device)
                outputs = self.model(
                    input_ids=input_ids,
                    attention_mask=attention_mask
                )
                preds = torch.argmax(outputs.logits, dim=-1)
                all_preds.extend(preds.cpu().numpy())

        return np.array(all_preds)

    def predict_proba(self, texts: list) -> np.ndarray:
        """Generate probability estimates."""
        self.model.eval()
        all_probs = []

        dataset = NewsDataset(
            texts,
            [0] * len(texts),
            self.tokenizer
        )
        loader = DataLoader(dataset, batch_size=32, shuffle=False)

        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model.to(device)

        with torch.no_grad():
            for batch in loader:
                input_ids = batch["input_ids"].to(device)
                attention_mask = batch["attention_mask"].to(device)
                outputs = self.model(
                    input_ids=input_ids,
                    attention_mask=attention_mask
                )
                probs = torch.softmax(outputs.logits, dim=-1)
                all_probs.extend(probs.cpu().numpy())

        return np.array(all_probs)

Phase 4: Evaluation

"""
capstone01/evaluation.py
Comprehensive evaluation of misinformation detection models.
"""

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import (
    classification_report,
    confusion_matrix,
    roc_auc_score,
    roc_curve,
    precision_recall_curve,
    average_precision_score,
    ConfusionMatrixDisplay,
)
from sklearn.model_selection import StratifiedKFold, cross_validate
import logging
from pathlib import Path
from typing import Optional

logger = logging.getLogger(__name__)

plt.style.use("seaborn-v0_8-whitegrid")
FIGURE_DIR = Path("figures")
FIGURE_DIR.mkdir(exist_ok=True)


def evaluate_classifier(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    y_prob: Optional[np.ndarray],
    model_name: str,
) -> dict:
    """
    Comprehensive evaluation of a binary classifier.

    Args:
        y_true: True labels
        y_pred: Predicted labels
        y_prob: Predicted probabilities for positive class (optional)
        model_name: Name for display purposes

    Returns:
        Dict of evaluation metrics
    """
    report = classification_report(
        y_true, y_pred,
        target_names=["Unreliable", "Reliable"],
        output_dict=True
    )

    metrics = {
        "model_name": model_name,
        "accuracy": report["accuracy"],
        "precision_reliable": report["Reliable"]["precision"],
        "recall_reliable": report["Reliable"]["recall"],
        "f1_reliable": report["Reliable"]["f1-score"],
        "precision_unreliable": report["Unreliable"]["precision"],
        "recall_unreliable": report["Unreliable"]["recall"],
        "f1_unreliable": report["Unreliable"]["f1-score"],
        "macro_f1": report["macro avg"]["f1-score"],
        "weighted_f1": report["weighted avg"]["f1-score"],
    }

    if y_prob is not None:
        metrics["roc_auc"] = roc_auc_score(y_true, y_prob)
        metrics["avg_precision"] = average_precision_score(y_true, y_prob)

    logger.info(f"\n{'='*60}")
    logger.info(f"Evaluation Results: {model_name}")
    logger.info(f"{'='*60}")
    logger.info(classification_report(
        y_true, y_pred, target_names=["Unreliable", "Reliable"]
    ))
    if y_prob is not None:
        logger.info(f"ROC-AUC: {metrics['roc_auc']:.4f}")

    return metrics


def plot_confusion_matrix(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    model_name: str,
    save: bool = True,
):
    """Plot and optionally save a confusion matrix."""
    fig, ax = plt.subplots(figsize=(6, 5))
    cm = confusion_matrix(y_true, y_pred)
    disp = ConfusionMatrixDisplay(
        confusion_matrix=cm,
        display_labels=["Unreliable", "Reliable"]
    )
    disp.plot(ax=ax, colorbar=False, cmap="Blues")
    ax.set_title(f"Confusion Matrix: {model_name}", fontsize=13, fontweight="bold")
    plt.tight_layout()

    if save:
        path = FIGURE_DIR / f"confusion_matrix_{model_name.lower().replace(' ', '_')}.png"
        plt.savefig(path, dpi=150, bbox_inches="tight")
        logger.info(f"Saved confusion matrix to {path}")
    plt.show()


def plot_roc_curves(
    models_data: list[dict],
    save: bool = True,
):
    """
    Plot ROC curves for multiple models on a single figure.

    Args:
        models_data: List of dicts with keys 'name', 'y_true', 'y_prob'
    """
    fig, ax = plt.subplots(figsize=(8, 6))

    colors = ["#2196F3", "#F44336", "#4CAF50", "#FF9800"]

    for i, data in enumerate(models_data):
        fpr, tpr, _ = roc_curve(data["y_true"], data["y_prob"])
        auc = roc_auc_score(data["y_true"], data["y_prob"])
        ax.plot(
            fpr, tpr,
            color=colors[i % len(colors)],
            lw=2,
            label=f"{data['name']} (AUC = {auc:.3f})"
        )

    ax.plot([0, 1], [0, 1], "k--", lw=1, label="Random classifier")
    ax.set_xlim([0.0, 1.0])
    ax.set_ylim([0.0, 1.05])
    ax.set_xlabel("False Positive Rate", fontsize=12)
    ax.set_ylabel("True Positive Rate", fontsize=12)
    ax.set_title("ROC Curves: Model Comparison", fontsize=14, fontweight="bold")
    ax.legend(loc="lower right", fontsize=10)

    plt.tight_layout()
    if save:
        path = FIGURE_DIR / "roc_curves_comparison.png"
        plt.savefig(path, dpi=150, bbox_inches="tight")
        logger.info(f"Saved ROC curves to {path}")
    plt.show()


def error_analysis(
    df: pd.DataFrame,
    y_true: np.ndarray,
    y_pred: np.ndarray,
    y_prob: Optional[np.ndarray] = None,
    n_examples: int = 20,
) -> dict:
    """
    Perform structured error analysis on misclassified examples.

    Args:
        df: DataFrame with 'text' and other columns
        y_true: True labels
        y_pred: Predicted labels
        y_prob: Predicted probabilities (optional)
        n_examples: Number of error examples to examine

    Returns:
        Dict containing false positive and false negative examples
    """
    results = {}

    # False positives: predicted reliable (1), actually unreliable (0)
    fp_mask = (y_pred == 1) & (y_true == 0)
    fp_indices = np.where(fp_mask)[0]

    # False negatives: predicted unreliable (0), actually reliable (1)
    fn_mask = (y_pred == 0) & (y_true == 1)
    fn_indices = np.where(fn_mask)[0]

    logger.info(f"\nError Analysis Summary:")
    logger.info(f"  False Positives (unreliable predicted as reliable): {fp_mask.sum()}")
    logger.info(f"  False Negatives (reliable predicted as unreliable): {fn_mask.sum()}")
    logger.info(f"  FP Rate: {fp_mask.sum() / max((y_true == 0).sum(), 1):.3f}")
    logger.info(f"  FN Rate: {fn_mask.sum() / max((y_true == 1).sum(), 1):.3f}")

    # Sample worst errors by confidence (most confident wrong predictions)
    if y_prob is not None:
        if len(fp_indices) > 0:
            # Confidence the model placed in the wrong "reliable" call
            fp_probs = y_prob[fp_indices]
            order = np.argsort(fp_probs)[::-1][:n_examples]
            fp_worst = fp_indices[order]
            results["false_positives"] = df.iloc[fp_worst][["text"]].copy()
            results["false_positives"]["confidence"] = fp_probs[order]

        if len(fn_indices) > 0:
            # Confidence the model placed in the wrong "unreliable" call
            fn_probs = 1 - y_prob[fn_indices]
            order = np.argsort(fn_probs)[::-1][:n_examples]
            fn_worst = fn_indices[order]
            results["false_negatives"] = df.iloc[fn_worst][["text"]].copy()
            results["false_negatives"]["confidence"] = fn_probs[order]

    return results


def cross_validate_model(
    model,
    X,
    y: np.ndarray,
    cv: int = 5,
    scoring: Optional[list] = None,
) -> pd.DataFrame:
    """
    Perform stratified k-fold cross-validation.

    Returns a DataFrame of per-fold scores.
    """
    if scoring is None:
        scoring = ["accuracy", "f1_weighted", "roc_auc"]

    skf = StratifiedKFold(n_splits=cv, shuffle=True, random_state=42)

    cv_results = cross_validate(
        model.model,
        X,
        y,
        cv=skf,
        scoring=scoring,
        return_train_score=True,
        n_jobs=-1,
    )

    results_df = pd.DataFrame(cv_results)
    logger.info(f"\nCross-validation results ({cv} folds):")
    for metric in scoring:
        test_key = f"test_{metric}"
        if test_key in results_df.columns:
            scores = results_df[test_key]
            logger.info(
                f"  {metric}: {scores.mean():.4f} (+/- {scores.std():.4f})"
            )

    return results_df


def compare_models(metrics_list: list[dict]) -> pd.DataFrame:
    """Create a comparison table from a list of evaluation metric dicts."""
    comparison = pd.DataFrame(metrics_list)
    comparison = comparison.set_index("model_name")

    display_cols = [
        "accuracy", "macro_f1", "weighted_f1",
        "roc_auc", "precision_reliable", "recall_reliable",
        "precision_unreliable", "recall_unreliable"
    ]
    display_cols = [c for c in display_cols if c in comparison.columns]

    logger.info("\nModel Comparison Table:")
    logger.info(comparison[display_cols].to_string())

    return comparison[display_cols]

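Once several models have produced predictions on the same held-out set, the pieces above fit together into a comparison table. The sketch below simulates one model's predictions on synthetic labels (the model name and noise rates are purely illustrative) and builds a one-row table directly with scikit-learn, mirroring what `evaluate_classifier` and `compare_models` compute:

```python
import numpy as np
import pandas as pd
from sklearn.metrics import classification_report, roc_auc_score

rng = np.random.default_rng(42)
y_true = rng.integers(0, 2, size=200)

# Simulate a classifier that is right about 80% of the time
flip = rng.random(200) < 0.2
y_pred = np.where(flip, 1 - y_true, y_true)
y_prob = np.clip(y_pred * 0.7 + rng.random(200) * 0.3, 0, 1)

report = classification_report(
    y_true, y_pred,
    target_names=["Unreliable", "Reliable"],
    output_dict=True,
)
row = {
    "model_name": "toy_model",  # illustrative name
    "accuracy": report["accuracy"],
    "macro_f1": report["macro avg"]["f1-score"],
    "roc_auc": roc_auc_score(y_true, y_prob),
}
comparison = pd.DataFrame([row]).set_index("model_name")
print(comparison)
```

In the real pipeline, each call to `evaluate_classifier` produces one such dict, and `compare_models` concatenates them into the comparison table.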
Phase 5: Deployment Considerations

5.1 API Wrapper

"""
capstone01/api.py
Simple FastAPI wrapper for the misinformation detection pipeline.
For demonstration and educational purposes only.
"""

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import numpy as np
from pathlib import Path
import logging

logger = logging.getLogger(__name__)

app = FastAPI(
    title="Misinformation Detection API",
    description=(
        "An educational API for demonstrating misinformation detection. "
        "NOT suitable for production content moderation without extensive "
        "additional validation and human review processes."
    ),
    version="0.1.0",
)


class ArticleRequest(BaseModel):
    text: str
    source_url: str = ""
    source_name: str = ""


class DetectionResult(BaseModel):
    text_preview: str
    predicted_label: str
    confidence: float
    reliability_probability: float
    warning: str
    feature_highlights: dict


# Global model objects (loaded at startup)
tfidf_extractor = None
lr_classifier = None


@app.on_event("startup")
async def load_models():
    """Load models at API startup."""
    global tfidf_extractor, lr_classifier
    # In a real deployment, load from serialized model files
    # tfidf_extractor = TFIDFFeatureExtractor.load(Path("models/tfidf.pkl"))
    # lr_classifier = LogisticRegressionClassifier.load(Path("models/lr.pkl"))
    logger.info("Models loaded at startup")


@app.post("/detect", response_model=DetectionResult)
async def detect_misinformation(request: ArticleRequest):
    """
    Classify an article as reliable or potentially unreliable.

    IMPORTANT LIMITATIONS (read before using):
    1. This model was trained on specific datasets and may not generalize.
    2. High confidence does not mean the model is correct.
    3. Results should ALWAYS be reviewed by a human before acting on them.
    4. The model reflects biases present in its training data.
    5. Satire, opinion, and creative writing may be misclassified.
    """

    if not request.text or len(request.text) < 50:
        raise HTTPException(
            status_code=400,
            detail="Text must be at least 50 characters."
        )

    WARNING_TEXT = (
        "This prediction is generated by an automated system and has known "
        "limitations. Do not use this result as the sole basis for any "
        "consequential decision about content or its producers."
    )

    # Placeholder for actual model inference
    # In a complete implementation, run the feature extraction and prediction pipeline
    result = DetectionResult(
        text_preview=request.text[:200] + ("..." if len(request.text) > 200 else ""),
        predicted_label="reliable",   # Placeholder
        confidence=0.0,               # Placeholder
        reliability_probability=0.0,  # Placeholder
        warning=WARNING_TEXT,
        feature_highlights={},
    )

    return result


@app.get("/model-info")
async def model_info():
    """Return information about the deployed model."""
    return {
        "model_type": "Logistic Regression on TF-IDF + Stylometric Features",
        "training_data": "LIAR dataset + supplementary scraped articles",
        "known_limitations": [
            "Performance degrades on topics not well-represented in training data",
            "Cannot verify factual claims — classifies writing style, not truth",
            "Unreliable sources that adopt reliable stylistic conventions may evade detection",
            "Reliable sources that use informal or emotionally expressive language may be flagged",
            "Model reflects demographic and linguistic biases in training data",
        ],
        "recommended_use": "Research, education, and human-assisted review only",
        "not_recommended_for": (
            "Automated content removal, account suspension, or any consequential "
            "action without human review"
        ),
    }

5.2 Ethical Safeguards Discussion

Your written report must include a section addressing the following ethical dimensions:

Accuracy asymmetry: A model with 85% overall accuracy will still misclassify 15% of articles — in a large-scale deployment, that represents millions of pieces of content. False positives (reliable content flagged as unreliable) and false negatives (unreliable content allowed through) have different costs in different contexts. Who bears the costs of each error type?
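
To make the scale argument concrete, a few lines of arithmetic (every number here is hypothetical) show how even a modest false positive rate dominates when unreliable content is rare:

```python
# Hypothetical deployment-scale arithmetic; all rates are illustrative
daily_articles = 10_000_000
unreliable_base_rate = 0.05   # assume 5% of content is actually unreliable
recall_unreliable = 0.80      # model catches 80% of unreliable items
fp_rate = 0.10                # 10% of reliable items wrongly flagged

unreliable = daily_articles * unreliable_base_rate
reliable = daily_articles - unreliable

missed_unreliable = unreliable * (1 - recall_unreliable)  # false negatives
wrongly_flagged = reliable * fp_rate                      # false positives

print(f"False negatives/day: {missed_unreliable:,.0f}")   # 100,000
print(f"False positives/day: {wrongly_flagged:,.0f}")     # 950,000
```

Under these assumptions, wrongly flagged reliable articles outnumber missed unreliable ones nearly ten to one, because the reliable class is so much larger: a base-rate effect your report should address explicitly.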

Training data bias: Your training data reflects the specific sources you chose to label as reliable or unreliable. Consider: Does your data include sources from countries other than the US? Does it include sources that serve minority language communities? Does it include sources with non-mainstream political perspectives that are nonetheless factually accurate? How might biases in your training data manifest in your model's behavior?
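
One concrete way to probe these questions is a per-source audit of error rates. The sketch below (column names and values are invented for illustration) computes how often genuinely reliable articles from each source country are wrongly flagged:

```python
import pandas as pd

# Hypothetical audit frame: one row per test example
df = pd.DataFrame({
    "source_country": ["US", "US", "US", "IN", "IN", "NG"],
    "y_true": [1, 0, 1, 1, 1, 1],   # 1 = reliable
    "y_pred": [1, 0, 0, 0, 0, 0],
})

# Among genuinely reliable articles, how often is each country flagged?
reliable = df[df["y_true"] == 1]
flag_rate = (
    reliable.assign(flagged=reliable["y_pred"] == 0)
    .groupby("source_country")["flagged"]
    .mean()
)
print(flag_rate)
```

Large disparities in this per-group rate are a red flag that the model has learned source- or dialect-specific style cues rather than anything about reliability itself.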

The interpretability imperative: When a model flags content as potentially unreliable, what explanation does it provide? Logistic regression coefficients offer some interpretability; a fine-tuned BERT offers very little without additional attribution tooling. The less interpretable a model, the harder it is to identify and correct systematic errors.

Adversarial adaptation: Publishing a classifier's feature weights allows sophisticated actors to adapt their content to evade detection. Academic transparency and detection effectiveness are in tension.


Deliverables Checklist

  • [ ] Data collection script and documented dataset (minimum 1,000 examples, preferably 5,000+)
  • [ ] Feature engineering notebook with visualization of feature distributions by label
  • [ ] Trained logistic regression model with interpretability analysis (top features per class)
  • [ ] Trained random forest model with feature importance analysis
  • [ ] Fine-tuned BERT classifier (or justification for substituting DistilBERT if compute-constrained)
  • [ ] Evaluation notebook with confusion matrices, ROC curves, and cross-validation results for all models
  • [ ] Error analysis section examining at least 20 false positives and 20 false negatives
  • [ ] Written report (3,000–5,000 words) covering methodology, results, limitations, and ethical analysis
  • [ ] API wrapper code with documented ethical constraints
  • [ ] Reproducibility: requirements.txt, random seeds set, data loading documented

Grading Rubric

Each criterion is scored 0–10.

  1. Data Quality: Dataset is appropriately sized, balanced, documented, and collected ethically. Sources are clearly identified with labeling rationale explained.
  2. Feature Engineering: At least three distinct feature types implemented. Feature distributions analyzed and visualized. Preprocessing decisions documented with rationale.
  3. Baseline Model: Logistic regression implemented correctly. Interpretability analysis (top features) performed and discussed.
  4. Advanced Models: Random forest and BERT (or equivalent transformer) implemented and trained. Hyperparameter choices documented with justification.
  5. Evaluation Rigor: Cross-validation performed. Multiple metrics reported (accuracy, F1, AUC). Confusion matrices presented. Results interpreted correctly.
  6. Error Analysis: Systematic analysis of false positives and false negatives. Patterns identified and explained. Implications for deployment discussed.
  7. Model Comparison: Models compared on consistent evaluation sets. Performance differences explained with reference to model architecture and features.
  8. Ethical Analysis: Substantive discussion of bias, false positive/negative trade-offs, adversarial adaptation, and deployment safeguards. Engages with relevant literature.
  9. Code Quality: Code is readable, documented, modular, and reproducible. Dependencies specified. Random seeds set. Data loading clearly documented.
  10. Written Report: Report is clear, well-organized, and demonstrates genuine engagement with the technical and ethical material. Findings communicated to a non-technical audience effectively.

Total: 100 points

Grade thresholds: A (90–100), B (80–89), C (70–79), D (60–69), F (below 60)


Getting Started: Environment Setup

# Create and activate virtual environment
python -m venv capstone01-env
source capstone01-env/bin/activate  # On Windows: capstone01-env\Scripts\activate

# Install dependencies
pip install pandas numpy scikit-learn scipy matplotlib seaborn
pip install feedparser requests beautifulsoup4 textblob
pip install torch torchvision torchaudio  # See pytorch.org for CUDA variants
pip install transformers datasets evaluate accelerate
pip install fastapi uvicorn pydantic
pip install jupyter notebook ipykernel
pip install nltk

# Download LIAR dataset
# wget https://www.cs.ucsb.edu/~william/data/liar_dataset.zip
# unzip liar_dataset.zip -d data/liar_dataset/

# Verify CUDA availability (optional but speeds up BERT fine-tuning significantly)
python -c "import torch; print(f'CUDA available: {torch.cuda.is_available()}')"

Note: Fine-tuning BERT requires either a GPU or significant patience. DistilBERT (distilbert-base-uncased) is roughly 40% smaller than BERT-base and trains considerably faster, with only a modest reduction in accuracy; it is an acceptable substitute for students without GPU access.