No application is an island. In Chapter 17, we built REST APIs that expose functionality to the world. Now we flip the perspective: your application is the consumer, reaching out to external services that provide payments, email delivery, cloud...
In This Chapter
- Learning Objectives
- Introduction
- 20.1 The Integration Landscape
- 20.2 RESTful API Consumption
- 20.3 OAuth and API Authentication
- 20.4 Payment Processing Integration
- 20.5 Email and Notification Services
- 20.6 Cloud Storage Integration
- 20.7 Third-Party Data Services
- 20.8 Webhook Handling
- 20.9 Rate Limiting and Error Handling
- 20.10 Building an Integration-Heavy Application
- Summary
- Key Terms
Chapter 20: Working with External APIs and Integrations
Learning Objectives
By the end of this chapter, you will be able to:
- Recall the major categories of external APIs and common integration patterns used in modern software development (Knowledge)
- Explain how OAuth 2.0 authentication flows work and when to use each grant type (Comprehension)
- Apply retry strategies, circuit breaker patterns, and rate-limit handling when consuming external APIs (Application)
- Analyze API documentation to determine optimal integration strategies with AI assistance (Analysis)
- Evaluate trade-offs between different integration approaches, including direct API calls, SDKs, and abstraction layers (Evaluation)
- Create a production-ready application that integrates multiple external services with proper error handling and resilience (Synthesis)
Introduction
No application is an island. In Chapter 17, we built REST APIs that expose functionality to the world. Now we flip the perspective: your application is the consumer, reaching out to external services that provide payments, email delivery, cloud storage, geolocation, weather data, and hundreds of other capabilities.
Integration work is one of the areas where vibe coding truly shines. External APIs come with lengthy documentation, complex authentication flows, and subtle edge cases. AI coding assistants excel at navigating this complexity -- they have been trained on thousands of API integration examples and can generate boilerplate code that would otherwise take hours of documentation reading.
This chapter teaches you to build robust, production-ready integrations. We will cover the full spectrum: consuming REST APIs, handling authentication with OAuth 2.0, processing payments, sending notifications, storing files in the cloud, working with third-party data services, receiving webhooks, and handling the inevitable errors and rate limits that come with depending on external systems.
Vibe Check: When you ask an AI assistant to "integrate with the Stripe API," it can produce working code in seconds. But understanding why the code is structured a certain way -- retry logic, idempotency keys, webhook verification -- is what separates hobby projects from production systems. This chapter gives you that understanding.
20.1 The Integration Landscape
Modern applications rarely exist in isolation. A typical SaaS product might integrate with a dozen or more external services:
| Category | Examples | Purpose |
|---|---|---|
| Payments | Stripe, PayPal, Square | Process transactions |
| SendGrid, Mailgun, Amazon SES | Deliver transactional and marketing email | |
| Cloud Storage | AWS S3, Google Cloud Storage, Azure Blob | Store and serve files |
| Authentication | Auth0, Okta, Firebase Auth | User identity management |
| Communication | Twilio, Vonage, Slack | SMS, voice, chat |
| Analytics | Segment, Mixpanel, Amplitude | User behavior tracking |
| Maps & Location | Google Maps, Mapbox, HERE | Geocoding, directions, maps |
| Weather | OpenWeatherMap, WeatherAPI | Weather data and forecasts |
| Search | Algolia, Elasticsearch, Meilisearch | Full-text search |
| AI/ML | OpenAI, Anthropic, Hugging Face | AI model inference |
Integration Patterns
There are several fundamental patterns for integrating with external services:
Direct API Calls. Your application makes HTTP requests directly to the external service's REST (or GraphQL) endpoints. This is the most common pattern and offers maximum control.
Official SDKs. Many services provide language-specific libraries that wrap their API. SDKs can simplify integration but add dependencies and sometimes lag behind API updates.
Abstraction Layers. You build an internal interface that hides the specific external service behind a contract. This lets you swap providers without changing application code.
Webhooks. Instead of polling an external service, you register a URL that the service calls when events occur. This is an inversion of the typical request-response pattern.
Message Queues. For asynchronous integrations, you might place API calls on a queue (RabbitMQ, Redis, SQS) and process them with background workers.
┌─────────────┐ HTTP Request ┌──────────────┐
│ │ ──────────────────────>│ │
│ Your App │ │ External API │
│ │ <──────────────────────│ │
└─────────────┘ HTTP Response └──────────────┘
┌─────────────┐ Register URL ┌──────────────┐
│ │ ──────────────────────>│ │
│ Your App │ │ External API │
│ │ <──────────────────────│ │
└─────────────┘ Webhook Callback └──────────────┘
How AI Helps with API Integration
API documentation can be overwhelming. A single service like Stripe has hundreds of endpoints, each with dozens of parameters. AI coding assistants help in several ways:
- Boilerplate generation. "Create a Python client for the OpenWeatherMap API with error handling" produces a working starting point in seconds.
- Documentation navigation. "What parameters does the Stripe PaymentIntent create endpoint accept?" gets you answers without searching through docs.
- Pattern application. "Add exponential backoff retry logic to this API call" applies a well-known pattern correctly.
- Code translation. "Convert this curl example from the API docs to Python requests" translates between formats instantly.
AI Prompt Tip: When asking an AI assistant to integrate with an API, always specify: (1) which library to use, (2) how to handle errors, (3) whether you need async support, and (4) what authentication method the API requires. For example: "Create a SendGrid email client using httpx with async support, API key authentication, and retry logic for transient failures."
20.2 RESTful API Consumption
In Chapter 17, we built REST APIs with FastAPI. Now we are on the other side -- consuming them. Python offers two primary HTTP libraries for this work:
requests-- The classic synchronous HTTP library. Simple, well-documented, widely used.httpx-- A modern library that supports both sync and async requests, HTTP/2, and has arequests-compatible API.
For new projects, httpx is generally the better choice because it supports async operations natively, which is critical when your application makes multiple API calls concurrently.
Building a Robust API Client
A production API client needs more than just requests.get(). Here is the anatomy of a well-structured client:
import httpx
from typing import Any
class WeatherAPIClient:
"""Client for consuming the OpenWeatherMap API."""
BASE_URL = "https://api.openweathermap.org/data/2.5"
def __init__(self, api_key: str, timeout: float = 10.0) -> None:
self.api_key = api_key
self.client = httpx.Client(
base_url=self.BASE_URL,
timeout=timeout,
headers={"Accept": "application/json"},
)
def get_current_weather(self, city: str) -> dict[str, Any]:
"""Fetch current weather for a city."""
response = self.client.get(
"/weather",
params={"q": city, "appid": self.api_key, "units": "metric"},
)
response.raise_for_status()
return response.json()
def close(self) -> None:
"""Close the underlying HTTP client."""
self.client.close()
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
This basic client is a good start, but production systems need several additional capabilities that we will build throughout this chapter: retry logic, rate-limit handling, response caching, and circuit breakers.
Async API Consumption
When your application calls multiple external APIs, doing so sequentially wastes time. Async HTTP calls let you fire off multiple requests concurrently:
import asyncio
import httpx
async def fetch_multiple_apis() -> dict[str, dict]:
"""Fetch data from multiple APIs concurrently."""
async with httpx.AsyncClient() as client:
weather_task = client.get(
"https://api.openweathermap.org/data/2.5/weather",
params={"q": "London", "appid": "YOUR_KEY"},
)
news_task = client.get(
"https://newsapi.org/v2/top-headlines",
params={"country": "us", "apiKey": "YOUR_KEY"},
)
weather_resp, news_resp = await asyncio.gather(
weather_task, news_task
)
return {
"weather": weather_resp.json(),
"news": news_resp.json(),
}
With asyncio.gather(), both API calls execute concurrently. If each takes 500ms, the total time is roughly 500ms instead of 1000ms. For a dashboard that aggregates data from five services, the difference is dramatic.
Cross-Reference: See Chapter 17 for building the server side of REST APIs with FastAPI. The async patterns here complement the async endpoint handlers you learned there.
Request and Response Handling Best Practices
Always set timeouts. An external API that hangs can block your entire application. Set both connect and read timeouts:
client = httpx.Client(
timeout=httpx.Timeout(
connect=5.0, # Time to establish connection
read=10.0, # Time to receive response
write=5.0, # Time to send request body
pool=5.0, # Time to acquire connection from pool
)
)
Validate response data. Never trust external API responses blindly. Use Pydantic models to validate:
from pydantic import BaseModel
class WeatherResponse(BaseModel):
temp: float
humidity: int
description: str
@classmethod
def from_api_response(cls, data: dict) -> "WeatherResponse":
main = data.get("main", {})
weather = data.get("weather", [{}])[0]
return cls(
temp=main.get("temp", 0.0),
humidity=main.get("humidity", 0),
description=weather.get("description", "unknown"),
)
Log requests and responses. When debugging integration issues, you need visibility into what was sent and received:
import logging
logger = logging.getLogger(__name__)
def log_request(request: httpx.Request) -> None:
logger.info(f"Request: {request.method} {request.url}")
def log_response(response: httpx.Response) -> None:
logger.info(f"Response: {response.status_code} from {response.url}")
client = httpx.Client(
event_hooks={
"request": [log_request],
"response": [log_response],
}
)
20.3 OAuth and API Authentication
Most external APIs require authentication. The method varies by service, but falls into a few common patterns:
API Key Authentication
The simplest approach. You pass a key in a header or query parameter:
# Header-based API key
headers = {"Authorization": "Bearer sk-your-api-key"}
# Query parameter API key
params = {"apiKey": "your-api-key"}
# Custom header
headers = {"X-API-Key": "your-api-key"}
API keys are suitable for server-to-server communication but should never be exposed in client-side code.
OAuth 2.0 Fundamentals
OAuth 2.0 is the industry standard for delegated authorization. It allows your application to access a user's resources on another service without knowing the user's password.
There are several OAuth 2.0 grant types, each suited to different scenarios:
| Grant Type | Use Case | Flow |
|---|---|---|
| Authorization Code | Web apps with server backend | User redirects to auth server, gets code, server exchanges for token |
| Authorization Code + PKCE | Single-page apps, mobile apps | Same as above with additional security layer |
| Client Credentials | Server-to-server, no user context | App authenticates directly with client ID and secret |
| Device Code | CLI tools, smart TVs | User enters code on separate device |
Authorization Code Flow
This is the most common OAuth flow for web applications. Here is how it works:
┌────────┐ 1. Redirect to Auth Server ┌────────────┐
│ │ ─────────────────────────────────>│ │
│ User │ 2. User Logs In & Consents │ Auth │
│Browser │ <─────────────────────────────────│ Server │
│ │ 3. Redirect with Auth Code │ │
└────────┘ └────────────┘
│ │
│ 4. Send Auth Code │
▼ │
┌────────┐ 5. Exchange Code for Token ┌────────────┐
│ Your │ ─────────────────────────────────>│ Auth │
│ App │ 6. Receive Access Token │ Server │
│Server │ <─────────────────────────────────│ │
└────────┘ └────────────┘
│
│ 7. Use Access Token
▼
┌────────────┐
│ External │
│ API │
└────────────┘
The implementation involves several steps:
import httpx
import secrets
from urllib.parse import urlencode
class OAuthClient:
"""OAuth 2.0 Authorization Code flow client."""
def __init__(
self,
client_id: str,
client_secret: str,
redirect_uri: str,
auth_url: str,
token_url: str,
) -> None:
self.client_id = client_id
self.client_secret = client_secret
self.redirect_uri = redirect_uri
self.auth_url = auth_url
self.token_url = token_url
def get_authorization_url(self, scopes: list[str]) -> tuple[str, str]:
"""Generate the URL to redirect users for authorization."""
state = secrets.token_urlsafe(32)
params = {
"client_id": self.client_id,
"redirect_uri": self.redirect_uri,
"response_type": "code",
"scope": " ".join(scopes),
"state": state,
}
url = f"{self.auth_url}?{urlencode(params)}"
return url, state
async def exchange_code_for_token(
self, code: str
) -> dict[str, str]:
"""Exchange authorization code for access token."""
async with httpx.AsyncClient() as client:
response = await client.post(
self.token_url,
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": self.redirect_uri,
"client_id": self.client_id,
"client_secret": self.client_secret,
},
)
response.raise_for_status()
return response.json()
Client Credentials Flow
For server-to-server communication where no user is involved:
async def get_client_credentials_token(
token_url: str,
client_id: str,
client_secret: str,
scopes: list[str],
) -> dict[str, str]:
"""Obtain token using client credentials grant."""
async with httpx.AsyncClient() as client:
response = await client.post(
token_url,
data={
"grant_type": "client_credentials",
"scope": " ".join(scopes),
},
auth=(client_id, client_secret),
)
response.raise_for_status()
return response.json()
Token Management
Access tokens expire. A production application must handle token refresh:
import time
class TokenManager:
"""Manages OAuth tokens with automatic refresh."""
def __init__(
self,
token_url: str,
client_id: str,
client_secret: str,
) -> None:
self.token_url = token_url
self.client_id = client_id
self.client_secret = client_secret
self._access_token: str | None = None
self._refresh_token: str | None = None
self._expires_at: float = 0.0
@property
def is_expired(self) -> bool:
"""Check if current token is expired (with 60s buffer)."""
return time.time() >= (self._expires_at - 60)
async def get_valid_token(self) -> str:
"""Return a valid access token, refreshing if needed."""
if self._access_token and not self.is_expired:
return self._access_token
if self._refresh_token:
await self._refresh()
else:
raise RuntimeError("No valid token and no refresh token")
return self._access_token
async def _refresh(self) -> None:
"""Refresh the access token."""
async with httpx.AsyncClient() as client:
response = await client.post(
self.token_url,
data={
"grant_type": "refresh_token",
"refresh_token": self._refresh_token,
"client_id": self.client_id,
"client_secret": self.client_secret,
},
)
response.raise_for_status()
data = response.json()
self._access_token = data["access_token"]
self._refresh_token = data.get(
"refresh_token", self._refresh_token
)
self._expires_at = time.time() + data.get(
"expires_in", 3600
)
Security Warning: Never store OAuth tokens in client-side code, local storage, or version control. Use secure server-side sessions, encrypted cookies, or dedicated secret management services. Environment variables are acceptable for API keys in development but use a secrets manager (AWS Secrets Manager, HashiCorp Vault) in production.
20.4 Payment Processing Integration
Payment integration is one of the most critical -- and most sensitive -- integrations your application can have. Mistakes can cost real money, both for you and your users.
Payment Integration Principles
Never handle raw card numbers. Modern payment processors use tokenization. The user's card details go directly to the payment processor (usually via a client-side widget), and your server only receives a token representing the payment method.
Idempotency is essential. Network failures happen. If a payment request times out, you need to safely retry without charging the customer twice. Use idempotency keys:
import uuid
import httpx
async def create_payment(
amount: int,
currency: str,
payment_method_token: str,
idempotency_key: str | None = None,
) -> dict:
"""Create a payment with idempotency protection."""
if idempotency_key is None:
idempotency_key = str(uuid.uuid4())
async with httpx.AsyncClient() as client:
response = await client.post(
"https://api.paymentprocessor.com/v1/payments",
json={
"amount": amount,
"currency": currency,
"payment_method": payment_method_token,
},
headers={
"Authorization": "Bearer sk_live_your_key",
"Idempotency-Key": idempotency_key,
},
)
response.raise_for_status()
return response.json()
Use webhooks for payment status. Do not rely solely on the synchronous API response. Payments can be asynchronous (bank transfers, 3D Secure verification). Always set up webhooks to receive payment status updates.
A Complete Payment Flow
Here is a typical payment integration architecture:
┌──────────┐ 1. Card Details ┌──────────────┐
│ Client │ ────────────────────>│ Payment │
│ (React) │ 2. Payment Token │ Processor │
│ │ <────────────────────│ (Stripe) │
└──────────┘ └──────────────┘
│ │
│ 3. Token + Order Details │
▼ │
┌──────────┐ 4. Create Payment ┌──────────────┐
│ Your │ ────────────────────>│ Payment │
│ Server │ 5. Payment Status │ Processor │
│ │ <────────────────────│ │
└──────────┘ └──────────────┘
│ │
│ │
│ <──── 6. Webhook: Payment ────────┘
│ Succeeded/Failed
▼
┌──────────┐
│ Database │
│ (Update │
│ Order) │
└──────────┘
Implementing a Payment Service
from dataclasses import dataclass
from enum import Enum
class PaymentStatus(Enum):
PENDING = "pending"
SUCCEEDED = "succeeded"
FAILED = "failed"
REFUNDED = "refunded"
@dataclass
class PaymentResult:
payment_id: str
status: PaymentStatus
amount: int
currency: str
error_message: str | None = None
class PaymentService:
"""Service for processing payments via external processor."""
def __init__(self, api_key: str, base_url: str) -> None:
self.api_key = api_key
self.base_url = base_url
self.client = httpx.AsyncClient(
base_url=base_url,
headers={"Authorization": f"Bearer {api_key}"},
timeout=30.0,
)
async def create_payment_intent(
self,
amount: int,
currency: str,
customer_id: str,
metadata: dict | None = None,
) -> PaymentResult:
"""Create a payment intent for the given amount."""
idempotency_key = str(uuid.uuid4())
try:
response = await self.client.post(
"/v1/payment_intents",
json={
"amount": amount,
"currency": currency,
"customer": customer_id,
"metadata": metadata or {},
},
headers={"Idempotency-Key": idempotency_key},
)
response.raise_for_status()
data = response.json()
return PaymentResult(
payment_id=data["id"],
status=PaymentStatus(data["status"]),
amount=data["amount"],
currency=data["currency"],
)
except httpx.HTTPStatusError as exc:
error_data = exc.response.json()
return PaymentResult(
payment_id="",
status=PaymentStatus.FAILED,
amount=amount,
currency=currency,
error_message=error_data.get("error", {}).get(
"message", "Unknown error"
),
)
async def refund_payment(
self, payment_id: str, amount: int | None = None
) -> PaymentResult:
"""Refund a payment, optionally partially."""
payload: dict = {"payment_intent": payment_id}
if amount is not None:
payload["amount"] = amount
response = await self.client.post(
"/v1/refunds",
json=payload,
)
response.raise_for_status()
data = response.json()
return PaymentResult(
payment_id=data["id"],
status=PaymentStatus.REFUNDED,
amount=data["amount"],
currency=data["currency"],
)
async def close(self) -> None:
await self.client.aclose()
Best Practice: Always store the external payment processor's payment ID alongside your internal order ID. When discrepancies arise (and they will), you need to be able to cross-reference between your system and the payment processor's dashboard.
20.5 Email and Notification Services
Sending email from your application requires an external email delivery service. Trying to run your own mail server is a path to madness -- deliverability, spam filtering, bounce handling, and reputation management are full-time jobs.
Email Service Integration
Here is a pattern for integrating with a transactional email service:
from dataclasses import dataclass, field
@dataclass
class EmailMessage:
to: list[str]
subject: str
html_body: str
text_body: str | None = None
from_email: str = "noreply@yourapp.com"
from_name: str = "Your App"
reply_to: str | None = None
cc: list[str] = field(default_factory=list)
bcc: list[str] = field(default_factory=list)
attachments: list[dict] = field(default_factory=list)
class EmailService:
"""Service for sending transactional emails."""
def __init__(self, api_key: str) -> None:
self.api_key = api_key
self.base_url = "https://api.emailservice.com/v3"
self.client = httpx.AsyncClient(
base_url=self.base_url,
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
},
timeout=15.0,
)
async def send(self, message: EmailMessage) -> dict:
"""Send a single email message."""
payload = {
"personalizations": [
{
"to": [{"email": addr} for addr in message.to],
"cc": [{"email": addr} for addr in message.cc],
"bcc": [{"email": addr} for addr in message.bcc],
}
],
"from": {
"email": message.from_email,
"name": message.from_name,
},
"subject": message.subject,
"content": [
{"type": "text/html", "value": message.html_body},
],
}
if message.text_body:
payload["content"].insert(
0,
{"type": "text/plain", "value": message.text_body},
)
if message.reply_to:
payload["reply_to"] = {"email": message.reply_to}
response = await self.client.post("/mail/send", json=payload)
response.raise_for_status()
return {"status": "sent", "status_code": response.status_code}
async def send_template(
self,
template_id: str,
to: list[str],
dynamic_data: dict,
) -> dict:
"""Send email using a pre-defined template."""
payload = {
"personalizations": [
{
"to": [{"email": addr} for addr in to],
"dynamic_template_data": dynamic_data,
}
],
"from": {
"email": "noreply@yourapp.com",
"name": "Your App",
},
"template_id": template_id,
}
response = await self.client.post("/mail/send", json=payload)
response.raise_for_status()
return {"status": "sent", "status_code": response.status_code}
async def close(self) -> None:
await self.client.aclose()
Notification Patterns
Email is just one notification channel. A robust notification system supports multiple channels through a unified interface:
from abc import ABC, abstractmethod
class NotificationChannel(ABC):
"""Abstract base class for notification channels."""
@abstractmethod
async def send(
self, recipient: str, subject: str, body: str
) -> bool:
"""Send a notification. Returns True if successful."""
...
class EmailChannel(NotificationChannel):
async def send(
self, recipient: str, subject: str, body: str
) -> bool:
# Uses EmailService internally
...
class SMSChannel(NotificationChannel):
async def send(
self, recipient: str, subject: str, body: str
) -> bool:
# Uses Twilio or similar
...
class SlackChannel(NotificationChannel):
async def send(
self, recipient: str, subject: str, body: str
) -> bool:
# Uses Slack Webhook API
...
class NotificationService:
"""Unified notification service supporting multiple channels."""
def __init__(self) -> None:
self.channels: dict[str, NotificationChannel] = {}
def register_channel(
self, name: str, channel: NotificationChannel
) -> None:
self.channels[name] = channel
async def notify(
self,
channel_name: str,
recipient: str,
subject: str,
body: str,
) -> bool:
channel = self.channels.get(channel_name)
if not channel:
raise ValueError(f"Unknown channel: {channel_name}")
return await channel.send(recipient, subject, body)
async def notify_all(
self,
recipient_map: dict[str, str],
subject: str,
body: str,
) -> dict[str, bool]:
"""Send notification to multiple channels."""
results = {}
for channel_name, recipient in recipient_map.items():
try:
results[channel_name] = await self.notify(
channel_name, recipient, subject, body
)
except Exception:
results[channel_name] = False
return results
Design Pattern: The abstraction layer pattern shown here is called the Strategy Pattern. By defining a common interface (
NotificationChannel) and swapping implementations, you can add new channels (push notifications, in-app messages) without modifying the core notification logic.
20.6 Cloud Storage Integration
Cloud object storage (S3, GCS, Azure Blob) is the standard way to store user uploads, generated files, backups, and static assets. The API patterns are similar across providers.
S3-Style Storage Client
from dataclasses import dataclass
from datetime import datetime, timedelta
import hashlib
import hmac
@dataclass
class StorageObject:
key: str
size: int
content_type: str
last_modified: datetime
etag: str
class CloudStorageClient:
"""Client for S3-compatible cloud storage."""
def __init__(
self,
access_key: str,
secret_key: str,
bucket: str,
region: str = "us-east-1",
endpoint: str | None = None,
) -> None:
self.access_key = access_key
self.secret_key = secret_key
self.bucket = bucket
self.region = region
self.endpoint = (
endpoint
or f"https://{bucket}.s3.{region}.amazonaws.com"
)
self.client = httpx.AsyncClient(timeout=60.0)
async def upload_file(
self,
key: str,
data: bytes,
content_type: str = "application/octet-stream",
) -> StorageObject:
"""Upload a file to cloud storage."""
response = await self.client.put(
f"{self.endpoint}/{key}",
content=data,
headers={
"Content-Type": content_type,
"Content-Length": str(len(data)),
},
)
response.raise_for_status()
return StorageObject(
key=key,
size=len(data),
content_type=content_type,
last_modified=datetime.utcnow(),
etag=response.headers.get("ETag", ""),
)
async def download_file(self, key: str) -> bytes:
"""Download a file from cloud storage."""
response = await self.client.get(
f"{self.endpoint}/{key}"
)
response.raise_for_status()
return response.content
async def delete_file(self, key: str) -> bool:
"""Delete a file from cloud storage."""
response = await self.client.delete(
f"{self.endpoint}/{key}"
)
return response.status_code == 204
async def generate_presigned_url(
self,
key: str,
expires_in: int = 3600,
) -> str:
"""Generate a pre-signed URL for temporary access."""
expiration = datetime.utcnow() + timedelta(seconds=expires_in)
# In production, use proper AWS Signature V4 signing
return (
f"{self.endpoint}/{key}"
f"?expires={int(expiration.timestamp())}"
)
async def list_objects(
self, prefix: str = "", max_keys: int = 1000
) -> list[StorageObject]:
"""List objects in the bucket with optional prefix filter."""
response = await self.client.get(
self.endpoint,
params={
"list-type": "2",
"prefix": prefix,
"max-keys": max_keys,
},
)
response.raise_for_status()
# Parse XML response (simplified)
return []
async def close(self) -> None:
await self.client.aclose()
Pre-signed URLs
A critical pattern for cloud storage is pre-signed URLs. Instead of routing file uploads and downloads through your server, you generate a temporary URL that lets the client interact directly with the storage service:
┌──────────┐ 1. Request upload URL ┌──────────┐
│ Client │ ───────────────────────>│ Your │
│ │ 2. Pre-signed URL │ Server │
│ │ <───────────────────────│ │
└──────────┘ └──────────┘
│
│ 3. Upload directly
▼
┌──────────────┐
│ Cloud │
│ Storage │
└──────────────┘
This pattern reduces your server's bandwidth and processing load, especially for large files.
AI Prompt Tip: When asking an AI assistant to generate cloud storage integration code, specify whether you want to use the provider's official SDK (e.g.,
boto3for AWS) or raw HTTP requests. SDKs handle authentication signing automatically, which is significantly easier. For example: "Create a file upload service using boto3 that supports multipart uploads for files over 100MB, generates pre-signed download URLs, and organizes files by user ID."
20.7 Third-Party Data Services
Many applications enrich their functionality with data from external services: weather, maps, currency exchange rates, stock prices, and more.
Working with Data APIs
Here is a pattern for consuming a third-party data API with caching:
import time
from typing import Any
class CachedAPIClient:
"""API client with in-memory response caching."""
def __init__(
self,
base_url: str,
api_key: str,
cache_ttl: int = 300,
) -> None:
self.base_url = base_url
self.api_key = api_key
self.cache_ttl = cache_ttl
self._cache: dict[str, tuple[float, Any]] = {}
self.client = httpx.AsyncClient(
base_url=base_url,
headers={"Authorization": f"Bearer {api_key}"},
timeout=10.0,
)
def _cache_key(self, path: str, params: dict) -> str:
"""Generate a cache key from request parameters."""
sorted_params = sorted(params.items())
return f"{path}:{sorted_params}"
def _get_cached(self, key: str) -> Any | None:
"""Return cached value if it exists and is not expired."""
if key in self._cache:
timestamp, data = self._cache[key]
if time.time() - timestamp < self.cache_ttl:
return data
del self._cache[key]
return None
async def get(
self, path: str, params: dict | None = None
) -> Any:
"""Make a GET request with caching."""
params = params or {}
cache_key = self._cache_key(path, params)
cached = self._get_cached(cache_key)
if cached is not None:
return cached
response = await self.client.get(path, params=params)
response.raise_for_status()
data = response.json()
self._cache[cache_key] = (time.time(), data)
return data
async def close(self) -> None:
await self.client.aclose()
Geocoding Example
class GeocodingService:
"""Service for converting addresses to coordinates."""
def __init__(self, api_key: str) -> None:
self.client = CachedAPIClient(
base_url="https://api.geocoding.com/v1",
api_key=api_key,
cache_ttl=86400, # Cache geocoding results for 24 hours
)
async def geocode(self, address: str) -> dict[str, float]:
"""Convert an address to latitude/longitude."""
data = await self.client.get(
"/geocode",
params={"address": address},
)
if data.get("results"):
location = data["results"][0]["geometry"]["location"]
return {"lat": location["lat"], "lng": location["lng"]}
raise ValueError(f"Could not geocode address: {address}")
async def reverse_geocode(
self, lat: float, lng: float
) -> str:
"""Convert coordinates to a human-readable address."""
data = await self.client.get(
"/reverse",
params={"lat": lat, "lng": lng},
)
if data.get("results"):
return data["results"][0]["formatted_address"]
raise ValueError(
f"Could not reverse geocode: {lat}, {lng}"
)
Currency Exchange Rate Service
@dataclass
class ExchangeRate:
from_currency: str
to_currency: str
rate: float
timestamp: datetime
class CurrencyService:
"""Service for currency exchange rate lookups."""
def __init__(self, api_key: str) -> None:
self.client = CachedAPIClient(
base_url="https://api.exchangerates.com/v1",
api_key=api_key,
cache_ttl=3600, # Rates update hourly
)
async def get_rate(
self, from_currency: str, to_currency: str
) -> ExchangeRate:
"""Get exchange rate between two currencies."""
data = await self.client.get(
"/latest",
params={
"base": from_currency,
"symbols": to_currency,
},
)
rate = data["rates"][to_currency]
return ExchangeRate(
from_currency=from_currency,
to_currency=to_currency,
rate=rate,
timestamp=datetime.fromisoformat(data["date"]),
)
async def convert(
self,
amount: float,
from_currency: str,
to_currency: str,
) -> float:
"""Convert an amount between currencies."""
rate_info = await self.get_rate(from_currency, to_currency)
return round(amount * rate_info.rate, 2)
20.8 Webhook Handling
Webhooks invert the typical request-response pattern. Instead of your application polling an external service for updates, the external service sends HTTP requests to your application when events occur.
Webhook Architecture
┌──────────────┐ Event Occurs ┌──────────────┐
│ External │ ─────────────────>│ │
│ Service │ POST to your │ Your App │
│ (Stripe, │ webhook URL │ (Webhook │
│ GitHub) │ │ Endpoint) │
└──────────────┘ └──────────────┘
│
│ Process
▼
┌──────────────┐
│ Business │
│ Logic │
└──────────────┘
Webhook Security: Signature Verification
The most important aspect of webhook handling is verifying that the request actually came from the expected service. Most services sign their webhook payloads:
import hashlib
import hmac
from fastapi import FastAPI, Request, HTTPException
app = FastAPI()
def verify_webhook_signature(
payload: bytes,
signature: str,
secret: str,
) -> bool:
"""Verify HMAC-SHA256 webhook signature."""
expected = hmac.new(
secret.encode("utf-8"),
payload,
hashlib.sha256,
).hexdigest()
return hmac.compare_digest(f"sha256={expected}", signature)
@app.post("/webhooks/payment")
async def handle_payment_webhook(request: Request):
"""Handle incoming payment webhooks."""
payload = await request.body()
signature = request.headers.get("X-Webhook-Signature", "")
if not verify_webhook_signature(
payload, signature, "whsec_your_webhook_secret"
):
raise HTTPException(status_code=401, detail="Invalid signature")
event = await request.json()
event_type = event.get("type")
if event_type == "payment.succeeded":
await handle_payment_success(event["data"])
elif event_type == "payment.failed":
await handle_payment_failure(event["data"])
elif event_type == "refund.created":
await handle_refund(event["data"])
# Always return 200 quickly to acknowledge receipt
return {"status": "received"}
async def handle_payment_success(data: dict) -> None:
"""Process successful payment."""
payment_id = data["id"]
# Update order status in database
# Send confirmation email
# Fulfill order
async def handle_payment_failure(data: dict) -> None:
"""Process failed payment."""
payment_id = data["id"]
# Update order status
# Notify customer
async def handle_refund(data: dict) -> None:
"""Process refund event."""
refund_id = data["id"]
# Update order status
# Credit customer account
Webhook Best Practices
Respond quickly. Webhook providers expect a response within a few seconds (typically 5-30 seconds). If processing takes longer, acknowledge receipt immediately and process asynchronously:
import asyncio
from collections.abc import Callable
# Simple in-process queue for demonstration
webhook_queue: asyncio.Queue = asyncio.Queue()
@app.post("/webhooks/events")
async def receive_webhook(request: Request):
"""Receive webhook and queue for processing."""
event = await request.json()
await webhook_queue.put(event)
return {"status": "queued"}
async def webhook_processor():
"""Background worker that processes queued webhooks."""
while True:
event = await webhook_queue.get()
try:
await process_event(event)
except Exception as e:
# Log error, potentially retry
print(f"Error processing webhook: {e}")
finally:
webhook_queue.task_done()
Handle duplicate deliveries. Webhook providers often retry on failure, which means you may receive the same event multiple times. Make your handlers idempotent:
processed_events: set[str] = set()
async def process_event_idempotently(event: dict) -> bool:
"""Process an event exactly once."""
event_id = event.get("id")
if not event_id:
return False
if event_id in processed_events:
return True # Already processed
# In production, use a database to track processed events
# processed_events is just for illustration
await process_event(event)
processed_events.add(event_id)
return True
Log everything. Webhook debugging is notoriously difficult because you cannot replay the external service's request easily. Log the full payload, headers, and your processing result.
Callout: Testing Webhooks Locally
During development, external services cannot reach your
localhost. Use tunneling tools like ngrok or localtunnel to expose your local webhook endpoint:
ngrok http 8000This gives you a public URL (e.g.,
https://abc123.ngrok.io) that forwards to your local server. Register this URL with the external service's webhook configuration. Many services also provide webhook testing tools in their dashboards that let you send test events.
20.9 Rate Limiting and Error Handling
External APIs impose rate limits, experience outages, and return errors. A production application must handle all of these gracefully.
Understanding Rate Limits
Rate limits are expressed in various ways:
- Requests per second (e.g., 10 req/s)
- Requests per minute (e.g., 60 req/min)
- Requests per day (e.g., 1,000 req/day)
- Concurrent requests (e.g., 5 simultaneous)
APIs communicate rate limit status through response headers:
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 42
X-RateLimit-Reset: 1634567890
Retry-After: 30
Implementing Rate-Limit-Aware Clients
import asyncio
import time
class RateLimiter:
"""Token bucket rate limiter."""
def __init__(
self, max_requests: int, time_window: float
) -> None:
self.max_requests = max_requests
self.time_window = time_window
self.tokens = max_requests
self.last_refill = time.monotonic()
self._lock = asyncio.Lock()
async def acquire(self) -> None:
"""Wait until a request token is available."""
async with self._lock:
now = time.monotonic()
elapsed = now - self.last_refill
self.tokens = min(
self.max_requests,
self.tokens
+ (elapsed / self.time_window) * self.max_requests,
)
self.last_refill = now
if self.tokens < 1:
wait_time = (
(1 - self.tokens) / self.max_requests
) * self.time_window
await asyncio.sleep(wait_time)
self.tokens = 0
else:
self.tokens -= 1
Retry Strategies
When API calls fail due to transient errors (network issues, server overload, rate limiting), retrying is often the right approach. But retrying naively can make things worse.
Exponential Backoff with Jitter
The gold standard retry strategy:
import random
import httpx
async def request_with_retry(
client: httpx.AsyncClient,
method: str,
url: str,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
**kwargs,
) -> httpx.Response:
"""Make an HTTP request with exponential backoff retry."""
last_exception = None
for attempt in range(max_retries + 1):
try:
response = await client.request(method, url, **kwargs)
# Don't retry client errors (4xx) except 429
if response.status_code == 429:
retry_after = float(
response.headers.get("Retry-After", base_delay)
)
await asyncio.sleep(retry_after)
continue
response.raise_for_status()
return response
except (httpx.ConnectError, httpx.ReadTimeout) as exc:
last_exception = exc
if attempt < max_retries:
delay = min(
base_delay * (2 ** attempt)
+ random.uniform(0, 1),
max_delay,
)
await asyncio.sleep(delay)
except httpx.HTTPStatusError as exc:
if exc.response.status_code >= 500:
last_exception = exc
if attempt < max_retries:
delay = min(
base_delay * (2 ** attempt)
+ random.uniform(0, 1),
max_delay,
)
await asyncio.sleep(delay)
else:
raise
raise last_exception
The key principles:
- Exponential backoff: Each retry waits longer (1s, 2s, 4s, 8s...).
- Jitter: Random variation prevents thundering herd when many clients retry simultaneously.
- Maximum delay: Cap the wait time so retries do not take forever.
- Selective retry: Only retry transient errors (5xx, timeouts, 429). Never retry 4xx client errors (except 429).
Circuit Breaker Pattern
When an external service is completely down, retrying every request wastes resources and slows your application. The circuit breaker pattern detects failures and short-circuits requests:
import time
from enum import Enum
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, reject requests
HALF_OPEN = "half_open" # Testing if service recovered
class CircuitBreaker:
"""Circuit breaker for external service calls."""
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: float = 30.0,
success_threshold: int = 2,
) -> None:
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.success_threshold = success_threshold
self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count = 0
self.last_failure_time = 0.0
def can_execute(self) -> bool:
"""Check if a request should be allowed."""
if self.state == CircuitState.CLOSED:
return True
elif self.state == CircuitState.OPEN:
if (
time.monotonic() - self.last_failure_time
> self.recovery_timeout
):
self.state = CircuitState.HALF_OPEN
self.success_count = 0
return True
return False
else: # HALF_OPEN
return True
def record_success(self) -> None:
"""Record a successful request."""
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.success_threshold:
self.state = CircuitState.CLOSED
self.failure_count = 0
elif self.state == CircuitState.CLOSED:
self.failure_count = 0
def record_failure(self) -> None:
"""Record a failed request."""
self.failure_count += 1
self.last_failure_time = time.monotonic()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
async def execute(self, func, *args, **kwargs):
"""Execute a function with circuit breaker protection."""
if not self.can_execute():
raise CircuitBreakerOpenError(
f"Circuit breaker is {self.state.value}"
)
try:
result = await func(*args, **kwargs)
self.record_success()
return result
except Exception as exc:
self.record_failure()
raise
class CircuitBreakerOpenError(Exception):
"""Raised when circuit breaker is open."""
pass
The circuit breaker has three states:
- Closed (normal): Requests pass through. Failures are counted.
- Open (failing): Requests are immediately rejected. After a timeout period, transitions to half-open.
- Half-Open (testing): A limited number of requests are allowed through. If they succeed, the circuit closes. If they fail, it opens again.
Vibe Check: Rate limiting, retries, and circuit breakers are the kind of infrastructure code that AI assistants generate extremely well. The patterns are well-established and the implementations are straightforward. Ask your AI to "add a circuit breaker to this API client" and you will get solid boilerplate that you can customize for your needs. Focus your human attention on the thresholds and timeouts -- those depend on your specific context.
Graceful Degradation
When an external service is unavailable, your application should degrade gracefully rather than crash:
class WeatherServiceWithFallback:
"""Weather service with graceful degradation."""
def __init__(self, api_key: str) -> None:
self.primary = WeatherAPIClient(api_key)
self.circuit = CircuitBreaker(
failure_threshold=3,
recovery_timeout=60.0,
)
self._last_known: dict[str, dict] = {}
async def get_weather(self, city: str) -> dict:
"""Get weather with fallback to cached data."""
try:
result = await self.circuit.execute(
self.primary.get_current_weather, city
)
self._last_known[city] = result
return result
except (CircuitBreakerOpenError, Exception):
if city in self._last_known:
return {
**self._last_known[city],
"_stale": True,
"_message": "Using cached data",
}
return {
"error": "Weather service unavailable",
"_fallback": True,
}
20.10 Building an Integration-Heavy Application
Let us put everything together by building a service that integrates multiple external APIs into a cohesive application. We will build a simplified e-commerce notification service that:
- Processes payment webhooks from a payment processor
- Sends confirmation emails via an email service
- Stores invoice PDFs in cloud storage
- Looks up currency exchange rates for international orders
- Sends Slack notifications for high-value orders
Application Architecture
┌──────────────────────┐
Payment Webhook │ │
──────────────────>│ Notification │──── Email Service
│ Service │
Order Events │ │──── Cloud Storage
──────────────────>│ - Event Router │
│ - Processors │──── Currency API
│ - Error Handling │
│ │──── Slack API
└──────────────────────┘
Service Configuration
from dataclasses import dataclass
@dataclass
class ServiceConfig:
"""Configuration for all external service integrations."""
# Payment processor
payment_webhook_secret: str
# Email service
email_api_key: str
email_from: str = "orders@shop.com"
# Cloud storage
storage_access_key: str = ""
storage_secret_key: str = ""
storage_bucket: str = "invoices"
# Currency service
currency_api_key: str = ""
# Slack
slack_webhook_url: str = ""
slack_high_value_threshold: int = 10000 # cents
@classmethod
def from_env(cls) -> "ServiceConfig":
"""Load configuration from environment variables."""
import os
return cls(
payment_webhook_secret=os.environ[
"PAYMENT_WEBHOOK_SECRET"
],
email_api_key=os.environ["EMAIL_API_KEY"],
email_from=os.environ.get(
"EMAIL_FROM", "orders@shop.com"
),
storage_access_key=os.environ.get(
"STORAGE_ACCESS_KEY", ""
),
storage_secret_key=os.environ.get(
"STORAGE_SECRET_KEY", ""
),
storage_bucket=os.environ.get(
"STORAGE_BUCKET", "invoices"
),
currency_api_key=os.environ.get(
"CURRENCY_API_KEY", ""
),
slack_webhook_url=os.environ.get(
"SLACK_WEBHOOK_URL", ""
),
)
Event Processing Pipeline
from typing import Any
from collections.abc import Callable
import logging
logger = logging.getLogger(__name__)
class EventRouter:
"""Routes events to appropriate handlers."""
def __init__(self) -> None:
self._handlers: dict[
str, list[Callable]
] = {}
def register(
self, event_type: str, handler: Callable
) -> None:
"""Register a handler for an event type."""
if event_type not in self._handlers:
self._handlers[event_type] = []
self._handlers[event_type].append(handler)
async def dispatch(
self, event_type: str, data: dict[str, Any]
) -> list[dict[str, Any]]:
"""Dispatch an event to all registered handlers."""
handlers = self._handlers.get(event_type, [])
results = []
for handler in handlers:
try:
result = await handler(data)
results.append(
{"handler": handler.__name__, "status": "success",
"result": result}
)
except Exception as exc:
logger.error(
f"Handler {handler.__name__} failed: {exc}"
)
results.append(
{"handler": handler.__name__, "status": "error",
"error": str(exc)}
)
return results
Wiring It All Together
from fastapi import FastAPI, Request, HTTPException
app = FastAPI(title="E-Commerce Notification Service")
config = ServiceConfig.from_env()
router = EventRouter()
# Initialize services
email_service = EmailService(config.email_api_key)
currency_service = CurrencyService(config.currency_api_key)
async def send_order_confirmation(data: dict) -> dict:
"""Send order confirmation email."""
customer_email = data["customer"]["email"]
order_id = data["order_id"]
amount = data["amount"] / 100 # cents to dollars
currency = data["currency"].upper()
await email_service.send(
EmailMessage(
to=[customer_email],
subject=f"Order Confirmed: #{order_id}",
html_body=f"""
<h1>Thank you for your order!</h1>
<p>Order #{order_id} for {currency} {amount:.2f}
has been confirmed.</p>
""",
from_email=config.email_from,
)
)
return {"email_sent_to": customer_email}
async def notify_slack_high_value(data: dict) -> dict:
"""Notify Slack channel for high-value orders."""
if data["amount"] < config.slack_high_value_threshold:
return {"skipped": True, "reason": "Below threshold"}
amount = data["amount"] / 100
currency = data["currency"].upper()
async with httpx.AsyncClient() as client:
await client.post(
config.slack_webhook_url,
json={
"text": (
f"High-value order #{data['order_id']}: "
f"{currency} {amount:.2f} from "
f"{data['customer']['email']}"
),
},
)
return {"slack_notified": True}
async def convert_and_log_currency(data: dict) -> dict:
"""Convert order amount to USD for reporting."""
if data["currency"] == "usd":
return {"usd_amount": data["amount"]}
usd_amount = await currency_service.convert(
data["amount"] / 100,
data["currency"],
"usd",
)
return {"usd_amount": int(usd_amount * 100)}
# Register handlers
router.register("payment.succeeded", send_order_confirmation)
router.register("payment.succeeded", notify_slack_high_value)
router.register("payment.succeeded", convert_and_log_currency)
@app.post("/webhooks/payments")
async def payment_webhook(request: Request):
"""Handle payment processor webhooks."""
payload = await request.body()
signature = request.headers.get("X-Webhook-Signature", "")
if not verify_webhook_signature(
payload, signature, config.payment_webhook_secret
):
raise HTTPException(
status_code=401, detail="Invalid signature"
)
event = await request.json()
event_type = event.get("type", "")
event_data = event.get("data", {})
results = await router.dispatch(event_type, event_data)
return {"status": "processed", "results": results}
Error Handling in Multi-Service Operations
When multiple services are involved, partial failures are inevitable. Handle them with the "best effort" pattern:
async def process_order_complete(order_data: dict) -> dict:
"""Process a completed order with multiple integrations.
Uses best-effort processing -- individual service failures
do not block other operations.
"""
results = {
"email": {"status": "pending"},
"storage": {"status": "pending"},
"slack": {"status": "pending"},
}
# Run all integrations concurrently
tasks = {
"email": send_order_confirmation(order_data),
"storage": store_invoice(order_data),
"slack": notify_slack_high_value(order_data),
}
completed = await asyncio.gather(
*tasks.values(), return_exceptions=True
)
for key, result in zip(tasks.keys(), completed):
if isinstance(result, Exception):
results[key] = {
"status": "failed",
"error": str(result),
}
logger.error(f"{key} integration failed: {result}")
else:
results[key] = {"status": "success", "data": result}
return results
This pattern ensures that a failure in one service (e.g., Slack is down) does not prevent other operations (email confirmation, invoice storage) from completing.
Architecture Note: For truly mission-critical operations (like payment confirmation emails), consider using a persistent queue (Redis, RabbitMQ, SQS) instead of processing inline. This guarantees eventual delivery even if your application restarts between the webhook receipt and the email send.
Testing Integrations
Testing code that depends on external APIs requires special techniques:
import pytest
from unittest.mock import AsyncMock, patch
@pytest.mark.asyncio
async def test_send_order_confirmation():
"""Test order confirmation email is sent correctly."""
mock_email = AsyncMock()
with patch(
"app.services.email_service", mock_email
):
order_data = {
"order_id": "ORD-123",
"amount": 5000,
"currency": "usd",
"customer": {"email": "test@example.com"},
}
result = await send_order_confirmation(order_data)
mock_email.send.assert_called_once()
call_args = mock_email.send.call_args[0][0]
assert "test@example.com" in call_args.to
assert "ORD-123" in call_args.subject
@pytest.mark.asyncio
async def test_webhook_signature_verification():
"""Test that invalid webhook signatures are rejected."""
assert not verify_webhook_signature(
b"payload",
"invalid_signature",
"secret",
)
For integration tests that hit real APIs, use the external service's test/sandbox mode:
@pytest.mark.integration
@pytest.mark.asyncio
async def test_payment_flow_sandbox():
"""Test payment flow against sandbox environment."""
service = PaymentService(
api_key="sk_test_sandbox_key",
base_url="https://api.paymentprocessor.com/test",
)
result = await service.create_payment_intent(
amount=1000,
currency="usd",
customer_id="cus_test_123",
)
assert result.status == PaymentStatus.PENDING
await service.close()
AI Prompt Tip: Testing integration code is an excellent use case for AI assistants. Try: "Generate pytest fixtures and test cases for this PaymentService class. Mock all external HTTP calls. Test both success and error paths including timeouts, 429 rate limits, and 500 server errors."
Summary
External API integration is a fundamental skill for modern software development. In this chapter, we covered the full lifecycle of working with external services:
-
The Integration Landscape -- Understanding the categories of external services and common integration patterns (direct calls, SDKs, abstraction layers, webhooks, message queues).
-
RESTful API Consumption -- Building robust HTTP clients with
httpx, handling async operations, setting timeouts, validating responses, and logging. -
OAuth and API Authentication -- Implementing OAuth 2.0 flows (authorization code, client credentials), managing tokens, and handling token refresh.
-
Payment Processing -- Using tokenization, idempotency keys, and webhooks for reliable payment integration.
-
Email and Notification Services -- Building multi-channel notification systems with the Strategy Pattern.
-
Cloud Storage -- Uploading, downloading, and managing files with pre-signed URLs.
-
Third-Party Data Services -- Consuming data APIs with caching for efficiency.
-
Webhook Handling -- Receiving, verifying, and processing webhook events idempotently.
-
Rate Limiting and Error Handling -- Implementing rate limiters, exponential backoff retry, circuit breakers, and graceful degradation.
-
Integration-Heavy Applications -- Combining multiple services with event routing, concurrent processing, and best-effort error handling.
Throughout this chapter, we have seen how AI coding assistants accelerate integration work by generating boilerplate, navigating documentation, and applying established patterns. The key is understanding why these patterns exist so you can evaluate and customize the generated code for your specific requirements.
In Chapter 21, we will explore how AI assists with testing -- including the integration tests that are crucial for verifying the external service interactions we built here.
Key Terms
- API Key: A secret string used to authenticate API requests, typically passed in a header or query parameter.
- Circuit Breaker: A design pattern that detects failures and prevents an application from repeatedly trying an operation that is likely to fail.
- Client Credentials Grant: An OAuth 2.0 flow for server-to-server authentication without user involvement.
- Exponential Backoff: A retry strategy where the delay between retries increases exponentially with each attempt.
- Idempotency Key: A unique identifier sent with a request to ensure the operation is only performed once, even if the request is retried.
- Jitter: Random variation added to retry delays to prevent synchronized retries from multiple clients.
- OAuth 2.0: An industry-standard protocol for authorization that enables applications to access user resources on other services.
- Pre-signed URL: A time-limited URL that grants temporary access to a private cloud storage object.
- Rate Limiting: Restrictions imposed by an API on the number of requests a client can make in a given time period.
- Token Bucket: A rate-limiting algorithm that allows a burst of requests up to a maximum, then throttles to a steady rate.
- Tokenization: In payments, the process of replacing sensitive card data with a non-sensitive token.
- Webhook: An HTTP callback where an external service sends data to your application when an event occurs, inverting the typical request-response pattern.