Case Study 02: Building a Plugin System with Design Patterns

Overview

Context: A data analytics company builds a command-line tool called DataFlow that processes CSV and JSON files. The tool is popular, but users keep requesting support for new data sources (Excel, databases, APIs) and new output formats (charts, PDF reports, Slack notifications). Rather than building every feature in-house, the team decides to create a plugin architecture that lets third-party developers extend the tool.

Challenge: Design and implement an extensible plugin system using design patterns that allows third-party developers to add new functionality without modifying the core codebase.

Patterns Applied: Factory (plugin discovery and instantiation), Observer (event-based plugin communication), Protocol (plugin interfaces), and Facade (simplified plugin API).


Requirements

The plugin system must support:

  1. Discovery: Automatically find plugins in a designated directory.
  2. Lifecycle management: Plugins have initialize, execute, and shutdown phases.
  3. Event-driven communication: Plugins communicate through events, not direct references.
  4. Configuration: Each plugin can declare its configuration requirements.
  5. Isolation: A failing plugin should not crash the entire system.

Designing the Plugin Interface

We start by defining what a plugin looks like using Python's Protocol:

from typing import Protocol, Any, runtime_checkable
from dataclasses import dataclass, field

@dataclass(frozen=True)
class PluginMetadata:
    """Describes a plugin's identity and requirements."""
    name: str
    version: str
    author: str
    description: str
    dependencies: list[str] = field(default_factory=list)
    config_schema: dict[str, type] = field(default_factory=dict)

@runtime_checkable
class Plugin(Protocol):
    """The contract every plugin must satisfy."""

    @property
    def metadata(self) -> PluginMetadata: ...

    def initialize(self, config: dict[str, Any]) -> None:
        """Called once when the plugin is loaded."""
        ...

    def execute(self, context: dict[str, Any]) -> dict[str, Any]:
        """Called during the data processing pipeline."""
        ...

    def shutdown(self) -> None:
        """Called when the plugin is being unloaded."""
        ...

Using @runtime_checkable allows us to verify at load time, with an isinstance() check, that a module actually provides a valid plugin. Using Protocol (structural subtyping) means plugin authors never need to inherit from, or even import, the Plugin class itself: they just implement the right methods and properties. (They still import PluginMetadata to describe their plugin.)
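Structural subtyping is easiest to see in action. The sketch below condenses the protocol above and adds a hypothetical NullPlugin that neither inherits from nor references Plugin, yet passes the runtime check (the class and its metadata values are illustrative only):

```python
from dataclasses import dataclass, field
from typing import Any, Protocol, runtime_checkable

@dataclass(frozen=True)
class PluginMetadata:
    name: str
    version: str
    author: str
    description: str
    dependencies: list[str] = field(default_factory=list)
    config_schema: dict[str, type] = field(default_factory=dict)

@runtime_checkable
class Plugin(Protocol):
    @property
    def metadata(self) -> PluginMetadata: ...
    def initialize(self, config: dict[str, Any]) -> None: ...
    def execute(self, context: dict[str, Any]) -> dict[str, Any]: ...
    def shutdown(self) -> None: ...

class NullPlugin:
    """Satisfies Plugin purely structurally: no inheritance, no core import."""

    @property
    def metadata(self) -> PluginMetadata:
        return PluginMetadata("null", "0.1.0", "anon", "Does nothing.")

    def initialize(self, config: dict[str, Any]) -> None:
        pass

    def execute(self, context: dict[str, Any]) -> dict[str, Any]:
        return {}

    def shutdown(self) -> None:
        pass

# The runtime check only verifies that the members exist, not their signatures.
print(isinstance(NullPlugin(), Plugin))  # → True
```

Note that runtime_checkable checks member presence, not signatures, so it catches a missing method but not a wrong parameter list.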


The Event Bus (Observer Pattern)

Plugins need to communicate without knowing about each other. An event bus implements the Observer pattern for decoupled, event-driven communication:

from typing import Callable, Any
from dataclasses import dataclass, field
from datetime import datetime
import logging

logger = logging.getLogger("dataflow.events")

@dataclass(frozen=True)
class Event:
    """Base for all events in the system."""
    source: str
    timestamp: datetime = field(default_factory=datetime.now)

@dataclass(frozen=True)
class DataLoadedEvent(Event):
    """Fired when a data source has been loaded."""
    row_count: int = 0
    columns: tuple[str, ...] = ()

@dataclass(frozen=True)
class DataTransformedEvent(Event):
    """Fired when data has been transformed."""
    transformation: str = ""
    rows_affected: int = 0

@dataclass(frozen=True)
class ErrorEvent(Event):
    """Fired when a plugin encounters an error."""
    error_type: str = ""
    message: str = ""

@dataclass(frozen=True)
class PipelineCompleteEvent(Event):
    """Fired when the entire pipeline finishes."""
    total_rows: int = 0
    duration_seconds: float = 0.0


EventHandler = Callable[[Event], None]


class EventBus:
    """Central event dispatcher using the Observer pattern.

    Plugins subscribe to event types and receive notifications when
    those events are emitted. Handlers are called in registration order.
    """

    def __init__(self):
        self._handlers: dict[type, list[EventHandler]] = {}
        self._global_handlers: list[EventHandler] = []

    def subscribe(self, event_type: type, handler: EventHandler) -> None:
        """Subscribe to a specific event type."""
        self._handlers.setdefault(event_type, []).append(handler)
        logger.debug(f"Handler registered for {event_type.__name__}")

    def subscribe_all(self, handler: EventHandler) -> None:
        """Subscribe to all events."""
        self._global_handlers.append(handler)

    def unsubscribe(self, event_type: type, handler: EventHandler) -> None:
        """Remove a handler for a specific event type."""
        if event_type in self._handlers:
            self._handlers[event_type] = [
                h for h in self._handlers[event_type] if h is not handler
            ]

    def emit(self, event: Event) -> None:
        """Emit an event to all registered handlers.

        Errors in handlers are logged but do not prevent other handlers
        from receiving the event.
        """
        event_type = type(event)
        handlers = self._handlers.get(event_type, []) + self._global_handlers

        for handler in handlers:
            try:
                handler(event)
            except Exception as exc:
                # Lambdas and bound methods have __name__, but partials and
                # arbitrary callables may not; fall back to repr().
                name = getattr(handler, "__name__", repr(handler))
                logger.error(
                    f"Handler {name} failed for "
                    f"{event_type.__name__}: {exc}"
                )

Key design decisions:

  • Frozen dataclass events prevent handlers from modifying events, ensuring all observers receive the same data.
  • Error isolation in emit() means one broken handler does not prevent others from being notified.
  • Type-based subscription lets plugins subscribe to exactly the events they care about. Dispatch matches the exact event type, so a handler registered for the base Event class does not receive subclass events; use subscribe_all for that.
  • Global handlers support cross-cutting concerns like logging and metrics.
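The error-isolation decision is worth demonstrating. The following condensed stand-in for the EventBus (MiniBus and the handler names are illustrative) shows that a handler raising an exception does not prevent later handlers from running:

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable

@dataclass(frozen=True)
class Event:
    source: str
    timestamp: datetime = field(default_factory=datetime.now)

EventHandler = Callable[[Event], None]

class MiniBus:
    """Condensed stand-in for EventBus: type-keyed handlers with
    per-handler error isolation."""

    def __init__(self) -> None:
        self._handlers: dict[type, list[EventHandler]] = {}

    def subscribe(self, event_type: type, handler: EventHandler) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    def emit(self, event: Event) -> None:
        for handler in self._handlers.get(type(event), []):
            try:
                handler(event)
            except Exception:
                pass  # a broken handler must not starve the others

def broken(event: Event) -> None:
    raise RuntimeError("boom")

received: list[str] = []
bus = MiniBus()
bus.subscribe(Event, broken)                               # registered first, raises
bus.subscribe(Event, lambda e: received.append(e.source))  # still runs
bus.emit(Event(source="csv_loader"))
print(received)  # → ['csv_loader']
```

Without the try/except in emit(), the first handler's exception would propagate and the second handler would never see the event.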

The Plugin Registry (Factory Pattern)

The registry discovers, validates, and instantiates plugins. It acts as a Factory, mapping plugin names to concrete instances:

import importlib
import importlib.util
from pathlib import Path
from typing import Any

class PluginLoadError(Exception):
    """Raised when a plugin fails to load."""
    pass

class PluginRegistry:
    """Discovers and manages plugin instances using the Factory pattern.

    Plugins are loaded from Python modules in a designated directory.
    Each module must contain a `create_plugin()` factory function that
    returns a Plugin instance.
    """

    def __init__(self):
        self._plugins: dict[str, Plugin] = {}
        self._metadata: dict[str, PluginMetadata] = {}

    def discover(self, plugin_dir: str | Path) -> list[str]:
        """Scan a directory for plugin modules.

        Each .py file in the directory is checked for a `create_plugin`
        function. Valid plugins are registered but not yet initialized.
        """
        plugin_dir = Path(plugin_dir)
        discovered = []

        if not plugin_dir.is_dir():
            logger.warning(f"Plugin directory does not exist: {plugin_dir}")
            return discovered

        for path in sorted(plugin_dir.glob("*.py")):
            if path.name.startswith("_"):
                continue

            try:
                plugin = self._load_module(path)
                self._plugins[plugin.metadata.name] = plugin
                self._metadata[plugin.metadata.name] = plugin.metadata
                discovered.append(plugin.metadata.name)
                logger.info(f"Discovered plugin: {plugin.metadata.name} v{plugin.metadata.version}")
            except PluginLoadError as e:
                logger.error(f"Failed to load {path.name}: {e}")

        return discovered

    def _load_module(self, path: Path) -> Plugin:
        """Load a plugin from a Python module file."""
        spec = importlib.util.spec_from_file_location(path.stem, path)
        if spec is None or spec.loader is None:
            raise PluginLoadError(f"Cannot create module spec for {path}")

        module = importlib.util.module_from_spec(spec)
        try:
            spec.loader.exec_module(module)
        except Exception as e:
            raise PluginLoadError(f"Error executing module {path.name}: {e}") from e

        factory = getattr(module, "create_plugin", None)
        if factory is None:
            raise PluginLoadError(
                f"Module {path.name} does not have a create_plugin() function"
            )

        plugin = factory()
        if not isinstance(plugin, Plugin):
            raise PluginLoadError(
                f"create_plugin() in {path.name} did not return a valid Plugin"
            )

        return plugin

    def get(self, name: str) -> Plugin | None:
        """Retrieve a registered plugin by name."""
        return self._plugins.get(name)

    def get_all(self) -> list[Plugin]:
        """Return all registered plugins."""
        return list(self._plugins.values())

    def list_plugins(self) -> list[PluginMetadata]:
        """List metadata for all registered plugins."""
        return list(self._metadata.values())

The convention-based approach (looking for create_plugin() in each module) is a lightweight Factory: the function name is the contract, and each module decides how to construct its plugin.
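The convention can be exercised end to end with a throwaway module. This sketch (the tempfile setup and EchoPlugin are purely illustrative) performs the same steps as _load_module: build a spec, create the module, execute it, then call the factory:

```python
import importlib.util
import tempfile
from pathlib import Path

# A minimal plugin module honoring the create_plugin() convention.
PLUGIN_SOURCE = '''
class EchoPlugin:
    def execute(self, context):
        return {"echo": context}

def create_plugin():
    return EchoPlugin()
'''

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "echo_plugin.py"
    path.write_text(PLUGIN_SOURCE)

    # Same recipe as _load_module: spec -> module -> exec -> factory.
    spec = importlib.util.spec_from_file_location(path.stem, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)

    plugin = module.create_plugin()
    print(plugin.execute({"hello": 1}))  # → {'echo': {'hello': 1}}
```

Because the contract is just a function name, the module is free to construct its plugin however it likes: a plain class, a configured instance, or even a cached singleton.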


The Plugin Manager (Facade Pattern)

The PluginManager acts as a Facade, providing a simple interface that orchestrates the registry, event bus, and plugin lifecycle:

class PluginManager:
    """High-level facade for the plugin system.

    Coordinates plugin discovery, initialization, execution, and shutdown.
    Application code interacts with this class rather than the underlying
    registry and event bus directly.
    """

    def __init__(self, plugin_dir: str | Path):
        self._plugin_dir = Path(plugin_dir)
        self._registry = PluginRegistry()
        self._event_bus = EventBus()
        self._initialized: set[str] = set()

    @property
    def event_bus(self) -> EventBus:
        """Expose the event bus for application-level subscriptions."""
        return self._event_bus

    def discover_and_initialize(self, config: dict[str, dict[str, Any]] | None = None) -> list[str]:
        """Discover all plugins and initialize them with configuration.

        Args:
            config: A dictionary mapping plugin names to their config dicts.
                    Plugins not in this dict receive an empty config.

        Returns:
            List of successfully initialized plugin names.
        """
        config = config or {}
        discovered = self._registry.discover(self._plugin_dir)
        initialized = []

        for name in discovered:
            plugin = self._registry.get(name)
            if plugin is None:
                continue

            try:
                plugin_config = config.get(name, {})
                plugin.initialize(plugin_config)
                self._initialized.add(name)
                initialized.append(name)
                logger.info(f"Initialized plugin: {name}")
            except Exception as e:
                logger.error(f"Failed to initialize plugin {name}: {e}")
                self._event_bus.emit(ErrorEvent(
                    source=name,
                    error_type="InitializationError",
                    message=str(e),
                ))

        return initialized

    def execute_pipeline(self, context: dict[str, Any]) -> dict[str, Any]:
        """Run all initialized plugins in sequence.

        Each plugin receives the context and can modify it. The updated
        context is passed to the next plugin.
        """
        results = {}

        # Iterate in discovery order: the registry dict preserves insertion
        # order, whereas iterating the _initialized set directly would make
        # the pipeline order non-deterministic.
        for plugin in self._registry.get_all():
            name = plugin.metadata.name
            if name not in self._initialized:
                continue

            try:
                result = plugin.execute(context)
                results[name] = result
                context.update(result)  # Pipeline: each plugin enriches context
            except Exception as e:
                logger.error(f"Plugin {name} failed during execution: {e}")
                self._event_bus.emit(ErrorEvent(
                    source=name,
                    error_type="ExecutionError",
                    message=str(e),
                ))
                results[name] = {"error": str(e)}

        return results

    def shutdown_all(self) -> None:
        """Gracefully shut down all initialized plugins."""
        for name in list(self._initialized):
            plugin = self._registry.get(name)
            if plugin is None:
                continue

            try:
                plugin.shutdown()
                self._initialized.discard(name)
                logger.info(f"Shut down plugin: {name}")
            except Exception as e:
                logger.error(f"Error shutting down plugin {name}: {e}")

    def list_plugins(self) -> list[dict[str, str]]:
        """Return a summary of all discovered plugins."""
        return [
            {
                "name": m.name,
                "version": m.version,
                "author": m.author,
                "description": m.description,
                "status": "initialized" if m.name in self._initialized else "discovered",
            }
            for m in self._registry.list_plugins()
        ]

Example Plugins

CSV Loader Plugin

# plugins/csv_loader.py
import csv
from dataclasses import dataclass, field
from typing import Any

@dataclass
class CSVLoaderPlugin:
    """Plugin that loads data from CSV files."""

    _metadata: object = field(init=False)
    _delimiter: str = ","
    _encoding: str = "utf-8"

    def __post_init__(self):
        from plugin_types import PluginMetadata
        self._metadata = PluginMetadata(
            name="csv_loader",
            version="1.0.0",
            author="DataFlow Team",
            description="Loads data from CSV files into the pipeline.",
            config_schema={"delimiter": str, "encoding": str},
        )

    @property
    def metadata(self):
        return self._metadata

    def initialize(self, config: dict[str, Any]) -> None:
        self._delimiter = config.get("delimiter", ",")
        self._encoding = config.get("encoding", "utf-8")

    def execute(self, context: dict[str, Any]) -> dict[str, Any]:
        filepath = context.get("input_file", "")
        if not filepath.endswith(".csv"):
            return {}

        with open(filepath, encoding=self._encoding) as f:
            reader = csv.DictReader(f, delimiter=self._delimiter)
            data = list(reader)

        return {"data": data, "row_count": len(data), "source": "csv"}

    def shutdown(self) -> None:
        pass

def create_plugin():
    return CSVLoaderPlugin()

Statistics Plugin

# plugins/statistics.py
from dataclasses import dataclass, field
from typing import Any

@dataclass
class StatisticsPlugin:
    """Plugin that computes basic statistics on numeric columns."""

    _metadata: object = field(init=False)
    _columns: list[str] = field(default_factory=list)

    def __post_init__(self):
        from plugin_types import PluginMetadata
        self._metadata = PluginMetadata(
            name="statistics",
            version="1.0.0",
            author="DataFlow Team",
            description="Computes mean, min, max for numeric columns.",
        )

    @property
    def metadata(self):
        return self._metadata

    def initialize(self, config: dict[str, Any]) -> None:
        self._columns = config.get("columns", [])

    def execute(self, context: dict[str, Any]) -> dict[str, Any]:
        data = context.get("data", [])
        if not data:
            return {"statistics": {}}

        columns = self._columns or self._detect_numeric_columns(data)
        stats = {}

        for col in columns:
            values = []
            for row in data:
                try:
                    values.append(float(row[col]))
                except (KeyError, ValueError, TypeError):
                    continue

            if values:
                stats[col] = {
                    "count": len(values),
                    "mean": sum(values) / len(values),
                    "min": min(values),
                    "max": max(values),
                }

        return {"statistics": stats}

    def _detect_numeric_columns(self, data: list[dict]) -> list[str]:
        if not data:
            return []
        numeric = []
        for key, value in data[0].items():
            try:
                float(value)
                numeric.append(key)
            except (ValueError, TypeError):
                pass
        return numeric

    def shutdown(self) -> None:
        pass

def create_plugin():
    return StatisticsPlugin()

Notification Plugin

# plugins/slack_notifier.py
from dataclasses import dataclass, field
from typing import Any

@dataclass
class SlackNotifierPlugin:
    """Plugin that sends Slack notifications on pipeline events."""

    _metadata: object = field(init=False)
    _webhook_url: str = ""
    _channel: str = "#data-alerts"

    def __post_init__(self):
        from plugin_types import PluginMetadata
        self._metadata = PluginMetadata(
            name="slack_notifier",
            version="1.0.0",
            author="DataFlow Team",
            description="Sends pipeline notifications to Slack.",
            config_schema={"webhook_url": str, "channel": str},
        )

    @property
    def metadata(self):
        return self._metadata

    def initialize(self, config: dict[str, Any]) -> None:
        self._webhook_url = config.get("webhook_url", "")
        self._channel = config.get("channel", "#data-alerts")

        if not self._webhook_url:
            raise ValueError("slack_notifier requires 'webhook_url' in config")

    def execute(self, context: dict[str, Any]) -> dict[str, Any]:
        row_count = context.get("row_count", 0)
        source = context.get("source", "unknown")
        message = f"Pipeline processed {row_count} rows from {source}"

        self._send_slack_message(message)
        return {"notification_sent": True}

    def _send_slack_message(self, message: str) -> None:
        """Send a message to Slack via webhook."""
        import json
        import urllib.request

        payload = json.dumps({
            "channel": self._channel,
            "text": message,
        }).encode("utf-8")

        req = urllib.request.Request(
            self._webhook_url,
            data=payload,
            headers={"Content-Type": "application/json"},
        )

        try:
            with urllib.request.urlopen(req) as response:
                if response.status != 200:
                    print(f"Slack notification failed: {response.status}")
        except Exception as e:
            print(f"Slack notification error: {e}")

    def shutdown(self) -> None:
        pass

def create_plugin():
    return SlackNotifierPlugin()

Putting It All Together

The application entry point uses the PluginManager facade:

def main():
    manager = PluginManager(plugin_dir="./plugins")

    # Log every event (errors included) at the application level
    manager.event_bus.subscribe_all(
        lambda event: print(f"[EVENT] {type(event).__name__}: {event}")
    )

    # Discover and initialize with per-plugin configuration
    config = {
        "csv_loader": {"delimiter": ",", "encoding": "utf-8"},
        "statistics": {"columns": ["revenue", "units_sold"]},
        "slack_notifier": {"webhook_url": "https://hooks.slack.com/..."},
    }

    initialized = manager.discover_and_initialize(config)
    print(f"Initialized plugins: {initialized}")

    # Run the pipeline
    context = {"input_file": "sales_data.csv"}
    results = manager.execute_pipeline(context)

    for plugin_name, result in results.items():
        print(f"  {plugin_name}: {result}")

    # Clean shutdown
    manager.shutdown_all()


if __name__ == "__main__":
    main()

Analysis: How the Patterns Interact

The four patterns in this system play distinct, complementary roles:

  1. Protocol defines the contract. Plugin authors know exactly what methods to implement. The core system validates conformance at load time using isinstance() with @runtime_checkable.

  2. Factory (the registry's discover and _load_module methods) handles the messy business of finding, loading, and instantiating plugins. Application code never calls importlib directly.

  3. Observer (the event bus) provides the communication backbone. Plugins and the application can react to system events without holding direct references to one another. The Slack notifier does not know about the CSV loader: it consumes whatever data the loader placed in the shared pipeline context, while the event bus carries cross-cutting notifications such as errors.

  4. Facade (the PluginManager) hides the orchestration complexity. Application code calls discover_and_initialize() and execute_pipeline() without worrying about load order, error handling, or lifecycle management.

Extensibility in Practice

When a user wants to add Excel support, they:

  1. Create plugins/excel_loader.py
  2. Implement the Plugin protocol
  3. Define a create_plugin() function
  4. Drop the file in the plugins directory

No core code changes. No recompilation. No coordination with the DataFlow team. This is the power of a well-designed plugin architecture.
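Such an Excel plugin might look like the sketch below. Everything here is an assumption for illustration: the module name, the use of openpyxl (imported lazily so the module loads without it), and the locally redefined PluginMetadata, which a real plugin would import from the core package as the earlier examples do.

```python
# plugins/excel_loader.py -- illustrative sketch of a third-party plugin
from dataclasses import dataclass
from typing import Any

@dataclass(frozen=True)
class PluginMetadata:  # local stand-in for the core PluginMetadata import
    name: str
    version: str
    author: str
    description: str

@dataclass
class ExcelLoaderPlugin:
    _sheet: str = "Sheet1"

    @property
    def metadata(self) -> PluginMetadata:
        return PluginMetadata(
            name="excel_loader",
            version="0.1.0",
            author="Third-party developer",
            description="Loads data from .xlsx files into the pipeline.",
        )

    def initialize(self, config: dict[str, Any]) -> None:
        self._sheet = config.get("sheet", "Sheet1")

    def execute(self, context: dict[str, Any]) -> dict[str, Any]:
        filepath = context.get("input_file", "")
        if not filepath.endswith(".xlsx"):
            return {}  # not our format; let other loaders handle it

        import openpyxl  # lazy import: an assumed third-party dependency

        sheet = openpyxl.load_workbook(filepath, read_only=True)[self._sheet]
        rows = list(sheet.iter_rows(values_only=True))
        header, body = rows[0], rows[1:]
        data = [dict(zip(header, row)) for row in body]
        return {"data": data, "row_count": len(data), "source": "excel"}

    def shutdown(self) -> None:
        pass

def create_plugin() -> ExcelLoaderPlugin:
    return ExcelLoaderPlugin()
```

The plugin mirrors the CSV loader's shape: it inspects the context, declines files it does not handle, and enriches the context with the same keys downstream plugins expect.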

Trade-offs

  • Complexity: The plugin system itself is approximately 300 lines. For an application with only 2–3 fixed features, this is overkill.
  • Debugging: When a plugin misbehaves, the indirection through the event bus and registry can make stack traces less obvious. Good logging (which we included) mitigates this.
  • Type safety at boundaries: Plugin inputs and outputs are dict[str, Any], which sacrifices type safety for flexibility. For stricter typing, the context could use typed dataclasses or TypedDict.
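One way to tighten the boundary, sketched here, is a TypedDict view of the context; the field names mirror what the example plugins read and write, and a type checker such as mypy would then flag misspelled keys or wrong value types:

```python
from typing import Any, TypedDict

class PipelineContext(TypedDict, total=False):
    """Typed view of the context the example plugins share.

    total=False because plugins add keys incrementally as the pipeline runs.
    """
    input_file: str
    data: list[dict[str, Any]]
    row_count: int
    source: str
    statistics: dict[str, dict[str, float]]

ctx: PipelineContext = {"input_file": "sales_data.csv"}
ctx["row_count"] = 1000   # a checker verifies both the key and the value type
ctx["source"] = "csv"
```

TypedDict adds no runtime cost (it is still a plain dict), so this is purely a static-analysis improvement; third-party plugins that invent their own keys would still need dict[str, Any] at the edges.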

When This Architecture Fits

This plugin architecture is appropriate when:

  • Third-party developers need to extend your application
  • The set of features is open-ended and evolving
  • Features are relatively independent (not deeply intertwined)
  • You want to deploy new features without redeploying the core

It is not appropriate when:

  • All features are known upfront and unlikely to change
  • The team is very small and co-located
  • Performance is critical and the indirection overhead matters
  • The "plugins" are actually tightly coupled components

This case study demonstrates how multiple patterns compose to create a system greater than the sum of its parts. Each pattern solves a specific problem; together, they create an architecture that is both robust and extensible.