Chapter 20: Working with External APIs and Integrations

Learning Objectives

By the end of this chapter, you will be able to:

  1. Recall the major categories of external APIs and common integration patterns used in modern software development (Knowledge)
  2. Explain how OAuth 2.0 authentication flows work and when to use each grant type (Comprehension)
  3. Apply retry strategies, circuit breaker patterns, and rate-limit handling when consuming external APIs (Application)
  4. Analyze API documentation to determine optimal integration strategies with AI assistance (Analysis)
  5. Evaluate trade-offs between different integration approaches, including direct API calls, SDKs, and abstraction layers (Evaluation)
  6. Create a production-ready application that integrates multiple external services with proper error handling and resilience (Synthesis)

Introduction

No application is an island. In Chapter 17, we built REST APIs that expose functionality to the world. Now we flip the perspective: your application is the consumer, reaching out to external services that provide payments, email delivery, cloud storage, geolocation, weather data, and hundreds of other capabilities.

Integration work is one of the areas where vibe coding truly shines. External APIs come with lengthy documentation, complex authentication flows, and subtle edge cases. AI coding assistants excel at navigating this complexity -- they have been trained on thousands of API integration examples and can generate boilerplate code that would otherwise take hours of documentation reading.

This chapter teaches you to build robust, production-ready integrations. We will cover the full spectrum: consuming REST APIs, handling authentication with OAuth 2.0, processing payments, sending notifications, storing files in the cloud, working with third-party data services, receiving webhooks, and handling the inevitable errors and rate limits that come with depending on external systems.

Vibe Check: When you ask an AI assistant to "integrate with the Stripe API," it can produce working code in seconds. But understanding why the code is structured a certain way -- retry logic, idempotency keys, webhook verification -- is what separates hobby projects from production systems. This chapter gives you that understanding.


20.1 The Integration Landscape

Modern applications rarely exist in isolation. A typical SaaS product might integrate with a dozen or more external services:

Category        | Examples                                  | Purpose
Payments        | Stripe, PayPal, Square                    | Process transactions
Email           | SendGrid, Mailgun, Amazon SES             | Deliver transactional and marketing email
Cloud Storage   | AWS S3, Google Cloud Storage, Azure Blob  | Store and serve files
Authentication  | Auth0, Okta, Firebase Auth                | User identity management
Communication   | Twilio, Vonage, Slack                     | SMS, voice, chat
Analytics       | Segment, Mixpanel, Amplitude              | User behavior tracking
Maps & Location | Google Maps, Mapbox, HERE                 | Geocoding, directions, maps
Weather         | OpenWeatherMap, WeatherAPI                | Weather data and forecasts
Search          | Algolia, Elasticsearch, Meilisearch       | Full-text search
AI/ML           | OpenAI, Anthropic, Hugging Face           | AI model inference

Integration Patterns

There are several fundamental patterns for integrating with external services:

Direct API Calls. Your application makes HTTP requests directly to the external service's REST (or GraphQL) endpoints. This is the most common pattern and offers maximum control.

Official SDKs. Many services provide language-specific libraries that wrap their API. SDKs can simplify integration but add dependencies and sometimes lag behind API updates.

Abstraction Layers. You build an internal interface that hides the specific external service behind a contract. This lets you swap providers without changing application code.

Webhooks. Instead of polling an external service, you register a URL that the service calls when events occur. This is an inversion of the typical request-response pattern.

Message Queues. For asynchronous integrations, you might place API calls on a queue (RabbitMQ, Redis, SQS) and process them with background workers.

┌─────────────┐     HTTP Request      ┌──────────────┐
│             │ ──────────────────────>│              │
│  Your App   │                        │ External API │
│             │ <──────────────────────│              │
└─────────────┘     HTTP Response      └──────────────┘

┌─────────────┐     Register URL       ┌──────────────┐
│             │ ──────────────────────>│              │
│  Your App   │                        │ External API │
│             │ <──────────────────────│              │
└─────────────┘   Webhook Callback     └──────────────┘

How AI Helps with API Integration

API documentation can be overwhelming. A single service like Stripe has hundreds of endpoints, each with dozens of parameters. AI coding assistants help in several ways:

  1. Boilerplate generation. "Create a Python client for the OpenWeatherMap API with error handling" produces a working starting point in seconds.
  2. Documentation navigation. "What parameters does the Stripe PaymentIntent create endpoint accept?" gets you answers without searching through docs.
  3. Pattern application. "Add exponential backoff retry logic to this API call" applies a well-known pattern correctly.
  4. Code translation. "Convert this curl example from the API docs to Python requests" translates between formats instantly.

AI Prompt Tip: When asking an AI assistant to integrate with an API, always specify: (1) which library to use, (2) how to handle errors, (3) whether you need async support, and (4) what authentication method the API requires. For example: "Create a SendGrid email client using httpx with async support, API key authentication, and retry logic for transient failures."


20.2 RESTful API Consumption

In Chapter 17, we built REST APIs with FastAPI. Now we are on the other side -- consuming them. Python offers two primary HTTP libraries for this work:

  • requests -- The classic synchronous HTTP library. Simple, well-documented, widely used.
  • httpx -- A modern library that supports both sync and async requests, HTTP/2, and has a requests-compatible API.

For new projects, httpx is generally the better choice because it supports async operations natively, which is critical when your application makes multiple API calls concurrently.

Building a Robust API Client

A production API client needs more than just requests.get(). Here is the anatomy of a well-structured client:

import httpx
from typing import Any


class WeatherAPIClient:
    """Client for consuming the OpenWeatherMap API."""

    BASE_URL = "https://api.openweathermap.org/data/2.5"

    def __init__(self, api_key: str, timeout: float = 10.0) -> None:
        self.api_key = api_key
        self.client = httpx.Client(
            base_url=self.BASE_URL,
            timeout=timeout,
            headers={"Accept": "application/json"},
        )

    def get_current_weather(self, city: str) -> dict[str, Any]:
        """Fetch current weather for a city."""
        response = self.client.get(
            "/weather",
            params={"q": city, "appid": self.api_key, "units": "metric"},
        )
        response.raise_for_status()
        return response.json()

    def close(self) -> None:
        """Close the underlying HTTP client."""
        self.client.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

This basic client is a good start, but production systems need several additional capabilities that we will build throughout this chapter: retry logic, rate-limit handling, response caching, and circuit breakers.
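As a preview of the retry logic to come, here is a minimal exponential-backoff helper. It is a generic sketch, not tied to any particular API; the retryable exception types and delay values are illustrative defaults you would tune for your service:

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    func: Callable[[], T],
    max_attempts: int = 4,
    base_delay: float = 0.5,
    retryable: tuple[type[Exception], ...] = (ConnectionError, TimeoutError),
) -> T:
    """Call func, retrying transient failures with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retryable:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the error to the caller
            # Delay doubles each attempt; random jitter prevents many
            # clients from retrying in lockstep after an outage
            delay = base_delay * (2 ** (attempt - 1))
            time.sleep(delay + random.uniform(0, delay / 10))
    raise RuntimeError("unreachable")
```

Wrapping an API call is then a one-liner: `retry_with_backoff(lambda: client.get_current_weather("London"))`.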

Async API Consumption

When your application calls multiple external APIs, doing so sequentially wastes time. Async HTTP calls let you fire off multiple requests concurrently:

import asyncio
import httpx


async def fetch_multiple_apis() -> dict[str, dict]:
    """Fetch data from multiple APIs concurrently."""
    async with httpx.AsyncClient() as client:
        weather_task = client.get(
            "https://api.openweathermap.org/data/2.5/weather",
            params={"q": "London", "appid": "YOUR_KEY"},
        )
        news_task = client.get(
            "https://newsapi.org/v2/top-headlines",
            params={"country": "us", "apiKey": "YOUR_KEY"},
        )

        weather_resp, news_resp = await asyncio.gather(
            weather_task, news_task
        )

        return {
            "weather": weather_resp.json(),
            "news": news_resp.json(),
        }

With asyncio.gather(), both API calls execute concurrently. If each takes 500ms, the total time is roughly 500ms instead of 1000ms. For a dashboard that aggregates data from five services, the difference is dramatic.
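You can verify the concurrency claim with a small self-contained experiment, using asyncio.sleep as a stand-in for network latency:

```python
import asyncio
import time


async def fake_api_call(name: str, latency: float) -> str:
    """Stand-in for an HTTP request: just sleeps for `latency` seconds."""
    await asyncio.sleep(latency)
    return name


async def timed_fetch() -> float:
    start = time.monotonic()
    # Both "calls" run concurrently, so total time is roughly the
    # maximum latency, not the sum
    results = await asyncio.gather(
        fake_api_call("weather", 0.2),
        fake_api_call("news", 0.2),
    )
    assert results == ["weather", "news"]
    return time.monotonic() - start


elapsed = asyncio.run(timed_fetch())
```

Two 200ms "calls" complete in about 200ms total rather than 400ms.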

Cross-Reference: See Chapter 17 for building the server side of REST APIs with FastAPI. The async patterns here complement the async endpoint handlers you learned there.

Request and Response Handling Best Practices

Always set timeouts. An external API that hangs can block your entire application. Set both connect and read timeouts:

client = httpx.Client(
    timeout=httpx.Timeout(
        connect=5.0,    # Time to establish connection
        read=10.0,      # Time to receive response
        write=5.0,      # Time to send request body
        pool=5.0,       # Time to acquire connection from pool
    )
)

Validate response data. Never trust external API responses blindly. Use Pydantic models to validate:

from pydantic import BaseModel


class WeatherResponse(BaseModel):
    temp: float
    humidity: int
    description: str

    @classmethod
    def from_api_response(cls, data: dict) -> "WeatherResponse":
        main = data.get("main", {})
        weather = data.get("weather", [{}])[0]
        return cls(
            temp=main.get("temp", 0.0),
            humidity=main.get("humidity", 0),
            description=weather.get("description", "unknown"),
        )

Log requests and responses. When debugging integration issues, you need visibility into what was sent and received:

import logging

import httpx

logger = logging.getLogger(__name__)


def log_request(request: httpx.Request) -> None:
    logger.info(f"Request: {request.method} {request.url}")


def log_response(response: httpx.Response) -> None:
    logger.info(f"Response: {response.status_code} from {response.url}")


client = httpx.Client(
    event_hooks={
        "request": [log_request],
        "response": [log_response],
    }
)

20.3 OAuth and API Authentication

Most external APIs require authentication. The method varies by service, but falls into a few common patterns:

API Key Authentication

The simplest approach. You pass a key in a header or query parameter:

# Header-based API key
headers = {"Authorization": "Bearer sk-your-api-key"}

# Query parameter API key
params = {"apiKey": "your-api-key"}

# Custom header
headers = {"X-API-Key": "your-api-key"}

API keys are suitable for server-to-server communication but should never be exposed in client-side code.

OAuth 2.0 Fundamentals

OAuth 2.0 is the industry standard for delegated authorization. It allows your application to access a user's resources on another service without knowing the user's password.

There are several OAuth 2.0 grant types, each suited to different scenarios:

Grant Type                | Use Case                          | Flow
Authorization Code        | Web apps with server backend      | User redirects to auth server, gets code, server exchanges it for a token
Authorization Code + PKCE | Single-page apps, mobile apps     | Same as above, plus a code verifier/challenge so no client secret is needed
Client Credentials        | Server-to-server, no user context | App authenticates directly with client ID and secret
Device Code               | CLI tools, smart TVs              | User enters a code on a separate device

Authorization Code Flow

This is the most common OAuth flow for web applications. Here is how it works:

┌────────┐    1. Redirect to Auth Server    ┌────────────┐
│        │ ─────────────────────────────────>│            │
│  User  │    2. User Logs In & Consents     │   Auth     │
│Browser │ <─────────────────────────────────│   Server   │
│        │    3. Redirect with Auth Code     │            │
└────────┘                                   └────────────┘
     │                                             │
     │  4. Send Auth Code                          │
     ▼                                             │
┌────────┐    5. Exchange Code for Token     ┌────────────┐
│  Your  │ ─────────────────────────────────>│   Auth     │
│  App   │    6. Receive Access Token        │   Server   │
│Server  │ <─────────────────────────────────│            │
└────────┘                                   └────────────┘
     │
     │  7. Use Access Token
     ▼
┌────────────┐
│  External  │
│    API     │
└────────────┘

The implementation involves several steps:

import httpx
import secrets
from urllib.parse import urlencode


class OAuthClient:
    """OAuth 2.0 Authorization Code flow client."""

    def __init__(
        self,
        client_id: str,
        client_secret: str,
        redirect_uri: str,
        auth_url: str,
        token_url: str,
    ) -> None:
        self.client_id = client_id
        self.client_secret = client_secret
        self.redirect_uri = redirect_uri
        self.auth_url = auth_url
        self.token_url = token_url

    def get_authorization_url(self, scopes: list[str]) -> tuple[str, str]:
        """Generate the URL to redirect users for authorization."""
        state = secrets.token_urlsafe(32)
        params = {
            "client_id": self.client_id,
            "redirect_uri": self.redirect_uri,
            "response_type": "code",
            "scope": " ".join(scopes),
            "state": state,
        }
        url = f"{self.auth_url}?{urlencode(params)}"
        return url, state

    async def exchange_code_for_token(
        self, code: str
    ) -> dict[str, str]:
        """Exchange authorization code for access token."""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.token_url,
                data={
                    "grant_type": "authorization_code",
                    "code": code,
                    "redirect_uri": self.redirect_uri,
                    "client_id": self.client_id,
                    "client_secret": self.client_secret,
                },
            )
            response.raise_for_status()
            return response.json()
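On the callback, your server must confirm that the state parameter returned in step 3 matches the one generated by get_authorization_url; otherwise an attacker can inject their own authorization code (CSRF). A constant-time comparison avoids leaking information through timing:

```python
import hmac


def verify_state(expected_state: str, returned_state: str) -> bool:
    """Compare the stored OAuth state with the callback's state parameter."""
    # compare_digest runs in constant time regardless of where the
    # strings first differ, unlike the == operator
    return hmac.compare_digest(expected_state, returned_state)
```

Store the state server-side (e.g., in the session) when generating the authorization URL, and reject the callback if this check fails.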

Client Credentials Flow

For server-to-server communication where no user is involved:

async def get_client_credentials_token(
    token_url: str,
    client_id: str,
    client_secret: str,
    scopes: list[str],
) -> dict[str, str]:
    """Obtain token using client credentials grant."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            token_url,
            data={
                "grant_type": "client_credentials",
                "scope": " ".join(scopes),
            },
            auth=(client_id, client_secret),
        )
        response.raise_for_status()
        return response.json()

Token Management

Access tokens expire. A production application must handle token refresh:

import time

import httpx


class TokenManager:
    """Manages OAuth tokens with automatic refresh."""

    def __init__(
        self,
        token_url: str,
        client_id: str,
        client_secret: str,
    ) -> None:
        self.token_url = token_url
        self.client_id = client_id
        self.client_secret = client_secret
        self._access_token: str | None = None
        self._refresh_token: str | None = None
        self._expires_at: float = 0.0

    @property
    def is_expired(self) -> bool:
        """Check if current token is expired (with 60s buffer)."""
        return time.time() >= (self._expires_at - 60)

    async def get_valid_token(self) -> str:
        """Return a valid access token, refreshing if needed."""
        if self._access_token and not self.is_expired:
            return self._access_token

        if self._refresh_token:
            await self._refresh()
        else:
            raise RuntimeError("No valid token and no refresh token")

        return self._access_token

    async def _refresh(self) -> None:
        """Refresh the access token."""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.token_url,
                data={
                    "grant_type": "refresh_token",
                    "refresh_token": self._refresh_token,
                    "client_id": self.client_id,
                    "client_secret": self.client_secret,
                },
            )
            response.raise_for_status()
            data = response.json()
            self._access_token = data["access_token"]
            self._refresh_token = data.get(
                "refresh_token", self._refresh_token
            )
            self._expires_at = time.time() + data.get(
                "expires_in", 3600
            )

Security Warning: Never store OAuth tokens in client-side code, local storage, or version control. Use secure server-side sessions, encrypted cookies, or dedicated secret management services. Environment variables are acceptable for API keys in development but use a secrets manager (AWS Secrets Manager, HashiCorp Vault) in production.


20.4 Payment Processing Integration

Payment integration is one of the most critical -- and most sensitive -- integrations your application can have. Mistakes can cost real money, both for you and your users.

Payment Integration Principles

Never handle raw card numbers. Modern payment processors use tokenization. The user's card details go directly to the payment processor (usually via a client-side widget), and your server only receives a token representing the payment method.

Idempotency is essential. Network failures happen. If a payment request times out, you need to safely retry without charging the customer twice. Use idempotency keys:

import uuid
import httpx


async def create_payment(
    amount: int,
    currency: str,
    payment_method_token: str,
    idempotency_key: str | None = None,
) -> dict:
    """Create a payment with idempotency protection."""
    if idempotency_key is None:
        idempotency_key = str(uuid.uuid4())

    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://api.paymentprocessor.com/v1/payments",
            json={
                "amount": amount,
                "currency": currency,
                "payment_method": payment_method_token,
            },
            headers={
                "Authorization": "Bearer sk_live_your_key",
                "Idempotency-Key": idempotency_key,
            },
        )
        response.raise_for_status()
        return response.json()

Use webhooks for payment status. Do not rely solely on the synchronous API response. Payments can be asynchronous (bank transfers, 3D Secure verification). Always set up webhooks to receive payment status updates.
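Webhooks must themselves be verified: payment processors sign each payload with an HMAC so you can confirm it genuinely came from them. The exact header name and encoding vary by provider (Stripe's Stripe-Signature header, for instance, also includes a timestamp), but the general shape looks like this:

```python
import hashlib
import hmac


def verify_webhook_signature(
    payload: bytes,
    received_signature: str,
    webhook_secret: str,
) -> bool:
    """Verify an HMAC-SHA256 hex signature against the raw request body."""
    expected = hmac.new(
        webhook_secret.encode(),
        payload,
        hashlib.sha256,
    ).hexdigest()
    # Constant-time comparison prevents timing attacks on the signature
    return hmac.compare_digest(expected, received_signature)
```

Always compute the HMAC over the raw request bytes, not a re-serialized JSON body -- re-serialization can reorder keys and change whitespace, invalidating the signature.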

A Complete Payment Flow

Here is a typical payment integration architecture:

┌──────────┐   1. Card Details    ┌──────────────┐
│  Client  │ ────────────────────>│   Payment    │
│  (React) │   2. Payment Token   │  Processor   │
│          │ <────────────────────│   (Stripe)   │
└──────────┘                      └──────────────┘
     │                                   │
     │ 3. Token + Order Details          │
     ▼                                   │
┌──────────┐   4. Create Payment  ┌──────────────┐
│   Your   │ ────────────────────>│   Payment    │
│  Server  │   5. Payment Status  │  Processor   │
│          │ <────────────────────│              │
└──────────┘                      └──────────────┘
     │                                   │
     │                                   │
     │ <──── 6. Webhook: Payment ────────┘
     │        Succeeded/Failed
     ▼
┌──────────┐
│ Database │
│ (Update  │
│  Order)  │
└──────────┘

Implementing a Payment Service

import uuid
from dataclasses import dataclass
from enum import Enum

import httpx


class PaymentStatus(Enum):
    PENDING = "pending"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    REFUNDED = "refunded"


@dataclass
class PaymentResult:
    payment_id: str
    status: PaymentStatus
    amount: int
    currency: str
    error_message: str | None = None


class PaymentService:
    """Service for processing payments via external processor."""

    def __init__(self, api_key: str, base_url: str) -> None:
        self.api_key = api_key
        self.base_url = base_url
        self.client = httpx.AsyncClient(
            base_url=base_url,
            headers={"Authorization": f"Bearer {api_key}"},
            timeout=30.0,
        )

    async def create_payment_intent(
        self,
        amount: int,
        currency: str,
        customer_id: str,
        metadata: dict | None = None,
    ) -> PaymentResult:
        """Create a payment intent for the given amount."""
        idempotency_key = str(uuid.uuid4())
        try:
            response = await self.client.post(
                "/v1/payment_intents",
                json={
                    "amount": amount,
                    "currency": currency,
                    "customer": customer_id,
                    "metadata": metadata or {},
                },
                headers={"Idempotency-Key": idempotency_key},
            )
            response.raise_for_status()
            data = response.json()
            return PaymentResult(
                payment_id=data["id"],
                status=PaymentStatus(data["status"]),
                amount=data["amount"],
                currency=data["currency"],
            )
        except httpx.HTTPStatusError as exc:
            error_data = exc.response.json()
            return PaymentResult(
                payment_id="",
                status=PaymentStatus.FAILED,
                amount=amount,
                currency=currency,
                error_message=error_data.get("error", {}).get(
                    "message", "Unknown error"
                ),
            )

    async def refund_payment(
        self, payment_id: str, amount: int | None = None
    ) -> PaymentResult:
        """Refund a payment, optionally partially."""
        payload: dict = {"payment_intent": payment_id}
        if amount is not None:
            payload["amount"] = amount

        response = await self.client.post(
            "/v1/refunds",
            json=payload,
        )
        response.raise_for_status()
        data = response.json()
        return PaymentResult(
            payment_id=data["id"],
            status=PaymentStatus.REFUNDED,
            amount=data["amount"],
            currency=data["currency"],
        )

    async def close(self) -> None:
        await self.client.aclose()

Best Practice: Always store the external payment processor's payment ID alongside your internal order ID. When discrepancies arise (and they will), you need to be able to cross-reference between your system and the payment processor's dashboard.


20.5 Email and Notification Services

Sending email from your application requires an external email delivery service. Trying to run your own mail server is a path to madness -- deliverability, spam filtering, bounce handling, and reputation management are full-time jobs.

Email Service Integration

Here is a pattern for integrating with a transactional email service:

from dataclasses import dataclass, field

import httpx


@dataclass
class EmailMessage:
    to: list[str]
    subject: str
    html_body: str
    text_body: str | None = None
    from_email: str = "noreply@yourapp.com"
    from_name: str = "Your App"
    reply_to: str | None = None
    cc: list[str] = field(default_factory=list)
    bcc: list[str] = field(default_factory=list)
    attachments: list[dict] = field(default_factory=list)


class EmailService:
    """Service for sending transactional emails."""

    def __init__(self, api_key: str) -> None:
        self.api_key = api_key
        self.base_url = "https://api.emailservice.com/v3"
        self.client = httpx.AsyncClient(
            base_url=self.base_url,
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            },
            timeout=15.0,
        )

    async def send(self, message: EmailMessage) -> dict:
        """Send a single email message."""
        payload = {
            "personalizations": [
                {
                    "to": [{"email": addr} for addr in message.to],
                    "cc": [{"email": addr} for addr in message.cc],
                    "bcc": [{"email": addr} for addr in message.bcc],
                }
            ],
            "from": {
                "email": message.from_email,
                "name": message.from_name,
            },
            "subject": message.subject,
            "content": [
                {"type": "text/html", "value": message.html_body},
            ],
        }

        if message.text_body:
            payload["content"].insert(
                0,
                {"type": "text/plain", "value": message.text_body},
            )

        if message.reply_to:
            payload["reply_to"] = {"email": message.reply_to}

        response = await self.client.post("/mail/send", json=payload)
        response.raise_for_status()
        return {"status": "sent", "status_code": response.status_code}

    async def send_template(
        self,
        template_id: str,
        to: list[str],
        dynamic_data: dict,
    ) -> dict:
        """Send email using a pre-defined template."""
        payload = {
            "personalizations": [
                {
                    "to": [{"email": addr} for addr in to],
                    "dynamic_template_data": dynamic_data,
                }
            ],
            "from": {
                "email": "noreply@yourapp.com",
                "name": "Your App",
            },
            "template_id": template_id,
        }
        response = await self.client.post("/mail/send", json=payload)
        response.raise_for_status()
        return {"status": "sent", "status_code": response.status_code}

    async def close(self) -> None:
        await self.client.aclose()

Notification Patterns

Email is just one notification channel. A robust notification system supports multiple channels through a unified interface:

from abc import ABC, abstractmethod


class NotificationChannel(ABC):
    """Abstract base class for notification channels."""

    @abstractmethod
    async def send(
        self, recipient: str, subject: str, body: str
    ) -> bool:
        """Send a notification. Returns True if successful."""
        ...


class EmailChannel(NotificationChannel):
    async def send(
        self, recipient: str, subject: str, body: str
    ) -> bool:
        # Uses EmailService internally
        ...


class SMSChannel(NotificationChannel):
    async def send(
        self, recipient: str, subject: str, body: str
    ) -> bool:
        # Uses Twilio or similar
        ...


class SlackChannel(NotificationChannel):
    async def send(
        self, recipient: str, subject: str, body: str
    ) -> bool:
        # Uses Slack Webhook API
        ...


class NotificationService:
    """Unified notification service supporting multiple channels."""

    def __init__(self) -> None:
        self.channels: dict[str, NotificationChannel] = {}

    def register_channel(
        self, name: str, channel: NotificationChannel
    ) -> None:
        self.channels[name] = channel

    async def notify(
        self,
        channel_name: str,
        recipient: str,
        subject: str,
        body: str,
    ) -> bool:
        channel = self.channels.get(channel_name)
        if not channel:
            raise ValueError(f"Unknown channel: {channel_name}")
        return await channel.send(recipient, subject, body)

    async def notify_all(
        self,
        recipient_map: dict[str, str],
        subject: str,
        body: str,
    ) -> dict[str, bool]:
        """Send notification to multiple channels."""
        results = {}
        for channel_name, recipient in recipient_map.items():
            try:
                results[channel_name] = await self.notify(
                    channel_name, recipient, subject, body
                )
            except Exception:
                results[channel_name] = False
        return results

Design Pattern: The abstraction layer pattern shown here is called the Strategy Pattern. By defining a common interface (NotificationChannel) and swapping implementations, you can add new channels (push notifications, in-app messages) without modifying the core notification logic.
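To see the pattern in action, here is a condensed, self-contained version of the service above wired to an in-memory channel -- a test double that records messages instead of calling a real API (the channel name and recipient are made up for illustration):

```python
import asyncio
from abc import ABC, abstractmethod


class NotificationChannel(ABC):
    @abstractmethod
    async def send(self, recipient: str, subject: str, body: str) -> bool: ...


class InMemoryChannel(NotificationChannel):
    """Test double: records messages instead of hitting an external API."""

    def __init__(self) -> None:
        self.sent: list[tuple[str, str]] = []

    async def send(self, recipient: str, subject: str, body: str) -> bool:
        self.sent.append((recipient, subject))
        return True


class NotificationService:
    def __init__(self) -> None:
        self.channels: dict[str, NotificationChannel] = {}

    def register_channel(
        self, name: str, channel: NotificationChannel
    ) -> None:
        self.channels[name] = channel

    async def notify(
        self, channel_name: str, recipient: str, subject: str, body: str
    ) -> bool:
        channel = self.channels.get(channel_name)
        if not channel:
            raise ValueError(f"Unknown channel: {channel_name}")
        return await channel.send(recipient, subject, body)


async def demo() -> list[tuple[str, str]]:
    service = NotificationService()
    email = InMemoryChannel()
    service.register_channel("email", email)
    await service.notify("email", "user@example.com", "Welcome", "Hello!")
    return email.sent


messages = asyncio.run(demo())
```

The same test double trick makes notification logic unit-testable without network access -- swap InMemoryChannel for EmailChannel in production.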


20.6 Cloud Storage Integration

Cloud object storage (S3, GCS, Azure Blob) is the standard way to store user uploads, generated files, backups, and static assets. The API patterns are similar across providers.

S3-Style Storage Client

from dataclasses import dataclass
from datetime import datetime, timedelta

import httpx


@dataclass
class StorageObject:
    key: str
    size: int
    content_type: str
    last_modified: datetime
    etag: str


class CloudStorageClient:
    """Client for S3-compatible cloud storage."""

    def __init__(
        self,
        access_key: str,
        secret_key: str,
        bucket: str,
        region: str = "us-east-1",
        endpoint: str | None = None,
    ) -> None:
        self.access_key = access_key
        self.secret_key = secret_key
        self.bucket = bucket
        self.region = region
        self.endpoint = (
            endpoint
            or f"https://{bucket}.s3.{region}.amazonaws.com"
        )
        self.client = httpx.AsyncClient(timeout=60.0)

    async def upload_file(
        self,
        key: str,
        data: bytes,
        content_type: str = "application/octet-stream",
    ) -> StorageObject:
        """Upload a file to cloud storage."""
        response = await self.client.put(
            f"{self.endpoint}/{key}",
            content=data,
            headers={
                "Content-Type": content_type,
                "Content-Length": str(len(data)),
            },
        )
        response.raise_for_status()
        return StorageObject(
            key=key,
            size=len(data),
            content_type=content_type,
            last_modified=datetime.utcnow(),
            etag=response.headers.get("ETag", ""),
        )

    async def download_file(self, key: str) -> bytes:
        """Download a file from cloud storage."""
        response = await self.client.get(
            f"{self.endpoint}/{key}"
        )
        response.raise_for_status()
        return response.content

    async def delete_file(self, key: str) -> bool:
        """Delete a file from cloud storage."""
        response = await self.client.delete(
            f"{self.endpoint}/{key}"
        )
        return response.status_code == 204

    async def generate_presigned_url(
        self,
        key: str,
        expires_in: int = 3600,
    ) -> str:
        """Generate a pre-signed URL for temporary access."""
        expiration = datetime.utcnow() + timedelta(seconds=expires_in)
        # In production, use proper AWS Signature V4 signing
        return (
            f"{self.endpoint}/{key}"
            f"?expires={int(expiration.timestamp())}"
        )

    async def list_objects(
        self, prefix: str = "", max_keys: int = 1000
    ) -> list[StorageObject]:
        """List objects in the bucket with optional prefix filter."""
        response = await self.client.get(
            self.endpoint,
            params={
                "list-type": "2",
                "prefix": prefix,
                "max-keys": max_keys,
            },
        )
        response.raise_for_status()
        # Parse XML response (simplified)
        return []

    async def close(self) -> None:
        await self.client.aclose()

Pre-signed URLs

A critical pattern for cloud storage is pre-signed URLs. Instead of routing file uploads and downloads through your server, you generate a temporary URL that lets the client interact directly with the storage service:

┌──────────┐  1. Request upload URL  ┌──────────┐
│  Client  │ ───────────────────────>│  Your    │
│          │  2. Pre-signed URL      │  Server  │
│          │ <───────────────────────│          │
└──────────┘                         └──────────┘
     │
     │  3. Upload directly
     ▼
┌──────────────┐
│    Cloud     │
│   Storage    │
└──────────────┘

This pattern reduces your server's bandwidth and processing load, especially for large files.
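The signing idea behind pre-signed URLs can be sketched with nothing but the standard library. Real providers use more elaborate schemes (S3 uses Signature V4, which the official SDKs handle for you), but the essence is the same: the server signs the object key and an expiry time with a secret, and the storage layer verifies that signature before serving the request. The host name and secret below are placeholders:

```python
import hashlib
import hmac
import time

# Hypothetical server-side signing secret -- never exposed to clients
SECRET = b"storage-signing-secret"


def presign(key: str, expires_in: int = 3600) -> str:
    """Create a signed URL granting temporary access to `key`."""
    expires = int(time.time()) + expires_in
    message = f"{key}:{expires}".encode()
    sig = hmac.new(SECRET, message, hashlib.sha256).hexdigest()
    return f"https://storage.example.com/{key}?expires={expires}&sig={sig}"


def verify(key: str, expires: int, sig: str) -> bool:
    """Check the signature and expiry before serving the object."""
    if time.time() > expires:
        return False
    message = f"{key}:{expires}".encode()
    expected = hmac.new(SECRET, message, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, sig)
```

Because the signature covers both the key and the expiry, a client cannot change either without invalidating the URL.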

AI Prompt Tip: When asking an AI assistant to generate cloud storage integration code, specify whether you want to use the provider's official SDK (e.g., boto3 for AWS) or raw HTTP requests. SDKs handle authentication signing automatically, which is significantly easier. For example: "Create a file upload service using boto3 that supports multipart uploads for files over 100MB, generates pre-signed download URLs, and organizes files by user ID."


20.7 Third-Party Data Services

Many applications enrich their functionality with data from external services: weather, maps, currency exchange rates, stock prices, and more.

Working with Data APIs

Here is a pattern for consuming a third-party data API with caching:

import time
from typing import Any


class CachedAPIClient:
    """API client with in-memory response caching."""

    def __init__(
        self,
        base_url: str,
        api_key: str,
        cache_ttl: int = 300,
    ) -> None:
        self.base_url = base_url
        self.api_key = api_key
        self.cache_ttl = cache_ttl
        self._cache: dict[str, tuple[float, Any]] = {}
        self.client = httpx.AsyncClient(
            base_url=base_url,
            headers={"Authorization": f"Bearer {api_key}"},
            timeout=10.0,
        )

    def _cache_key(self, path: str, params: dict) -> str:
        """Generate a cache key from request parameters."""
        sorted_params = sorted(params.items())
        return f"{path}:{sorted_params}"

    def _get_cached(self, key: str) -> Any | None:
        """Return cached value if it exists and is not expired."""
        if key in self._cache:
            timestamp, data = self._cache[key]
            if time.time() - timestamp < self.cache_ttl:
                return data
            del self._cache[key]
        return None

    async def get(
        self, path: str, params: dict | None = None
    ) -> Any:
        """Make a GET request with caching."""
        params = params or {}
        cache_key = self._cache_key(path, params)

        cached = self._get_cached(cache_key)
        if cached is not None:
            return cached

        response = await self.client.get(path, params=params)
        response.raise_for_status()
        data = response.json()

        self._cache[cache_key] = (time.time(), data)
        return data

    async def close(self) -> None:
        await self.client.aclose()

Geocoding Example

class GeocodingService:
    """Service for converting addresses to coordinates."""

    def __init__(self, api_key: str) -> None:
        self.client = CachedAPIClient(
            base_url="https://api.geocoding.com/v1",
            api_key=api_key,
            cache_ttl=86400,  # Cache geocoding results for 24 hours
        )

    async def geocode(self, address: str) -> dict[str, float]:
        """Convert an address to latitude/longitude."""
        data = await self.client.get(
            "/geocode",
            params={"address": address},
        )
        if data.get("results"):
            location = data["results"][0]["geometry"]["location"]
            return {"lat": location["lat"], "lng": location["lng"]}
        raise ValueError(f"Could not geocode address: {address}")

    async def reverse_geocode(
        self, lat: float, lng: float
    ) -> str:
        """Convert coordinates to a human-readable address."""
        data = await self.client.get(
            "/reverse",
            params={"lat": lat, "lng": lng},
        )
        if data.get("results"):
            return data["results"][0]["formatted_address"]
        raise ValueError(
            f"Could not reverse geocode: {lat}, {lng}"
        )

Currency Exchange Rate Service

@dataclass
class ExchangeRate:
    from_currency: str
    to_currency: str
    rate: float
    timestamp: datetime


class CurrencyService:
    """Service for currency exchange rate lookups."""

    def __init__(self, api_key: str) -> None:
        self.client = CachedAPIClient(
            base_url="https://api.exchangerates.com/v1",
            api_key=api_key,
            cache_ttl=3600,  # Rates update hourly
        )

    async def get_rate(
        self, from_currency: str, to_currency: str
    ) -> ExchangeRate:
        """Get exchange rate between two currencies."""
        data = await self.client.get(
            "/latest",
            params={
                "base": from_currency,
                "symbols": to_currency,
            },
        )
        rate = data["rates"][to_currency]
        return ExchangeRate(
            from_currency=from_currency,
            to_currency=to_currency,
            rate=rate,
            timestamp=datetime.fromisoformat(data["date"]),
        )

    async def convert(
        self,
        amount: float,
        from_currency: str,
        to_currency: str,
    ) -> float:
        """Convert an amount between currencies."""
        rate_info = await self.get_rate(from_currency, to_currency)
        return round(amount * rate_info.rate, 2)

20.8 Webhook Handling

Webhooks invert the typical request-response pattern. Instead of your application polling an external service for updates, the external service sends HTTP requests to your application when events occur.

Webhook Architecture

┌──────────────┐   Event Occurs    ┌──────────────┐
│   External   │ ─────────────────>│              │
│   Service    │   POST to your    │  Your App    │
│  (Stripe,    │   webhook URL     │  (Webhook    │
│   GitHub)    │                   │   Endpoint)  │
└──────────────┘                   └──────────────┘
                                          │
                                          │ Process
                                          ▼
                                   ┌──────────────┐
                                   │  Business    │
                                   │  Logic       │
                                   └──────────────┘

Webhook Security: Signature Verification

The most important aspect of webhook handling is verifying that the request actually came from the expected service. Most services sign their webhook payloads:

import hashlib
import hmac
from fastapi import FastAPI, Request, HTTPException

app = FastAPI()


def verify_webhook_signature(
    payload: bytes,
    signature: str,
    secret: str,
) -> bool:
    """Verify HMAC-SHA256 webhook signature."""
    expected = hmac.new(
        secret.encode("utf-8"),
        payload,
        hashlib.sha256,
    ).hexdigest()
    return hmac.compare_digest(f"sha256={expected}", signature)


@app.post("/webhooks/payment")
async def handle_payment_webhook(request: Request):
    """Handle incoming payment webhooks."""
    payload = await request.body()
    signature = request.headers.get("X-Webhook-Signature", "")

    if not verify_webhook_signature(
        payload, signature, "whsec_your_webhook_secret"
        # In production, load this secret from configuration
    ):
        raise HTTPException(status_code=401, detail="Invalid signature")

    event = await request.json()
    event_type = event.get("type")

    if event_type == "payment.succeeded":
        await handle_payment_success(event["data"])
    elif event_type == "payment.failed":
        await handle_payment_failure(event["data"])
    elif event_type == "refund.created":
        await handle_refund(event["data"])

    # Always return 200 quickly to acknowledge receipt
    return {"status": "received"}


async def handle_payment_success(data: dict) -> None:
    """Process successful payment."""
    payment_id = data["id"]
    # Update order status in database
    # Send confirmation email
    # Fulfill order


async def handle_payment_failure(data: dict) -> None:
    """Process failed payment."""
    payment_id = data["id"]
    # Update order status
    # Notify customer


async def handle_refund(data: dict) -> None:
    """Process refund event."""
    refund_id = data["id"]
    # Update order status
    # Credit customer account

Webhook Best Practices

Respond quickly. Webhook providers expect a response within a fixed window, typically 5-30 seconds, and treat a timeout as a failed delivery. If processing takes longer, acknowledge receipt immediately and process asynchronously:

import asyncio
from collections.abc import Callable

# Simple in-process queue for demonstration
webhook_queue: asyncio.Queue = asyncio.Queue()


@app.post("/webhooks/events")
async def receive_webhook(request: Request):
    """Receive webhook and queue for processing."""
    event = await request.json()
    await webhook_queue.put(event)
    return {"status": "queued"}


async def webhook_processor():
    """Background worker that processes queued webhooks."""
    while True:
        event = await webhook_queue.get()
        try:
            await process_event(event)
        except Exception as e:
            # Log error, potentially retry
            print(f"Error processing webhook: {e}")
        finally:
            webhook_queue.task_done()

Handle duplicate deliveries. Webhook providers often retry on failure, which means you may receive the same event multiple times. Make your handlers idempotent:

processed_events: set[str] = set()


async def process_event_idempotently(event: dict) -> bool:
    """Process an event exactly once."""
    event_id = event.get("id")
    if not event_id:
        return False

    if event_id in processed_events:
        return True  # Already processed

    # In production, use a database to track processed events
    # processed_events is just for illustration
    await process_event(event)
    processed_events.add(event_id)
    return True

Log everything. Webhook debugging is notoriously difficult because you cannot replay the external service's request easily. Log the full payload, headers, and your processing result.
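One way to make that logging systematic is to emit a single structured record per delivery. The helper below is a sketch (the function name and field choices are ours); note that it deliberately strips credential headers before logging:

```python
import json
from datetime import datetime, timezone


def build_webhook_log(
    headers: dict[str, str],
    payload: bytes,
    result: str,
) -> str:
    """Assemble one JSON log line capturing everything needed to
    reconstruct or replay a webhook delivery later."""
    record = {
        "received_at": datetime.now(timezone.utc).isoformat(),
        # Keep delivery metadata, but never log credentials
        "headers": {
            k: v for k, v in headers.items()
            if k.lower() not in {"authorization", "cookie"}
        },
        "payload": payload.decode("utf-8", errors="replace"),
        "result": result,
    }
    return json.dumps(record)
```

Feed the returned string to your logger of choice; a log aggregator can then index on `result` or event fields inside `payload`.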

Callout: Testing Webhooks Locally

During development, external services cannot reach your localhost. Use tunneling tools like ngrok or localtunnel to expose your local webhook endpoint:

ngrok http 8000

This gives you a public URL (e.g., https://abc123.ngrok.io) that forwards to your local server. Register this URL with the external service's webhook configuration. Many services also provide webhook testing tools in their dashboards that let you send test events.


20.9 Rate Limiting and Error Handling

External APIs impose rate limits, experience outages, and return errors. A production application must handle all of these gracefully.

Understanding Rate Limits

Rate limits are expressed in various ways:

  • Requests per second (e.g., 10 req/s)
  • Requests per minute (e.g., 60 req/min)
  • Requests per day (e.g., 1,000 req/day)
  • Concurrent requests (e.g., 5 simultaneous)

APIs communicate rate limit status through response headers:

X-RateLimit-Limit: 100
X-RateLimit-Remaining: 42
X-RateLimit-Reset: 1634567890
Retry-After: 30

Implementing Rate-Limit-Aware Clients

import asyncio
import time


class RateLimiter:
    """Token bucket rate limiter."""

    def __init__(
        self, max_requests: int, time_window: float
    ) -> None:
        self.max_requests = max_requests
        self.time_window = time_window
        self.tokens = max_requests
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        """Wait until a request token is available."""
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self.last_refill
            self.tokens = min(
                self.max_requests,
                self.tokens
                + (elapsed / self.time_window) * self.max_requests,
            )
            self.last_refill = now

            if self.tokens < 1:
                # Sleeping while holding the lock serializes waiters,
                # which keeps them in arrival order; the token that
                # refills during the sleep is consumed by this request
                wait_time = (
                    (1 - self.tokens) / self.max_requests
                ) * self.time_window
                await asyncio.sleep(wait_time)
                self.tokens = 0
            else:
                self.tokens -= 1

Retry Strategies

When API calls fail due to transient errors (network issues, server overload, rate limiting), retrying is often the right approach. But retrying naively can make things worse.

Exponential Backoff with Jitter

The gold standard retry strategy:

import asyncio
import random

import httpx


async def request_with_retry(
    client: httpx.AsyncClient,
    method: str,
    url: str,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    **kwargs,
) -> httpx.Response:
    """Make an HTTP request with exponential backoff retry."""
    last_exception = None

    for attempt in range(max_retries + 1):
        try:
            response = await client.request(method, url, **kwargs)

            # Don't retry client errors (4xx) except 429
            if response.status_code == 429:
                # Retry-After may be numeric seconds or an HTTP-date;
                # handle the common numeric form and fall back otherwise
                try:
                    retry_after = float(
                        response.headers.get("Retry-After", "")
                    )
                except ValueError:
                    retry_after = base_delay
                await asyncio.sleep(retry_after)
                continue

            response.raise_for_status()
            return response

        except (httpx.ConnectError, httpx.ReadTimeout) as exc:
            last_exception = exc
            if attempt < max_retries:
                delay = min(
                    base_delay * (2 ** attempt)
                    + random.uniform(0, 1),
                    max_delay,
                )
                await asyncio.sleep(delay)

        except httpx.HTTPStatusError as exc:
            if exc.response.status_code >= 500:
                last_exception = exc
                if attempt < max_retries:
                    delay = min(
                        base_delay * (2 ** attempt)
                        + random.uniform(0, 1),
                        max_delay,
                    )
                    await asyncio.sleep(delay)
            else:
                raise

    if last_exception is not None:
        raise last_exception
    # Reachable only if every attempt was rate-limited (429)
    raise RuntimeError(
        f"{method} {url} failed after {max_retries + 1} attempts"
    )

The key principles:

  1. Exponential backoff: Each retry waits longer (1s, 2s, 4s, 8s...).
  2. Jitter: Random variation prevents thundering herd when many clients retry simultaneously.
  3. Maximum delay: Cap the wait time so retries do not take forever.
  4. Selective retry: Only retry transient errors (5xx, timeouts, 429). Never retry 4xx client errors (except 429).

Circuit Breaker Pattern

When an external service is completely down, retrying every request wastes resources and slows your application. The circuit breaker pattern detects failures and short-circuits requests:

import time
from enum import Enum


class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if service recovered


class CircuitBreaker:
    """Circuit breaker for external service calls."""

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        success_threshold: int = 2,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = 0.0

    def can_execute(self) -> bool:
        """Check if a request should be allowed."""
        if self.state == CircuitState.CLOSED:
            return True
        elif self.state == CircuitState.OPEN:
            if (
                time.monotonic() - self.last_failure_time
                > self.recovery_timeout
            ):
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0
                return True
            return False
        else:  # HALF_OPEN
            return True

    def record_success(self) -> None:
        """Record a successful request."""
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
        elif self.state == CircuitState.CLOSED:
            self.failure_count = 0

    def record_failure(self) -> None:
        """Record a failed request."""
        self.failure_count += 1
        self.last_failure_time = time.monotonic()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

    async def execute(self, func, *args, **kwargs):
        """Execute a function with circuit breaker protection."""
        if not self.can_execute():
            raise CircuitBreakerOpenError(
                f"Circuit breaker is {self.state.value}"
            )
        try:
            result = await func(*args, **kwargs)
            self.record_success()
            return result
        except Exception:
            self.record_failure()
            raise


class CircuitBreakerOpenError(Exception):
    """Raised when circuit breaker is open."""
    pass

The circuit breaker has three states:

  • Closed (normal): Requests pass through. Failures are counted.
  • Open (failing): Requests are immediately rejected. After a timeout period, transitions to half-open.
  • Half-Open (testing): A limited number of requests are allowed through. If they succeed, the circuit closes. If they fail, it opens again.
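These transitions are easiest to see stepped through by hand. The sketch below is a deliberately condensed breaker (failure threshold of 2, a single successful probe closes it) with an injected fake clock so the recovery timeout can be advanced manually -- it is an illustration of the state machine, not a replacement for the fuller class above:

```python
class FakeClock:
    """Deterministic clock so transitions can be stepped manually."""

    def __init__(self) -> None:
        self.now = 0.0

    def advance(self, seconds: float) -> None:
        self.now += seconds


class MiniBreaker:
    """Condensed circuit breaker used only to trace the three states."""

    def __init__(self, clock: FakeClock, recovery_timeout: float = 30.0) -> None:
        self.clock = clock
        self.recovery_timeout = recovery_timeout
        self.state = "closed"
        self.failures = 0
        self.opened_at = 0.0

    def can_execute(self) -> bool:
        if self.state == "open":
            if self.clock.now - self.opened_at > self.recovery_timeout:
                self.state = "half_open"  # allow a probe request through
                return True
            return False
        return True

    def record(self, success: bool) -> None:
        if success:
            self.state = "closed"  # probe succeeded (or normal operation)
            self.failures = 0
        else:
            self.failures += 1
            if self.failures >= 2 or self.state == "half_open":
                self.state = "open"
                self.opened_at = self.clock.now
```

Two failures open the circuit; advancing the clock past the timeout lets one probe through; a successful probe closes it again.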

Vibe Check: Rate limiting, retries, and circuit breakers are the kind of infrastructure code that AI assistants generate extremely well. The patterns are well-established and the implementations are straightforward. Ask your AI to "add a circuit breaker to this API client" and you will get solid boilerplate that you can customize for your needs. Focus your human attention on the thresholds and timeouts -- those depend on your specific context.

Graceful Degradation

When an external service is unavailable, your application should degrade gracefully rather than crash:

class WeatherServiceWithFallback:
    """Weather service with graceful degradation."""

    def __init__(self, api_key: str) -> None:
        self.primary = WeatherAPIClient(api_key)
        self.circuit = CircuitBreaker(
            failure_threshold=3,
            recovery_timeout=60.0,
        )
        self._last_known: dict[str, dict] = {}

    async def get_weather(self, city: str) -> dict:
        """Get weather with fallback to cached data."""
        try:
            result = await self.circuit.execute(
                self.primary.get_current_weather, city
            )
            self._last_known[city] = result
            return result
        except Exception:
            # CircuitBreakerOpenError is caught here too; both cases
            # fall back to the last known data for this city
            if city in self._last_known:
                return {
                    **self._last_known[city],
                    "_stale": True,
                    "_message": "Using cached data",
                }
            return {
                "error": "Weather service unavailable",
                "_fallback": True,
            }

20.10 Building an Integration-Heavy Application

Let us put everything together by building a service that integrates multiple external APIs into a cohesive application. We will build a simplified e-commerce notification service that:

  1. Processes payment webhooks from a payment processor
  2. Sends confirmation emails via an email service
  3. Stores invoice PDFs in cloud storage
  4. Looks up currency exchange rates for international orders
  5. Sends Slack notifications for high-value orders

Application Architecture

                    ┌──────────────────────┐
  Payment Webhook   │                      │
 ──────────────────>│   Notification       │──── Email Service
                    │   Service            │
  Order Events      │                      │──── Cloud Storage
 ──────────────────>│   - Event Router     │
                    │   - Processors       │──── Currency API
                    │   - Error Handling   │
                    │                      │──── Slack API
                    └──────────────────────┘

Service Configuration

from dataclasses import dataclass


@dataclass
class ServiceConfig:
    """Configuration for all external service integrations."""

    # Payment processor
    payment_webhook_secret: str

    # Email service
    email_api_key: str
    email_from: str = "orders@shop.com"

    # Cloud storage
    storage_access_key: str = ""
    storage_secret_key: str = ""
    storage_bucket: str = "invoices"

    # Currency service
    currency_api_key: str = ""

    # Slack
    slack_webhook_url: str = ""
    slack_high_value_threshold: int = 10000  # cents

    @classmethod
    def from_env(cls) -> "ServiceConfig":
        """Load configuration from environment variables."""
        import os
        return cls(
            payment_webhook_secret=os.environ[
                "PAYMENT_WEBHOOK_SECRET"
            ],
            email_api_key=os.environ["EMAIL_API_KEY"],
            email_from=os.environ.get(
                "EMAIL_FROM", "orders@shop.com"
            ),
            storage_access_key=os.environ.get(
                "STORAGE_ACCESS_KEY", ""
            ),
            storage_secret_key=os.environ.get(
                "STORAGE_SECRET_KEY", ""
            ),
            storage_bucket=os.environ.get(
                "STORAGE_BUCKET", "invoices"
            ),
            currency_api_key=os.environ.get(
                "CURRENCY_API_KEY", ""
            ),
            slack_webhook_url=os.environ.get(
                "SLACK_WEBHOOK_URL", ""
            ),
        )

Event Processing Pipeline

from typing import Any
from collections.abc import Callable
import logging

logger = logging.getLogger(__name__)


class EventRouter:
    """Routes events to appropriate handlers."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Callable]] = {}

    def register(
        self, event_type: str, handler: Callable
    ) -> None:
        """Register a handler for an event type."""
        if event_type not in self._handlers:
            self._handlers[event_type] = []
        self._handlers[event_type].append(handler)

    async def dispatch(
        self, event_type: str, data: dict[str, Any]
    ) -> list[dict[str, Any]]:
        """Dispatch an event to all registered handlers."""
        handlers = self._handlers.get(event_type, [])
        results = []

        for handler in handlers:
            try:
                result = await handler(data)
                results.append(
                    {"handler": handler.__name__, "status": "success",
                     "result": result}
                )
            except Exception as exc:
                logger.error(
                    f"Handler {handler.__name__} failed: {exc}"
                )
                results.append(
                    {"handler": handler.__name__, "status": "error",
                     "error": str(exc)}
                )

        return results

Wiring It All Together

from fastapi import FastAPI, Request, HTTPException

app = FastAPI(title="E-Commerce Notification Service")
config = ServiceConfig.from_env()
router = EventRouter()


# Initialize services
email_service = EmailService(config.email_api_key)
currency_service = CurrencyService(config.currency_api_key)


async def send_order_confirmation(data: dict) -> dict:
    """Send order confirmation email."""
    customer_email = data["customer"]["email"]
    order_id = data["order_id"]
    amount = data["amount"] / 100  # cents to dollars
    currency = data["currency"].upper()

    await email_service.send(
        EmailMessage(
            to=[customer_email],
            subject=f"Order Confirmed: #{order_id}",
            html_body=f"""
            <h1>Thank you for your order!</h1>
            <p>Order #{order_id} for {currency} {amount:.2f}
            has been confirmed.</p>
            """,
            from_email=config.email_from,
        )
    )
    return {"email_sent_to": customer_email}


async def notify_slack_high_value(data: dict) -> dict:
    """Notify Slack channel for high-value orders."""
    if data["amount"] < config.slack_high_value_threshold:
        return {"skipped": True, "reason": "Below threshold"}

    amount = data["amount"] / 100
    currency = data["currency"].upper()

    async with httpx.AsyncClient() as client:
        await client.post(
            config.slack_webhook_url,
            json={
                "text": (
                    f"High-value order #{data['order_id']}: "
                    f"{currency} {amount:.2f} from "
                    f"{data['customer']['email']}"
                ),
            },
        )
    return {"slack_notified": True}


async def convert_and_log_currency(data: dict) -> dict:
    """Convert order amount to USD for reporting."""
    if data["currency"].lower() == "usd":
        return {"usd_amount": data["amount"]}

    usd_amount = await currency_service.convert(
        data["amount"] / 100,
        data["currency"].upper(),  # rate APIs expect ISO codes like "EUR"
        "USD",
    )
    # round(), not int(), to avoid truncating 21.74 * 100 == 2173.99...
    return {"usd_amount": round(usd_amount * 100)}


# Register handlers
router.register("payment.succeeded", send_order_confirmation)
router.register("payment.succeeded", notify_slack_high_value)
router.register("payment.succeeded", convert_and_log_currency)


@app.post("/webhooks/payments")
async def payment_webhook(request: Request):
    """Handle payment processor webhooks."""
    payload = await request.body()
    signature = request.headers.get("X-Webhook-Signature", "")

    if not verify_webhook_signature(
        payload, signature, config.payment_webhook_secret
    ):
        raise HTTPException(
            status_code=401, detail="Invalid signature"
        )

    event = await request.json()
    event_type = event.get("type", "")
    event_data = event.get("data", {})

    results = await router.dispatch(event_type, event_data)
    return {"status": "processed", "results": results}

Error Handling in Multi-Service Operations

When multiple services are involved, partial failures are inevitable. Handle them with the "best effort" pattern:

async def process_order_complete(order_data: dict) -> dict:
    """Process a completed order with multiple integrations.

    Uses best-effort processing -- individual service failures
    do not block other operations.
    """
    results = {
        "email": {"status": "pending"},
        "storage": {"status": "pending"},
        "slack": {"status": "pending"},
    }

    # Run all integrations concurrently
    tasks = {
        "email": send_order_confirmation(order_data),
        "storage": store_invoice(order_data),
        "slack": notify_slack_high_value(order_data),
    }

    completed = await asyncio.gather(
        *tasks.values(), return_exceptions=True
    )

    for key, result in zip(tasks.keys(), completed):
        if isinstance(result, Exception):
            results[key] = {
                "status": "failed",
                "error": str(result),
            }
            logger.error(f"{key} integration failed: {result}")
        else:
            results[key] = {"status": "success", "data": result}

    return results

This pattern ensures that a failure in one service (e.g., Slack is down) does not prevent other operations (email confirmation, invoice storage) from completing.

Architecture Note: For truly mission-critical operations (like payment confirmation emails), consider using a persistent queue (Redis, RabbitMQ, SQS) instead of processing inline. This guarantees eventual delivery even if your application restarts between the webhook receipt and the email send.
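The persistence idea in that note can be sketched without any external broker: an "outbox" table records pending work at webhook-receipt time, and a worker later delivers whatever is unsent. This SQLite version (class and table names are ours) is only a sketch of the pattern; a real deployment would use Redis, RabbitMQ, or SQS as the note suggests:

```python
import json
import sqlite3


class Outbox:
    """Persist pending work so it survives restarts; a worker polls
    for unsent rows and marks them done after delivery."""

    def __init__(self, path: str = "outbox.db") -> None:
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS outbox ("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, "
            "payload TEXT NOT NULL, "
            "sent INTEGER NOT NULL DEFAULT 0)"
        )

    def enqueue(self, event: dict) -> int:
        """Record an event for later delivery; returns its row id."""
        with self.conn:
            cur = self.conn.execute(
                "INSERT INTO outbox (payload) VALUES (?)",
                (json.dumps(event),),
            )
        return cur.lastrowid

    def pending(self) -> list[tuple[int, dict]]:
        """Return undelivered events in insertion order."""
        rows = self.conn.execute(
            "SELECT id, payload FROM outbox WHERE sent = 0 ORDER BY id"
        ).fetchall()
        return [(row_id, json.loads(payload)) for row_id, payload in rows]

    def mark_sent(self, row_id: int) -> None:
        """Mark a row delivered only after the send actually succeeds."""
        with self.conn:
            self.conn.execute(
                "UPDATE outbox SET sent = 1 WHERE id = ?", (row_id,)
            )
```

The webhook handler calls `enqueue` and returns 200 immediately; the worker loop calls `pending`, attempts each delivery, and calls `mark_sent` on success, so a crash between receipt and send loses nothing.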

Testing Integrations

Testing code that depends on external APIs requires special techniques:

import pytest
from unittest.mock import AsyncMock, patch


@pytest.mark.asyncio
async def test_send_order_confirmation():
    """Test order confirmation email is sent correctly."""
    mock_email = AsyncMock()

    with patch(
        "app.services.email_service", mock_email
    ):
        order_data = {
            "order_id": "ORD-123",
            "amount": 5000,
            "currency": "usd",
            "customer": {"email": "test@example.com"},
        }
        result = await send_order_confirmation(order_data)

        mock_email.send.assert_called_once()
        call_args = mock_email.send.call_args[0][0]
        assert "test@example.com" in call_args.to
        assert "ORD-123" in call_args.subject


@pytest.mark.asyncio
async def test_webhook_signature_verification():
    """Test that invalid webhook signatures are rejected."""
    assert not verify_webhook_signature(
        b"payload",
        "invalid_signature",
        "secret",
    )
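The negative case above is only half the picture: a good test suite also constructs a valid signature the way the provider would and asserts that it is accepted. The verifier is repeated here from earlier in the chapter so the snippet stands alone; `sign_payload` is a helper we introduce for the test:

```python
import hashlib
import hmac


# Repeated from section 20.8 so this snippet is self-contained
def verify_webhook_signature(
    payload: bytes, signature: str, secret: str
) -> bool:
    expected = hmac.new(
        secret.encode("utf-8"), payload, hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(f"sha256={expected}", signature)


def sign_payload(payload: bytes, secret: str) -> str:
    """Produce the signature header a provider would send."""
    digest = hmac.new(
        secret.encode("utf-8"), payload, hashlib.sha256
    ).hexdigest()
    return f"sha256={digest}"


def test_valid_signature_accepted():
    payload = b'{"type": "payment.succeeded"}'
    secret = "whsec_test"
    signature = sign_payload(payload, secret)
    assert verify_webhook_signature(payload, signature, secret)
```

The same `sign_payload` helper is handy for sending hand-crafted test events at your local webhook endpoint.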

For integration tests that hit real APIs, use the external service's test/sandbox mode:

@pytest.mark.integration
@pytest.mark.asyncio
async def test_payment_flow_sandbox():
    """Test payment flow against sandbox environment."""
    service = PaymentService(
        api_key="sk_test_sandbox_key",
        base_url="https://api.paymentprocessor.com/test",
    )
    result = await service.create_payment_intent(
        amount=1000,
        currency="usd",
        customer_id="cus_test_123",
    )
    assert result.status == PaymentStatus.PENDING
    await service.close()

AI Prompt Tip: Testing integration code is an excellent use case for AI assistants. Try: "Generate pytest fixtures and test cases for this PaymentService class. Mock all external HTTP calls. Test both success and error paths including timeouts, 429 rate limits, and 500 server errors."


Summary

External API integration is a fundamental skill for modern software development. In this chapter, we covered the full lifecycle of working with external services:

  1. The Integration Landscape -- Understanding the categories of external services and common integration patterns (direct calls, SDKs, abstraction layers, webhooks, message queues).

  2. RESTful API Consumption -- Building robust HTTP clients with httpx, handling async operations, setting timeouts, validating responses, and logging.

  3. OAuth and API Authentication -- Implementing OAuth 2.0 flows (authorization code, client credentials), managing tokens, and handling token refresh.

  4. Payment Processing -- Using tokenization, idempotency keys, and webhooks for reliable payment integration.

  5. Email and Notification Services -- Building multi-channel notification systems with the Strategy Pattern.

  6. Cloud Storage -- Uploading, downloading, and managing files with pre-signed URLs.

  7. Third-Party Data Services -- Consuming data APIs with caching for efficiency.

  8. Webhook Handling -- Receiving, verifying, and processing webhook events idempotently.

  9. Rate Limiting and Error Handling -- Implementing rate limiters, exponential backoff retry, circuit breakers, and graceful degradation.

  10. Integration-Heavy Applications -- Combining multiple services with event routing, concurrent processing, and best-effort error handling.

Throughout this chapter, we have seen how AI coding assistants accelerate integration work by generating boilerplate, navigating documentation, and applying established patterns. The key is understanding why these patterns exist so you can evaluate and customize the generated code for your specific requirements.

In Chapter 21, we will explore how AI assists with testing -- including the integration tests that are crucial for verifying the external service interactions we built here.


Key Terms

  • API Key: A secret string used to authenticate API requests, typically passed in a header or query parameter.
  • Circuit Breaker: A design pattern that detects failures and prevents an application from repeatedly trying an operation that is likely to fail.
  • Client Credentials Grant: An OAuth 2.0 flow for server-to-server authentication without user involvement.
  • Exponential Backoff: A retry strategy where the delay between retries increases exponentially with each attempt.
  • Idempotency Key: A unique identifier sent with a request to ensure the operation is only performed once, even if the request is retried.
  • Jitter: Random variation added to retry delays to prevent synchronized retries from multiple clients.
  • OAuth 2.0: An industry-standard protocol for authorization that enables applications to access user resources on other services.
  • Pre-signed URL: A time-limited URL that grants temporary access to a private cloud storage object.
  • Rate Limiting: Restrictions imposed by an API on the number of requests a client can make in a given time period.
  • Token Bucket: A rate-limiting algorithm that allows a burst of requests up to a maximum, then throttles to a steady rate.
  • Tokenization: In payments, the process of replacing sensitive card data with a non-sensitive token.
  • Webhook: An HTTP callback where an external service sends data to your application when an event occurs, inverting the typical request-response pattern.