Case Study 02: Building a Plugin System with Design Patterns
Overview
Context: A data analytics company builds a command-line tool called DataFlow that processes CSV and JSON files. The tool is popular, but users keep requesting support for new data sources (Excel, databases, APIs) and new output formats (charts, PDF reports, Slack notifications). Rather than building every feature in-house, the team decides to create a plugin architecture that lets third-party developers extend the tool.
Challenge: Design and implement an extensible plugin system using design patterns that allows third-party developers to add new functionality without modifying the core codebase.
Patterns Applied: Factory (plugin discovery and instantiation), Observer (event-based plugin communication), Protocol (plugin interfaces), and Facade (simplified plugin API).
Requirements
The plugin system must support:
- Discovery: Automatically find plugins in a designated directory.
- Lifecycle management: Plugins have initialize, execute, and shutdown phases.
- Event-driven communication: Plugins communicate through events, not direct references.
- Configuration: Each plugin can declare its configuration requirements.
- Isolation: A failing plugin should not crash the entire system.
Designing the Plugin Interface
We start by defining what a plugin looks like using Python's Protocol:
from typing import Protocol, Any, runtime_checkable
from dataclasses import dataclass, field
@dataclass(frozen=True)
class PluginMetadata:
"""Describes a plugin's identity and requirements."""
name: str
version: str
author: str
description: str
dependencies: list[str] = field(default_factory=list)
config_schema: dict[str, type] = field(default_factory=dict)
@runtime_checkable
class Plugin(Protocol):
"""The contract every plugin must satisfy."""
@property
def metadata(self) -> PluginMetadata: ...
def initialize(self, config: dict[str, Any]) -> None:
"""Called once when the plugin is loaded."""
...
def execute(self, context: dict[str, Any]) -> dict[str, Any]:
"""Called during the data processing pipeline."""
...
def shutdown(self) -> None:
"""Called when the plugin is being unloaded."""
...
Using @runtime_checkable lets us verify at load time that a module actually provides a valid plugin; note that isinstance() checks only that the required methods and attributes exist, not their signatures. Using Protocol (structural subtyping) means plugin authors never need to inherit from a core base class: they just implement the right methods and properties.
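Structural subtyping can be demonstrated in isolation. The sketch below uses an illustrative two-method MiniPlugin protocol (not the real Plugin interface) to show that isinstance() succeeds without any inheritance:

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class MiniPlugin(Protocol):
    """Illustrative two-method protocol, not the real Plugin interface."""
    def initialize(self, config: dict) -> None: ...
    def execute(self, context: dict) -> dict: ...

class Echo:
    """Satisfies MiniPlugin structurally -- note there is no inheritance."""
    def initialize(self, config: dict) -> None:
        self.prefix = config.get("prefix", "")

    def execute(self, context: dict) -> dict:
        return {"echo": self.prefix + str(context.get("message", ""))}

class NotAPlugin:
    pass

print(isinstance(Echo(), MiniPlugin))        # True: methods are present
print(isinstance(NotAPlugin(), MiniPlugin))  # False: methods are missing
```

Keep in mind that the runtime check verifies only that the methods exist; it does not validate their signatures or return types.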
The Event Bus (Observer Pattern)
Plugins need to communicate without knowing about each other. An event bus implements the Observer pattern for decoupled, event-driven communication:
from typing import Callable, Any
from dataclasses import dataclass, field
from datetime import datetime
import logging
logger = logging.getLogger("dataflow.events")
@dataclass(frozen=True)
class Event:
"""Base for all events in the system."""
source: str
timestamp: datetime = field(default_factory=datetime.now)
@dataclass(frozen=True)
class DataLoadedEvent(Event):
"""Fired when a data source has been loaded."""
row_count: int = 0
columns: tuple[str, ...] = ()
@dataclass(frozen=True)
class DataTransformedEvent(Event):
"""Fired when data has been transformed."""
transformation: str = ""
rows_affected: int = 0
@dataclass(frozen=True)
class ErrorEvent(Event):
"""Fired when a plugin encounters an error."""
error_type: str = ""
message: str = ""
@dataclass(frozen=True)
class PipelineCompleteEvent(Event):
"""Fired when the entire pipeline finishes."""
total_rows: int = 0
duration_seconds: float = 0.0
EventHandler = Callable[[Event], None]
class EventBus:
"""Central event dispatcher using the Observer pattern.
Plugins subscribe to event types and receive notifications when
those events are emitted. Handlers are called in registration order.
"""
def __init__(self):
self._handlers: dict[type, list[EventHandler]] = {}
self._global_handlers: list[EventHandler] = []
def subscribe(self, event_type: type, handler: EventHandler) -> None:
"""Subscribe to a specific event type."""
self._handlers.setdefault(event_type, []).append(handler)
logger.debug(f"Handler registered for {event_type.__name__}")
def subscribe_all(self, handler: EventHandler) -> None:
"""Subscribe to all events."""
self._global_handlers.append(handler)
def unsubscribe(self, event_type: type, handler: EventHandler) -> None:
"""Remove a handler for a specific event type."""
if event_type in self._handlers:
self._handlers[event_type] = [
h for h in self._handlers[event_type] if h is not handler
]
def emit(self, event: Event) -> None:
"""Emit an event to all registered handlers.
Errors in handlers are logged but do not prevent other handlers
from receiving the event.
"""
event_type = type(event)
handlers = self._handlers.get(event_type, []) + self._global_handlers
for handler in handlers:
try:
handler(event)
            except Exception as exc:
                # Not every callable has __name__ (e.g. functools.partial
                # objects), so fall back to repr() for the log message.
                handler_name = getattr(handler, "__name__", repr(handler))
                logger.error(
                    f"Handler {handler_name} failed for "
                    f"{event_type.__name__}: {exc}"
                )
Key design decisions:
- Frozen dataclass events prevent handlers from modifying events, ensuring all observers receive the same data.
- Error isolation in emit() means one broken handler does not prevent others from being notified.
- Type-based subscription lets plugins subscribe to exactly the events they care about.
- Global handlers support cross-cutting concerns like logging and metrics.
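These decisions can be seen working together in a condensed, self-contained sketch (MiniBus is a stripped-down stand-in for the EventBus above; the handler lambdas are illustrative):

```python
from dataclasses import dataclass
from typing import Callable

@dataclass(frozen=True)
class Event:
    source: str

@dataclass(frozen=True)
class DataLoadedEvent(Event):
    row_count: int = 0

class MiniBus:
    """Stripped-down EventBus: type-keyed handler lists plus error isolation."""

    def __init__(self) -> None:
        self._handlers: dict[type, list[Callable[[Event], None]]] = {}

    def subscribe(self, event_type: type, handler: Callable[[Event], None]) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    def emit(self, event: Event) -> None:
        for handler in self._handlers.get(type(event), []):
            try:
                handler(event)
            except Exception:
                pass  # error isolation: a failing handler cannot block the rest

seen: list = []
bus = MiniBus()
bus.subscribe(DataLoadedEvent, lambda e: seen.append(e.row_count))
bus.subscribe(DataLoadedEvent, lambda e: 1 / 0)  # deliberately broken handler
bus.subscribe(DataLoadedEvent, lambda e: seen.append(e.source))
bus.emit(DataLoadedEvent(source="csv_loader", row_count=42))
print(seen)  # [42, 'csv_loader'] -- the broken handler was isolated
```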
The Plugin Registry (Factory Pattern)
The registry discovers, validates, and instantiates plugins. It acts as a Factory, mapping plugin names to concrete instances:
import importlib
import importlib.util
from pathlib import Path
from typing import Any
class PluginLoadError(Exception):
"""Raised when a plugin fails to load."""
pass
class PluginRegistry:
"""Discovers and manages plugin instances using the Factory pattern.
Plugins are loaded from Python modules in a designated directory.
Each module must contain a `create_plugin()` factory function that
returns a Plugin instance.
"""
def __init__(self):
self._plugins: dict[str, Plugin] = {}
self._metadata: dict[str, PluginMetadata] = {}
def discover(self, plugin_dir: str | Path) -> list[str]:
"""Scan a directory for plugin modules.
Each .py file in the directory is checked for a `create_plugin`
function. Valid plugins are registered but not yet initialized.
"""
plugin_dir = Path(plugin_dir)
discovered = []
if not plugin_dir.is_dir():
logger.warning(f"Plugin directory does not exist: {plugin_dir}")
return discovered
for path in sorted(plugin_dir.glob("*.py")):
if path.name.startswith("_"):
continue
try:
plugin = self._load_module(path)
self._plugins[plugin.metadata.name] = plugin
self._metadata[plugin.metadata.name] = plugin.metadata
discovered.append(plugin.metadata.name)
logger.info(f"Discovered plugin: {plugin.metadata.name} v{plugin.metadata.version}")
except PluginLoadError as e:
logger.error(f"Failed to load {path.name}: {e}")
return discovered
def _load_module(self, path: Path) -> Plugin:
"""Load a plugin from a Python module file."""
spec = importlib.util.spec_from_file_location(path.stem, path)
if spec is None or spec.loader is None:
raise PluginLoadError(f"Cannot create module spec for {path}")
module = importlib.util.module_from_spec(spec)
try:
spec.loader.exec_module(module)
        except Exception as e:
            # Chain the original exception so its traceback is preserved.
            raise PluginLoadError(f"Error executing module {path.name}: {e}") from e
factory = getattr(module, "create_plugin", None)
if factory is None:
raise PluginLoadError(
f"Module {path.name} does not have a create_plugin() function"
)
plugin = factory()
if not isinstance(plugin, Plugin):
raise PluginLoadError(
f"create_plugin() in {path.name} did not return a valid Plugin"
)
return plugin
def get(self, name: str) -> Plugin | None:
"""Retrieve a registered plugin by name."""
return self._plugins.get(name)
def get_all(self) -> list[Plugin]:
"""Return all registered plugins."""
return list(self._plugins.values())
def list_plugins(self) -> list[PluginMetadata]:
"""List metadata for all registered plugins."""
return list(self._metadata.values())
The convention-based approach (looking for create_plugin() in each module) is a lightweight Factory: the function name is the contract, and each module decides how to construct its plugin.
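The loading mechanics can be exercised end-to-end with a throwaway module. In this sketch the factory returns a plain dict, which is enough to show the convention; a real plugin module would return an object satisfying the Plugin protocol:

```python
import importlib.util
import tempfile
from pathlib import Path

# A throwaway module that follows the create_plugin() convention.
PLUGIN_SOURCE = '''
def create_plugin():
    return {"name": "demo"}
'''

def load_factory(path: Path):
    """Load a module from a file path and call its create_plugin() factory."""
    spec = importlib.util.spec_from_file_location(path.stem, path)
    assert spec is not None and spec.loader is not None
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module.create_plugin()

with tempfile.TemporaryDirectory() as tmp:
    mod_path = Path(tmp) / "demo_plugin.py"
    mod_path.write_text(PLUGIN_SOURCE)
    plugin = load_factory(mod_path)

print(plugin)  # {'name': 'demo'}
```

This is exactly what `_load_module` does, minus the metadata validation.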
The Plugin Manager (Facade Pattern)
The PluginManager acts as a Facade, providing a simple interface that orchestrates the registry, event bus, and plugin lifecycle:
class PluginManager:
"""High-level facade for the plugin system.
Coordinates plugin discovery, initialization, execution, and shutdown.
Application code interacts with this class rather than the underlying
registry and event bus directly.
"""
def __init__(self, plugin_dir: str | Path):
self._plugin_dir = Path(plugin_dir)
self._registry = PluginRegistry()
self._event_bus = EventBus()
self._initialized: set[str] = set()
@property
def event_bus(self) -> EventBus:
"""Expose the event bus for application-level subscriptions."""
return self._event_bus
def discover_and_initialize(self, config: dict[str, dict[str, Any]] | None = None) -> list[str]:
"""Discover all plugins and initialize them with configuration.
Args:
config: A dictionary mapping plugin names to their config dicts.
Plugins not in this dict receive an empty config.
Returns:
List of successfully initialized plugin names.
"""
config = config or {}
discovered = self._registry.discover(self._plugin_dir)
initialized = []
for name in discovered:
plugin = self._registry.get(name)
if plugin is None:
continue
try:
plugin_config = config.get(name, {})
plugin.initialize(plugin_config)
self._initialized.add(name)
initialized.append(name)
logger.info(f"Initialized plugin: {name}")
except Exception as e:
logger.error(f"Failed to initialize plugin {name}: {e}")
self._event_bus.emit(ErrorEvent(
source=name,
error_type="InitializationError",
message=str(e),
))
return initialized
def execute_pipeline(self, context: dict[str, Any]) -> dict[str, Any]:
"""Run all initialized plugins in sequence.
Each plugin receives the context and can modify it. The updated
context is passed to the next plugin.
"""
        results = {}
        # Iterate in discovery order: _initialized is a set, so iterating it
        # directly would make execution order nondeterministic, but the
        # pipeline depends on loaders running before consumers.
        for plugin in self._registry.get_all():
            name = plugin.metadata.name
            if name not in self._initialized:
                continue
try:
result = plugin.execute(context)
results[name] = result
context.update(result) # Pipeline: each plugin enriches context
except Exception as e:
logger.error(f"Plugin {name} failed during execution: {e}")
self._event_bus.emit(ErrorEvent(
source=name,
error_type="ExecutionError",
message=str(e),
))
results[name] = {"error": str(e)}
return results
def shutdown_all(self) -> None:
"""Gracefully shut down all initialized plugins."""
for name in list(self._initialized):
plugin = self._registry.get(name)
if plugin is None:
continue
try:
plugin.shutdown()
self._initialized.discard(name)
logger.info(f"Shut down plugin: {name}")
except Exception as e:
logger.error(f"Error shutting down plugin {name}: {e}")
def list_plugins(self) -> list[dict[str, str]]:
"""Return a summary of all discovered plugins."""
return [
{
"name": m.name,
"version": m.version,
"author": m.author,
"description": m.description,
"status": "initialized" if m.name in self._initialized else "discovered",
}
for m in self._registry.list_plugins()
]
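The context-enrichment behavior of execute_pipeline can be seen in miniature. The step names and lambdas below are illustrative, not part of the DataFlow code:

```python
from typing import Any, Callable

def run_steps(
    steps: list[tuple[str, Callable[[dict], dict]]],
    context: dict[str, Any],
) -> dict[str, Any]:
    """Mirror of execute_pipeline's merge semantics: each step's result is
    recorded per step and also folded back into the shared context."""
    results: dict[str, Any] = {}
    for name, step in steps:
        result = step(context)
        results[name] = result
        context.update(result)  # later steps see earlier steps' output
    return results

steps = [
    ("loader", lambda ctx: {"data": [1, 2, 3], "row_count": 3}),
    ("stats", lambda ctx: {"mean": sum(ctx["data"]) / ctx["row_count"]}),
]
results = run_steps(steps, {"input_file": "demo.csv"})
print(results["stats"])  # {'mean': 2.0}
```

Because each plugin's output is merged into the context, ordering matters: the stats step only works because the loader ran first.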
Example Plugins
CSV Loader Plugin
# plugins/csv_loader.py
import csv
from dataclasses import dataclass, field
from typing import Any
@dataclass
class CSVLoaderPlugin:
"""Plugin that loads data from CSV files."""
_metadata: object = field(init=False)
_delimiter: str = ","
_encoding: str = "utf-8"
def __post_init__(self):
from plugin_types import PluginMetadata
self._metadata = PluginMetadata(
name="csv_loader",
version="1.0.0",
author="DataFlow Team",
description="Loads data from CSV files into the pipeline.",
config_schema={"delimiter": str, "encoding": str},
)
@property
def metadata(self):
return self._metadata
def initialize(self, config: dict[str, Any]) -> None:
self._delimiter = config.get("delimiter", ",")
self._encoding = config.get("encoding", "utf-8")
def execute(self, context: dict[str, Any]) -> dict[str, Any]:
filepath = context.get("input_file", "")
if not filepath.endswith(".csv"):
return {}
with open(filepath, encoding=self._encoding) as f:
reader = csv.DictReader(f, delimiter=self._delimiter)
data = list(reader)
return {"data": data, "row_count": len(data), "source": "csv"}
def shutdown(self) -> None:
pass
def create_plugin():
return CSVLoaderPlugin()
Statistics Plugin
# plugins/statistics.py
from dataclasses import dataclass, field
from typing import Any
@dataclass
class StatisticsPlugin:
"""Plugin that computes basic statistics on numeric columns."""
_metadata: object = field(init=False)
_columns: list[str] = field(default_factory=list)
def __post_init__(self):
from plugin_types import PluginMetadata
self._metadata = PluginMetadata(
name="statistics",
version="1.0.0",
author="DataFlow Team",
description="Computes mean, min, max for numeric columns.",
)
@property
def metadata(self):
return self._metadata
def initialize(self, config: dict[str, Any]) -> None:
self._columns = config.get("columns", [])
def execute(self, context: dict[str, Any]) -> dict[str, Any]:
data = context.get("data", [])
if not data:
return {"statistics": {}}
columns = self._columns or self._detect_numeric_columns(data)
stats = {}
for col in columns:
values = []
for row in data:
try:
values.append(float(row[col]))
except (KeyError, ValueError, TypeError):
continue
if values:
stats[col] = {
"count": len(values),
"mean": sum(values) / len(values),
"min": min(values),
"max": max(values),
}
return {"statistics": stats}
def _detect_numeric_columns(self, data: list[dict]) -> list[str]:
if not data:
return []
numeric = []
for key, value in data[0].items():
try:
float(value)
numeric.append(key)
except (ValueError, TypeError):
pass
return numeric
def shutdown(self) -> None:
pass
def create_plugin():
return StatisticsPlugin()
Notification Plugin (Using the Event Bus)
# plugins/slack_notifier.py
from dataclasses import dataclass, field
from typing import Any
@dataclass
class SlackNotifierPlugin:
"""Plugin that sends Slack notifications on pipeline events."""
_metadata: object = field(init=False)
_webhook_url: str = ""
_channel: str = "#data-alerts"
def __post_init__(self):
from plugin_types import PluginMetadata
self._metadata = PluginMetadata(
name="slack_notifier",
version="1.0.0",
author="DataFlow Team",
description="Sends pipeline notifications to Slack.",
config_schema={"webhook_url": str, "channel": str},
)
@property
def metadata(self):
return self._metadata
def initialize(self, config: dict[str, Any]) -> None:
self._webhook_url = config.get("webhook_url", "")
self._channel = config.get("channel", "#data-alerts")
if not self._webhook_url:
raise ValueError("slack_notifier requires 'webhook_url' in config")
def execute(self, context: dict[str, Any]) -> dict[str, Any]:
row_count = context.get("row_count", 0)
source = context.get("source", "unknown")
message = f"Pipeline processed {row_count} rows from {source}"
self._send_slack_message(message)
return {"notification_sent": True}
def _send_slack_message(self, message: str) -> None:
"""Send a message to Slack via webhook."""
import json
import urllib.request
payload = json.dumps({
"channel": self._channel,
"text": message,
}).encode("utf-8")
req = urllib.request.Request(
self._webhook_url,
data=payload,
headers={"Content-Type": "application/json"},
)
try:
with urllib.request.urlopen(req) as response:
if response.status != 200:
print(f"Slack notification failed: {response.status}")
except Exception as e:
print(f"Slack notification error: {e}")
def shutdown(self) -> None:
pass
def create_plugin():
return SlackNotifierPlugin()
Putting It All Together
The application entry point uses the PluginManager facade:
def main():
manager = PluginManager(plugin_dir="./plugins")
    # Log every event at the application level via a global handler
manager.event_bus.subscribe_all(
lambda event: print(f"[EVENT] {type(event).__name__}: {event}")
)
# Discover and initialize with per-plugin configuration
config = {
"csv_loader": {"delimiter": ",", "encoding": "utf-8"},
"statistics": {"columns": ["revenue", "units_sold"]},
"slack_notifier": {"webhook_url": "https://hooks.slack.com/..."},
}
initialized = manager.discover_and_initialize(config)
print(f"Initialized plugins: {initialized}")
# Run the pipeline
context = {"input_file": "sales_data.csv"}
results = manager.execute_pipeline(context)
for plugin_name, result in results.items():
print(f" {plugin_name}: {result}")
# Clean shutdown
manager.shutdown_all()
Analysis: How the Patterns Interact
The four patterns in this system play distinct, complementary roles:
- Protocol defines the contract. Plugin authors know exactly what methods to implement. The core system validates conformance at load time using isinstance() with @runtime_checkable.
- Factory (the registry's discover and _load_module methods) handles the messy business of finding, loading, and instantiating plugins. Application code never calls importlib directly.
- Observer (the event bus) provides the communication backbone. Plugins can react to system events and each other's actions without importing each other. The Slack notifier does not know about the CSV loader; it just reacts to pipeline events.
- Facade (the PluginManager) hides the orchestration complexity. Application code calls discover_and_initialize() and execute_pipeline() without worrying about load order, error handling, or lifecycle management.
Extensibility in Practice
When a user wants to add Excel support, they:
- Create plugins/excel_loader.py
- Implement the Plugin protocol
- Define a create_plugin() function
- Drop the file in the plugins directory
No core code changes. No recompilation. No coordination with the DataFlow team. This is the power of a well-designed plugin architecture.
Trade-offs
- Complexity: The plugin system itself is approximately 300 lines. For an application with only 2–3 fixed features, this is overkill.
- Debugging: When a plugin misbehaves, the indirection through the event bus and registry can make stack traces less obvious. Good logging (which we included) mitigates this.
- Type safety at boundaries: Plugin inputs and outputs are dict[str, Any], which sacrifices type safety for flexibility. For stricter typing, the context could use typed dataclasses or TypedDict.
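One way to tighten that boundary is a TypedDict, sketched here with keys taken from the example plugins (total=False because not every plugin populates every key):

```python
from typing import Any, TypedDict

class PipelineContext(TypedDict, total=False):
    """Typed view of the shared context; keys match the example plugins."""
    input_file: str
    data: list[dict[str, Any]]
    row_count: int
    source: str
    statistics: dict[str, dict[str, float]]

ctx: PipelineContext = {"input_file": "sales_data.csv"}
ctx["row_count"] = 120
# Static checkers now flag typos like ctx["row_cout"], while the runtime
# representation remains a plain dict, so existing plugins keep working.
```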
When This Architecture Fits
This plugin architecture is appropriate when:
- Third-party developers need to extend your application
- The set of features is open-ended and evolving
- Features are relatively independent (not deeply intertwined)
- You want to deploy new features without redeploying the core
It is not appropriate when:
- All features are known upfront and unlikely to change
- The team is very small and co-located
- Performance is critical and the indirection overhead matters
- The "plugins" are actually tightly coupled components
This case study demonstrates how multiple patterns compose to create a system greater than the sum of its parts. Each pattern solves a specific problem; together, they create an architecture that is both robust and extensible.