TL;DR
- A production AI microservice built on FastAPI needs more than a working endpoint: it needs API-key auth, Pydantic validation, structured logging, caching, streaming, and a clean Dockerfile before any real team can use it.
- Wrapping Claude with FastAPI gives you typed request/response contracts, automatic OpenAPI docs, and async support that matters when token generation takes 2 to 10 seconds per request.
- Prompt caching cuts repeated-context costs by up to 90%. A simple in-process TTL cache on top of that eliminates redundant Anthropic API calls for identical inputs within a time window.
- Streaming via Server-Sent Events (SSE) delivers tokens to the client as they are generated, so long generations show progress instead of leaving the caller waiting on a silent connection.
- Structured JSON logging with request IDs lets you correlate every Anthropic API call back to the originating HTTP request, which is the minimum you need for production debugging.
- The full project in this article is under 500 lines across six files and runs locally with a single docker compose up.
Why a Microservice Wrapper Makes Sense
By the time you reach the end of a 30-part series on running AI in production, you have seen Claude do code review, log triage, contract analysis, ticket routing, SQL generation, and a dozen other tasks. Each of those articles showed a focused Python script or notebook. That is fine for a proof of concept, but no engineering team ships a script to production. They ship a service.
A production AI microservice built on FastAPI solves several problems that accumulate as usage grows. First, your callers need a stable contract. If you change your prompt template, they should not have to change their code. A typed REST API with Pydantic schemas gives you that contract. Second, you need auth. Every call to Anthropic costs money, and you cannot let anonymous callers rack up your bill. Third, you need observability. When a request returns a bad answer, you need to know what prompt went in, which model was called, how many tokens were used, and what the latency was. A raw script gives you none of that.
FastAPI is a strong fit here. It is async-native, generates OpenAPI documentation automatically, integrates cleanly with Pydantic v2, and its dependency injection system makes adding auth middleware a matter of a few lines. The Anthropic Python SDK is also async-compatible, so you get proper concurrency without threading gymnastics.
This article builds the whole thing from scratch: project layout, every source file, a Dockerfile, a docker-compose for local development, and a curl-based smoke test. It is the capstone of this series, pulling together patterns from earlier articles: streaming (Part 26), prompt caching (Part 4), observability (Part 28), and guardrails (Part 25).
Project Layout and Architecture
Before writing any code, agree on a structure that a new engineer can navigate without a tour. This service follows a standard FastAPI layout with a few additions for AI-specific concerns.
claude-microservice/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI app factory, lifespan, middleware
│ ├── config.py # Settings via pydantic-settings
│ ├── auth.py # API-key dependency
│ ├── cache.py # In-process TTL cache
│ ├── claude.py # Anthropic client wrapper
│ ├── models.py # Pydantic request/response schemas
│ ├── routers/
│ │ ├── __init__.py
│ │ ├── completions.py # POST /v1/completions
│ │ └── stream.py # POST /v1/completions/stream (SSE)
│ └── logging_config.py # structlog setup
├── tests/
│ ├── __init__.py
│ ├── conftest.py
│ └── test_completions.py
├── .env.example
├── Dockerfile
├── docker-compose.yml
└── requirements.txt
All Dependencies and Configuration
requirements.txt
fastapi==0.111.0
uvicorn[standard]==0.29.0
anthropic==0.25.1
pydantic==2.7.1
pydantic-settings==2.2.1
structlog==24.1.0
python-dotenv==1.0.1
httpx==0.27.0
pytest==8.2.0
pytest-asyncio==0.23.6
pip install fastapi uvicorn[standard] anthropic pydantic pydantic-settings structlog python-dotenv httpx
.env.example
# Copy to .env and fill in real values. Never commit .env.
ANTHROPIC_API_KEY=sk-ant-...
SERVICE_API_KEY=your-chosen-service-api-key
DEFAULT_MODEL=claude-sonnet-4-6
DEFAULT_MAX_TOKENS=1024
CACHE_TTL_SECONDS=300
LOG_LEVEL=INFO
ENVIRONMENT=development
The SERVICE_API_KEY is what your downstream callers pass in the X-API-Key header. It is separate from your Anthropic key. In a real team you would have multiple keys stored in a secrets manager and checked against a database, but a single env var is sufficient for this article without obscuring the important parts.
app/config.py
from typing import Literal

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", extra="ignore")

    anthropic_api_key: str
    service_api_key: str
    default_model: str = "claude-sonnet-4-6"
    default_max_tokens: int = 1024
    cache_ttl_seconds: int = 300
    log_level: str = "INFO"
    environment: Literal["development", "staging", "production"] = "development"


# Singleton: import this wherever you need settings
settings = Settings()
Pydantic-settings reads from environment variables and from a .env file. If ANTHROPIC_API_KEY is missing, the service refuses to start with a clear validation error. No silent failures.
Auth, Caching, and Logging Infrastructure
app/auth.py
from fastapi import HTTPException, Security, status
from fastapi.security.api_key import APIKeyHeader

from app.config import settings

API_KEY_HEADER = APIKeyHeader(name="X-API-Key", auto_error=False)


async def require_api_key(api_key: str = Security(API_KEY_HEADER)) -> str:
    """FastAPI dependency that enforces API-key authentication."""
    if not api_key:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="X-API-Key header missing",
        )
    if api_key != settings.service_api_key:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid API key",
        )
    return api_key
app/cache.py
import hashlib
import json
import time
from typing import Any

from app.config import settings


class TTLCache:
    """
    Simple in-process TTL cache keyed on a hash of the request payload.
    Not suitable for multi-replica deployments without replacement by Redis.
    For a single-replica service or dev/staging this works without extra infra.
    """

    def __init__(self, ttl_seconds: int = 300):
        self._store: dict[str, tuple[Any, float]] = {}
        self._ttl = ttl_seconds

    @staticmethod
    def make_key(obj: dict) -> str:
        serialized = json.dumps(obj, sort_keys=True, ensure_ascii=True)
        return hashlib.sha256(serialized.encode()).hexdigest()

    def get(self, key: str) -> Any | None:
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() > expires_at:
            del self._store[key]
            return None
        return value

    def set(self, key: str, value: Any) -> None:
        self._store[key] = (value, time.monotonic() + self._ttl)

    def invalidate(self, key: str) -> None:
        self._store.pop(key, None)

    def size(self) -> int:
        return len(self._store)


# Module-level singleton used by routers
cache = TTLCache(ttl_seconds=settings.cache_ttl_seconds)
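To make the key scheme concrete, here is a minimal standalone sketch of the same SHA-256-over-canonical-JSON idea used by make_key. The sample payloads are made up for illustration; the point is only that key order does not affect the hash while any field change does.

```python
import hashlib
import json


def make_key(obj: dict) -> str:
    """Same scheme as TTLCache.make_key: SHA-256 over canonical JSON."""
    serialized = json.dumps(obj, sort_keys=True, ensure_ascii=True)
    return hashlib.sha256(serialized.encode()).hexdigest()


# Illustrative payloads (not real traffic).
a = {"messages": [{"role": "user", "content": "hi"}], "max_tokens": 100}
b = {"max_tokens": 100, "messages": [{"content": "hi", "role": "user"}]}
c = {"messages": [{"role": "user", "content": "hi"}], "max_tokens": 101}

same_key = make_key(a) == make_key(b)       # key order is normalized away
different_key = make_key(a) != make_key(c)  # any changed field is a new entry
```

Because every request field feeds the hash, two requests that differ only in temperature or cache_system are cached as separate entries.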
app/logging_config.py
import logging
import sys

import structlog


def configure_logging(level: str = "INFO") -> None:
    """
    Set up structlog to emit structured JSON lines.
    Every log event gets a timestamp and level automatically.
    """
    logging.basicConfig(
        format="%(message)s",
        stream=sys.stdout,
        level=getattr(logging, level.upper(), logging.INFO),
    )
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            structlog.stdlib.add_log_level,
            structlog.stdlib.add_logger_name,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer(),
        ],
        wrapper_class=structlog.make_filtering_bound_logger(
            getattr(logging, level.upper(), logging.INFO)
        ),
        context_class=dict,
        # Use stdlib loggers: add_logger_name reads logger.name, which
        # structlog's PrintLogger does not provide.
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )


def get_logger(name: str) -> structlog.BoundLogger:
    return structlog.get_logger(name)
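structlog.contextvars is built on Python's standard contextvars module. The sketch below uses only the standard library to show the underlying behavior: a value set at the start of one request is visible in nested async calls, and concurrent requests do not see each other's value. The names request_id_var, log_event, call_model, and handle_request are hypothetical stand-ins, not part of the service.

```python
import asyncio
import contextvars

# Hypothetical stand-in for the request_id that structlog binds per request.
request_id_var = contextvars.ContextVar("request_id", default="-")


def log_event(event: str) -> dict:
    """Build a log record; the request ID comes from context, not arguments."""
    return {"event": event, "request_id": request_id_var.get()}


async def call_model() -> dict:
    await asyncio.sleep(0)  # yield to the loop, as a real network call would
    return log_event("anthropic_request")


async def handle_request(request_id: str) -> dict:
    request_id_var.set(request_id)  # bind once at the top of the request
    return await call_model()       # nested call sees the same value


async def main() -> list:
    # gather() runs each handler in its own task with its own context copy.
    return await asyncio.gather(handle_request("req-a"), handle_request("req-b"))


events = asyncio.run(main())
```

Each handler's record carries its own ID even though both ran concurrently on one event loop, which is the property the logging setup relies on.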
Binding request_id to structlog’s context variables at the start of each request means every log line emitted anywhere in that request’s call stack automatically carries the same ID. You do not pass a logger object around; the context propagates through the async context automatically. This is the same pattern used in Part 28’s observability article, but applied directly inside the FastAPI request lifecycle.
Pydantic Models
app/models.py
from __future__ import annotations

from typing import Annotated, Literal

from pydantic import BaseModel, Field, field_validator


class Message(BaseModel):
    role: Literal["user", "assistant"]
    content: str = Field(..., min_length=1, max_length=32_000)


class CompletionRequest(BaseModel):
    messages: list[Message] = Field(..., min_length=1, max_length=50)
    system: str | None = Field(
        default=None,
        max_length=100_000,
        description="Optional system prompt. Supports prompt caching for long contexts.",
    )
    model: str | None = Field(
        default=None,
        description="Override the default model. Must be a valid Anthropic model id.",
    )
    max_tokens: Annotated[int, Field(ge=1, le=8096)] = 1024
    temperature: Annotated[float, Field(ge=0.0, le=1.0)] = 0.7
    cache_system: bool = Field(
        default=False,
        description="When True, the system prompt is wrapped with cache_control for prompt caching.",
    )

    @field_validator("messages")
    @classmethod
    def last_message_must_be_user(cls, v: list[Message]) -> list[Message]:
        if v and v[-1].role != "user":
            raise ValueError("The last message must have role 'user'.")
        return v


class UsageInfo(BaseModel):
    input_tokens: int
    output_tokens: int
    cache_creation_input_tokens: int = 0
    cache_read_input_tokens: int = 0


class CompletionResponse(BaseModel):
    request_id: str
    model: str
    content: str
    usage: UsageInfo
    cached: bool = False
    latency_ms: float
The Claude Wrapper
app/claude.py
This is where the Anthropic SDK lives. The wrapper handles three things: building the API call correctly, applying prompt caching when requested, and converting SDK exceptions into HTTP errors that FastAPI can serialize.
import time
from typing import AsyncIterator

import anthropic

from app.config import settings
from app.logging_config import get_logger
from app.models import CompletionRequest, CompletionResponse, UsageInfo

log = get_logger(__name__)

# One client instance shared across all requests.
# The SDK manages its own connection pool internally.
_client = anthropic.Anthropic(api_key=settings.anthropic_api_key)


def _build_system_block(request: CompletionRequest) -> str | list | None:
    """
    Return the system prompt in the right shape for the SDK call.
    If cache_system is True, wrap the text in a cache_control block.
    This tells the Anthropic API to cache the system prompt tokens
    so that subsequent calls with the same system text pay only
    cache_read_input_tokens instead of full input_tokens.
    """
    if not request.system:
        return None
    if not request.cache_system:
        return request.system
    return [
        {
            "type": "text",
            "text": request.system,
            "cache_control": {"type": "ephemeral"},
        }
    ]


def _messages_payload(request: CompletionRequest) -> list[dict]:
    return [{"role": m.role, "content": m.content} for m in request.messages]
async def complete(
    request: CompletionRequest,
    request_id: str,
) -> CompletionResponse:
    """
    Non-streaming completion. Returns a CompletionResponse with full content,
    token usage, and latency.
    """
    model = request.model or settings.default_model
    system = _build_system_block(request)
    messages = _messages_payload(request)

    log.info(
        "anthropic_request",
        request_id=request_id,
        model=model,
        message_count=len(messages),
        cache_system=request.cache_system,
    )

    start = time.perf_counter()
    try:
        kwargs: dict = dict(
            model=model,
            max_tokens=request.max_tokens,
            temperature=request.temperature,  # forward the validated request field
            messages=messages,
        )
        if system is not None:
            kwargs["system"] = system
        msg = _client.messages.create(**kwargs)
    except anthropic.AuthenticationError as exc:
        log.error("anthropic_auth_error", request_id=request_id, detail=str(exc))
        raise  # re-raise; the router converts this to a 500 response
    except anthropic.RateLimitError as exc:
        log.warning("anthropic_rate_limit", request_id=request_id, detail=str(exc))
        raise
    except anthropic.APIError as exc:
        log.error("anthropic_api_error", request_id=request_id, detail=str(exc))
        raise

    latency_ms = (time.perf_counter() - start) * 1000
    text = msg.content[0].text
    usage = UsageInfo(
        input_tokens=msg.usage.input_tokens,
        output_tokens=msg.usage.output_tokens,
        cache_creation_input_tokens=getattr(
            msg.usage, "cache_creation_input_tokens", 0
        ),
        cache_read_input_tokens=getattr(msg.usage, "cache_read_input_tokens", 0),
    )

    log.info(
        "anthropic_response",
        request_id=request_id,
        model=model,
        input_tokens=usage.input_tokens,
        output_tokens=usage.output_tokens,
        cache_read=usage.cache_read_input_tokens,
        latency_ms=round(latency_ms, 1),
    )

    return CompletionResponse(
        request_id=request_id,
        model=msg.model,
        content=text,
        usage=usage,
        cached=False,
        latency_ms=round(latency_ms, 1),
    )
def stream_complete(
    request: CompletionRequest,
    request_id: str,
) -> AsyncIterator[str]:
    """
    Yields text chunks from the Anthropic streaming API.
    Caller is responsible for wrapping in SSE formatting.
    """
    model = request.model or settings.default_model
    system = _build_system_block(request)
    messages = _messages_payload(request)

    log.info(
        "anthropic_stream_start",
        request_id=request_id,
        model=model,
    )

    async def _generate() -> AsyncIterator[str]:
        kwargs: dict = dict(
            model=model,
            max_tokens=request.max_tokens,
            temperature=request.temperature,  # forward the validated request field
            messages=messages,
        )
        if system is not None:
            kwargs["system"] = system
        try:
            with _client.messages.stream(**kwargs) as stream:
                for text in stream.text_stream:
                    yield text
        except anthropic.APIError as exc:
            log.error("anthropic_stream_error", request_id=request_id, detail=str(exc))
            raise

    return _generate()
Routers: Completions and Streaming
app/routers/completions.py
import uuid

import anthropic as anthropic_sdk
import structlog
from fastapi import APIRouter, Depends, HTTPException, status

from app.auth import require_api_key
from app.cache import cache
from app.claude import complete
from app.logging_config import get_logger
from app.models import CompletionRequest, CompletionResponse

router = APIRouter(prefix="/v1", tags=["completions"])
log = get_logger(__name__)


@router.post(
    "/completions",
    response_model=CompletionResponse,
    summary="Non-streaming completion",
    description="Send a list of messages and receive a single completed response. "
    "Identical requests within the cache TTL window return cached results.",
)
async def create_completion(
    body: CompletionRequest,
    _key: str = Depends(require_api_key),
) -> CompletionResponse:
    request_id = str(uuid.uuid4())
    # Bind request_id into structlog context for this async task
    structlog.contextvars.bind_contextvars(request_id=request_id)

    # Build a cache key from the full request payload
    cache_key = cache.make_key(body.model_dump())
    cached_result = cache.get(cache_key)
    if cached_result is not None:
        log.info("cache_hit", request_id=request_id)
        # Return a copy so the stored entry is not mutated per request
        return cached_result.model_copy(
            update={"cached": True, "request_id": request_id}
        )

    log.info("cache_miss", request_id=request_id)
    try:
        response = await complete(body, request_id)
    except anthropic_sdk.RateLimitError:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail="Anthropic rate limit reached. Retry after a moment.",
        )
    except anthropic_sdk.AuthenticationError:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Service misconfiguration: invalid Anthropic credentials.",
        )
    except anthropic_sdk.APIError:
        # The full SDK error is already logged in claude.py; keep it out of the response
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail="Upstream AI error. Check logs for the request ID.",
        )

    cache.set(cache_key, response)
    return response
app/routers/stream.py
import json
import uuid

import structlog
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse

from app.auth import require_api_key
from app.claude import stream_complete
from app.logging_config import get_logger
from app.models import CompletionRequest

router = APIRouter(prefix="/v1", tags=["streaming"])
log = get_logger(__name__)


def _sse_line(data: str) -> str:
    """Format a single chunk as a Server-Sent Events line."""
    return f"data: {json.dumps({'text': data})}\n\n"


@router.post(
    "/completions/stream",
    summary="Streaming completion via SSE",
    description="Streams the model response as Server-Sent Events. "
    "Each event carries a 'text' field with the next token chunk. "
    "The stream ends with 'data: [DONE]'.",
)
async def create_streaming_completion(
    body: CompletionRequest,
    _key: str = Depends(require_api_key),
) -> StreamingResponse:
    request_id = str(uuid.uuid4())
    structlog.contextvars.bind_contextvars(request_id=request_id)
    log.info("stream_request", request_id=request_id)

    generator = stream_complete(body, request_id)

    async def event_stream():
        async for chunk in generator:
            yield _sse_line(chunk)
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
            "X-Request-Id": request_id,
        },
    )
The FastAPI App Factory
app/main.py
import time
import uuid
from contextlib import asynccontextmanager

import structlog
from fastapi import FastAPI, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse

from app.config import settings
from app.logging_config import configure_logging, get_logger
from app.routers import completions, stream

# Configure logging once at startup, before any other imports emit logs
configure_logging(settings.log_level)
log = get_logger(__name__)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run once at startup; yield; run once at shutdown."""
    log.info(
        "service_startup",
        environment=settings.environment,
        default_model=settings.default_model,
        cache_ttl=settings.cache_ttl_seconds,
    )
    yield
    log.info("service_shutdown")


app = FastAPI(
    title="Claude AI Microservice",
    version="1.0.0",
    description="Production FastAPI wrapper for the Anthropic Claude API.",
    lifespan=lifespan,
    docs_url="/docs",
    redoc_url="/redoc",
)

# CORS: lock this down to your actual frontend domains in production
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"] if settings.environment == "development" else [],
    allow_methods=["POST", "GET"],
    allow_headers=["X-API-Key", "Content-Type"],
)


@app.middleware("http")
async def request_logging_middleware(request: Request, call_next) -> Response:
    """
    Assign a request ID to every incoming request.
    Log the method, path, and response status + latency on the way out.
    """
    request_id = request.headers.get("X-Request-Id") or str(uuid.uuid4())
    structlog.contextvars.clear_contextvars()
    structlog.contextvars.bind_contextvars(
        request_id=request_id,
        method=request.method,
        path=request.url.path,
    )
    start = time.perf_counter()
    response: Response = await call_next(request)
    latency_ms = round((time.perf_counter() - start) * 1000, 1)
    log.info(
        "http_request",
        status=response.status_code,
        latency_ms=latency_ms,
    )
    response.headers["X-Request-Id"] = request_id
    return response


@app.exception_handler(Exception)
async def unhandled_exception_handler(request: Request, exc: Exception) -> JSONResponse:
    log.exception("unhandled_exception", exc_info=exc)
    return JSONResponse(
        status_code=500,
        content={"detail": "An internal error occurred. Check logs for request_id."},
    )


# Health check, no auth required
@app.get("/health", tags=["ops"])
async def health() -> dict:
    return {"status": "ok", "service": "claude-microservice", "version": "1.0.0"}


# Mount routers
app.include_router(completions.router)
app.include_router(stream.router)
Dockerfile and Docker Compose
Dockerfile
FROM python:3.12-slim
# Security: run as non-root
RUN useradd --create-home --shell /bin/bash appuser
WORKDIR /app
# Install deps in a separate layer so rebuilds on code changes are fast
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY app/ ./app/
USER appuser
EXPOSE 8000
# --workers 1 for a single-replica pod; scale replicas at the orchestrator level
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]
docker-compose.yml
services:
  api:
    build: .
    ports:
      - "8000:8000"
    env_file:
      - .env
    volumes:
      # Hot-reload in development: mount source over the image copy
      - ./app:/app/app
    command: >
      uvicorn app.main:app
      --host 0.0.0.0
      --port 8000
      --reload
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
      interval: 15s
      timeout: 5s
      retries: 3
Tests
tests/conftest.py
import os

import pytest
from fastapi.testclient import TestClient

# Set env vars before the app is imported so pydantic-settings picks them up
os.environ.setdefault("ANTHROPIC_API_KEY", "test-key-not-real")
os.environ.setdefault("SERVICE_API_KEY", "test-service-key")
os.environ.setdefault("DEFAULT_MODEL", "claude-haiku-4-5")

from app.main import app  # noqa: E402 (must come after env setup)


@pytest.fixture(scope="session")
def client():
    with TestClient(app) as c:
        yield c


@pytest.fixture
def auth_headers():
    return {"X-API-Key": "test-service-key"}
tests/test_completions.py
from unittest.mock import MagicMock, patch

import pytest


def _mock_response(text: str = "Hello from mock Claude"):
    """Build a mock that looks like an anthropic.types.Message."""
    mock = MagicMock()
    mock.model = "claude-haiku-4-5"
    mock.content = [MagicMock(text=text)]
    mock.usage.input_tokens = 20
    mock.usage.output_tokens = 10
    mock.usage.cache_creation_input_tokens = 0
    mock.usage.cache_read_input_tokens = 0
    return mock


class TestHealth:
    def test_health_ok(self, client):
        resp = client.get("/health")
        assert resp.status_code == 200
        assert resp.json()["status"] == "ok"


class TestAuth:
    def test_missing_key_returns_401(self, client):
        resp = client.post("/v1/completions", json={
            "messages": [{"role": "user", "content": "hi"}]
        })
        assert resp.status_code == 401

    def test_wrong_key_returns_403(self, client):
        resp = client.post(
            "/v1/completions",
            json={"messages": [{"role": "user", "content": "hi"}]},
            headers={"X-API-Key": "wrong-key"},
        )
        assert resp.status_code == 403


class TestCompletions:
    @patch("app.claude._client")
    def test_completion_returns_content(self, mock_client, client, auth_headers):
        mock_client.messages.create.return_value = _mock_response("Mocked answer")
        resp = client.post(
            "/v1/completions",
            json={"messages": [{"role": "user", "content": "What is 2+2?"}]},
            headers=auth_headers,
        )
        assert resp.status_code == 200
        data = resp.json()
        assert data["content"] == "Mocked answer"
        assert data["usage"]["input_tokens"] == 20
        assert data["cached"] is False

    @patch("app.claude._client")
    def test_cache_hit_on_second_call(self, mock_client, client, auth_headers):
        mock_client.messages.create.return_value = _mock_response("Cached response")
        payload = {"messages": [{"role": "user", "content": "Unique cache test 12345"}]}
        # First call: miss
        r1 = client.post("/v1/completions", json=payload, headers=auth_headers)
        assert r1.json()["cached"] is False
        # Second call: hit (Anthropic client should NOT be called again)
        r2 = client.post("/v1/completions", json=payload, headers=auth_headers)
        assert r2.json()["cached"] is True
        assert mock_client.messages.create.call_count == 1

    def test_validation_rejects_empty_messages(self, client, auth_headers):
        resp = client.post(
            "/v1/completions",
            json={"messages": []},
            headers=auth_headers,
        )
        assert resp.status_code == 422

    def test_validation_rejects_assistant_last(self, client, auth_headers):
        resp = client.post(
            "/v1/completions",
            json={
                "messages": [
                    {"role": "user", "content": "Hello"},
                    {"role": "assistant", "content": "Hi"},
                ]
            },
            headers=auth_headers,
        )
        assert resp.status_code == 422
Running It End-to-End
Start the service
cp .env.example .env # fill in ANTHROPIC_API_KEY and SERVICE_API_KEY
docker compose up --build
Smoke test with curl
# Non-streaming completion
curl -s -X POST http://localhost:8000/v1/completions \
-H "Content-Type: application/json" \
-H "X-API-Key: your-chosen-service-api-key" \
-d '{
"messages": [
{"role": "user", "content": "Explain what a Pydantic validator does in two sentences."}
],
"system": "You are a concise Python educator.",
"max_tokens": 200,
"cache_system": true
}' | python -m json.tool
Sample response:
{
"request_id": "3f8a2b1c-9d4e-4f7a-a123-0b5c6d7e8f90",
"model": "claude-sonnet-4-6",
"content": "A Pydantic validator is a function decorated with @field_validator that runs when a model field is assigned, letting you enforce custom constraints or transform values before they are stored. If the validation fails, Pydantic raises a ValidationError with a structured error message pointing to the exact field.",
"usage": {
"input_tokens": 42,
"output_tokens": 61,
"cache_creation_input_tokens": 18,
"cache_read_input_tokens": 0
},
"cached": false,
"latency_ms": 1284.7
}
Streaming with curl
curl -N -s -X POST http://localhost:8000/v1/completions/stream \
-H "Content-Type: application/json" \
-H "X-API-Key: your-chosen-service-api-key" \
-d '{
"messages": [{"role": "user", "content": "Count slowly from 1 to 5."}],
"max_tokens": 80
}'
Sample SSE output (one chunk per line):
data: {"text": "1"}
data: {"text": "..."}
data: {"text": " 2"}
data: {"text": "..."}
data: {"text": " 3"}
data: {"text": " 4"}
data: {"text": " 5"}
data: [DONE]
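For callers that are not curl, the stream has to be parsed on the client side. The following is a minimal sketch of a parser for the event format shown above, assuming each event arrives as a complete "data:" line and the stream ends with the [DONE] sentinel. parse_sse is a hypothetical helper name, and the sample lines are hard-coded rather than read from a live connection.

```python
import json


def parse_sse(lines):
    """Yield text chunks from SSE 'data:' lines until the [DONE] sentinel."""
    for raw in lines:
        line = raw.strip()
        if not line.startswith("data:"):
            continue  # skip blank separators and any non-data fields
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            return  # end of stream
        yield json.loads(payload)["text"]


# Hard-coded sample in the same shape the streaming endpoint emits.
sample = [
    'data: {"text": "1"}', "",
    'data: {"text": "..."}', "",
    'data: {"text": " 2"}', "",
    "data: [DONE]", "",
]
full_text = "".join(parse_sse(sample))
```

In a real client the lines would come from an HTTP library's line iterator over the response body; the parsing logic stays the same.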
Run tests
pytest tests/ -v
Common Pitfalls
- Sharing a synchronous Anthropic client across async workers: The anthropic.Anthropic() client is synchronous. Calling it inside an async def endpoint blocks the event loop for the duration of the call, so other requests on that worker wait. FastAPI only moves handler code to a thread pool when the route is declared with plain def; this service uses async def routes, so it gets no such protection. The sync client is kept here to keep the code readable and is tolerable for low traffic on a single uvicorn worker, but the correct production fix is to switch to anthropic.AsyncAnthropic() and await client.messages.create(...).
- In-process cache breaks with multiple replicas: The TTLCache in this article is a Python dict. It does not survive process restarts and is not shared across pods. Replace it with Redis (use the redis-py async client) before deploying more than one replica.
- Not setting max_tokens: Claude does not have a sensible default for max_tokens. If you omit it, the call fails before any tokens are generated. Set a conservative default in config and let callers override it with a reasonable ceiling.
- Returning raw Anthropic errors to callers: SDK exception messages sometimes include your model name or account context. Always catch at the router level and return a sanitized HTTP error, as shown in completions.py.
- Forgetting X-Accel-Buffering: no on streaming: If your service sits behind nginx (common on Bluehost, on VPS setups, and in many Kubernetes ingress configs), nginx buffers SSE by default. Without X-Accel-Buffering: no in the streaming response headers, clients see one big flush at the end instead of incremental tokens. The stream.py router sets this header already.
- Caching streaming responses: The streaming endpoint in this article does not cache. Caching a streaming response would require buffering the whole response first, which defeats the purpose. If you need caching and streaming, cache the full response on first call and return it non-streamed on cache hit, or stream from cache by replaying the buffered text in chunks.
- CORS wildcard in production: The app factory sets allow_origins=["*"] in development. The production branch returns an empty list, which rejects all cross-origin requests. Fill in your actual frontend domains before deploying.
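The first pitfall is easy to observe without any SDK at all. The sketch below uses a sleeping function as a hypothetical stand-in for a synchronous SDK call and counts how often a background coroutine gets to run while that call is in progress. asyncio.to_thread is shown only as a stopgap for illustration; it is not what the article's code does, and the AsyncAnthropic client remains the recommended fix.

```python
import asyncio
import time


def slow_sdk_call() -> str:
    """Hypothetical stand-in for a synchronous SDK call; it only sleeps."""
    time.sleep(0.2)
    return "done"


async def count_ticks(stop: asyncio.Event) -> int:
    """Count how many times the event loop lets this coroutine run."""
    ticks = 0
    while not stop.is_set():
        await asyncio.sleep(0.01)
        ticks += 1
    return ticks


async def run(offload: bool) -> int:
    stop = asyncio.Event()
    ticker = asyncio.create_task(count_ticks(stop))
    await asyncio.sleep(0)  # let the ticker start
    if offload:
        await asyncio.to_thread(slow_sdk_call)  # loop stays free
    else:
        slow_sdk_call()  # blocks the loop; the ticker cannot run
    stop.set()
    return await ticker


blocked_ticks = asyncio.run(run(offload=False))
offloaded_ticks = asyncio.run(run(offload=True))
```

When the call runs directly, the ticker barely advances because nothing else on the loop can execute; when it is offloaded, the ticker keeps running for the full duration. Every other request on the same worker is in the ticker's position.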
Cost and Latency Reference
| Model | Input cost (per 1M tokens) | Output cost (per 1M tokens) | Typical p50 latency (first token) | Best for |
|---|---|---|---|---|
| claude-haiku-4-5 | $0.80 | $4.00 | ~200ms | Classification, routing, high-volume short tasks |
| claude-sonnet-4-6 | $3.00 | $15.00 | ~400ms | Most production tasks: summarization, analysis, code |
| claude-opus-4-8 | $15.00 | $75.00 | ~700ms | Complex reasoning, ambiguous judgment calls |
With prompt caching enabled (cache_system: true in the request body), a 10,000-token system prompt that would cost $0.03 per call on Sonnet costs about $0.003 on a cache hit, assuming cached reads are billed at 10% of the base input price (the 90% saving cited in the TL;DR). If your service has a fixed system prompt that does not change between calls, enabling caching is the single highest-impact cost optimization available. See Part 4: Prompt Caching with Claude for the full analysis.
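The arithmetic behind the caching estimate can be checked in a few lines. This is a rough sketch, not a billing calculator: it takes the Sonnet input price from the table above and assumes cache reads are billed at 10% of the base input price and the initial cache write at 125%. Both multipliers are assumptions to verify against current Anthropic pricing, and output tokens and the rest of the input are ignored.

```python
BASE_INPUT_PER_MTOK = 3.00      # Sonnet input price from the table above (USD)
CACHE_READ_MULTIPLIER = 0.10    # assumption: reads billed at 10% of base
CACHE_WRITE_MULTIPLIER = 1.25   # assumption: first write billed at 125% of base


def prompt_cost(tokens: int, multiplier: float = 1.0) -> float:
    """Cost in USD of sending `tokens` input tokens at the given multiplier."""
    return tokens / 1_000_000 * BASE_INPUT_PER_MTOK * multiplier


system_tokens = 10_000
uncached = prompt_cost(system_tokens)                           # every call, no caching
cache_write = prompt_cost(system_tokens, CACHE_WRITE_MULTIPLIER)  # first call only
cache_hit = prompt_cost(system_tokens, CACHE_READ_MULTIPLIER)     # later calls

# System-prompt cost over 1,000 calls, assuming the cache never expires between calls.
uncached_1k = 1000 * uncached
cached_1k = cache_write + 999 * cache_hit
```

Under these assumptions the system prompt costs about $30 per 1,000 calls uncached and about $3 cached. In practice the cache entry expires when it is not read for a while, so sparse traffic pays the write price more often.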
| Scenario | Recommended model | Caching? | Expected cost per 1K calls (est.) |
|---|---|---|---|
| Short Q&A, 100-token input | claude-haiku-4-5 | No | ~$0.08 |
| Document analysis, 4K-token system | claude-sonnet-4-6 | Yes | ~$0.60 (vs ~$12 without cache) |
| Multi-step reasoning, 8K+ context | claude-opus-4-8 | Yes | ~$3.00 (vs ~$75 without cache) |
| Classification pipeline, 50-token input | claude-haiku-4-5 | No | ~$0.04 |
For model routing decisions at a finer grain, see Part 27: Cut AI Costs: Model Routing and Batching with Claude. For tracing and monitoring the latency numbers in your own service, see Part 28: Observability for LLM Apps.
What to Add Before Going to Production
The service as written covers the foundations. Here is what a production hardening pass would add, roughly in priority order:
- AsyncAnthropic client: Replace anthropic.Anthropic() with anthropic.AsyncAnthropic() and update claude.py to use await. This stops every Anthropic call from blocking the event loop.
- Redis-backed cache: Drop in redis.asyncio and replace the TTLCache class with a Redis get/set wrapper using the same SHA-256 key scheme.
- Rate limiting per caller: Use slowapi (a FastAPI-compatible wrapper around limits) to cap requests per API key per minute. Without this, a single misbehaving caller can consume your entire Anthropic quota.
- Multiple API keys with a database: Store keys in a Postgres table with a last_used_at column and per-key rate limit config. The require_api_key dependency becomes a database lookup.
- OpenTelemetry spans: Add the OTEL Python SDK and emit a span per Anthropic call with token counts and latency as attributes. Ship to Grafana Tempo or Honeycomb. The approach is covered in detail in Part 28.
- Prompt injection defense: If the service accepts arbitrary user input that gets concatenated into the prompt, add the input sanitization layer described in Part 25: Guardrails and Prompt Injection Defense.
- Structured output endpoint: Add a POST /v1/structured route that accepts a JSON Schema in the request body, builds a tool definition from it, passes tool_choice={"type":"tool","name":"output"} to Claude, and returns the parsed object. See Part 3: Structured Output from Claude for the pattern.
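As a rough illustration of the structured output item, the sketch below builds the extra keyword arguments for a forced tool call from a caller-supplied JSON Schema and pulls the parsed object back out of a response. build_structured_call and extract_tool_input are hypothetical helper names, and the response content is modeled as plain dicts for simplicity; the SDK returns typed objects, so a real implementation would read attributes instead.

```python
def build_structured_call(schema: dict, description: str = "Return the structured result.") -> dict:
    """Wrap a JSON Schema in a single tool and force the model to call it."""
    tool = {
        "name": "output",
        "description": description,
        "input_schema": schema,
    }
    return {
        "tools": [tool],
        "tool_choice": {"type": "tool", "name": "output"},
    }


def extract_tool_input(content_blocks: list) -> dict:
    """Return the input of the 'output' tool call from response content blocks."""
    for block in content_blocks:
        if block.get("type") == "tool_use" and block.get("name") == "output":
            return block["input"]
    raise ValueError("no tool_use block named 'output' in response")


# Illustrative schema and a hand-written response in dict form.
schema = {
    "type": "object",
    "properties": {"priority": {"type": "string"}},
    "required": ["priority"],
}
kwargs = build_structured_call(schema)
fake_content = [
    {"type": "text", "text": "Routing the ticket."},
    {"type": "tool_use", "name": "output", "input": {"priority": "high"}},
]
parsed = extract_tool_input(fake_content)
```

The returned kwargs would be merged into the messages.create call alongside model, max_tokens, and messages. Validating the parsed object against the schema before returning it is a sensible extra step that this sketch leaves out.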
Frequently Asked Questions
Can I deploy this on a shared host like Bluehost?
Not easily. Shared hosts typically do not allow you to run persistent Python processes on port 8000. This service is designed for a VPS, a container platform (Fly.io, Railway, Render, AWS ECS, GCP Cloud Run), or a Kubernetes cluster. Cloud Run is a particularly good fit: it scales to zero when idle, so you pay nothing while the service is not being called, and it handles HTTPS and container orchestration for you.
Why use FastAPI instead of Flask or Django REST Framework?
FastAPI generates an OpenAPI schema from your Pydantic models automatically, which means your callers always have up-to-date docs at /docs. Its dependency injection system (the Depends() mechanism) makes auth, database connections, and shared clients clean to wire up. It is also async-native, which matters here because Anthropic API calls take 1 to 10 seconds and you want to handle concurrent requests without spawning threads. Flask can be made async, but it is not the default. Django REST is heavier than needed for a focused microservice.
How do I handle long-running requests that exceed my reverse proxy’s timeout?
Set your nginx or load balancer’s proxy_read_timeout to at least 120 seconds for the completions endpoint. For requests you expect to take even longer, use the streaming endpoint instead. SSE keeps the connection alive with periodic data events, so the proxy sees activity and does not time out. The X-Accel-Buffering: no header in stream.py is required to prevent nginx from buffering the stream before forwarding it.
Is the in-process cache safe for concurrent requests?
The Python GIL serializes dict access in CPython, so the TTLCache will not corrupt data under concurrent async requests. The race condition to watch for is two concurrent requests with the same cache key both missing the cache, both calling Anthropic, and both writing the result. The second write overwrites the first with an identical value, which is harmless. If you need exactly-once Anthropic calls (for cost accounting), use Redis with a SET NX PX (set if not exists with TTL) pattern to acquire a lock before calling Anthropic.
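Within a single process, the duplicate-call race can also be closed without Redis by letting concurrent requests for the same key share one in-flight call. The following is a minimal sketch of that idea; SingleFlight and fake_anthropic_call are hypothetical names, and the approach only helps inside one replica.

```python
import asyncio


class SingleFlight:
    """Share one in-flight call among concurrent requests with the same key."""

    def __init__(self):
        self._inflight: dict = {}

    async def do(self, key: str, fn):
        task = self._inflight.get(key)
        if task is None:
            # First caller for this key starts the work and registers it.
            task = asyncio.ensure_future(fn())
            self._inflight[key] = task
            task.add_done_callback(lambda _: self._inflight.pop(key, None))
        # shield() keeps one cancelled waiter from cancelling the shared task.
        return await asyncio.shield(task)


calls = 0


async def fake_anthropic_call() -> str:
    """Hypothetical stand-in for the upstream call; counts invocations."""
    global calls
    calls += 1
    await asyncio.sleep(0.05)
    return "answer"


async def main() -> list:
    flight = SingleFlight()
    return await asyncio.gather(
        *(flight.do("same-key", fake_anthropic_call) for _ in range(5))
    )


results = asyncio.run(main())
```

Five concurrent requests with the same key produce one upstream call and five identical results. In the router, this would wrap the call to complete() on a cache miss, keyed by the same cache key.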
How do I use tool use or function calling through this service?
Add a tools field to CompletionRequest as a list of tool definitions (following the Anthropic tool schema), pass them through to the _client.messages.create call, and add logic to handle msg.stop_reason == "tool_use". The full pattern is covered in Part 2: Tool Use with Claude. For a microservice, you would typically run the tool execution server-side and continue the conversation loop internally, returning the final text to the caller.
What is the right max_tokens ceiling to enforce in the Pydantic validator?
Claude Sonnet 4.6 and Haiku 4.5 support up to 8096 output tokens per call; Opus 4.8 supports up to 4096 in some configurations. Setting the ceiling at 8096 in the validator (as this article does) is a safe default. If your use case only ever needs short answers, lower the ceiling to 512 or 1024 to prevent runaway generation from a misconfigured caller.
How do I add authentication for multiple tenants with different rate limits?
Create a tenants table in Postgres with columns for api_key_hash (store the SHA-256 of the key, not the key itself), rpm_limit, and monthly_token_budget. Change require_api_key to hash the incoming key and look it up. Inject the tenant object into the route handler via Depends. Track token usage in a separate usage_events table. This is a few hundred lines of additional code on top of what is already here.
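The hashing step can be sketched without a database. In the example below an in-memory dict stands in for the tenants table, and the key values, tenant names, and limits are invented for illustration. The comparison uses hmac.compare_digest so that lookup time does not depend on how many leading characters match.

```python
import hashlib
import hmac
from typing import Optional


def hash_key(raw_key: str) -> str:
    """Store and compare SHA-256 digests, never the raw API key."""
    return hashlib.sha256(raw_key.encode()).hexdigest()


# In-memory stand-in for the tenants table (hypothetical data).
TENANTS = {
    hash_key("team-a-secret"): {"name": "team-a", "rpm_limit": 60},
    hash_key("team-b-secret"): {"name": "team-b", "rpm_limit": 600},
}


def lookup_tenant(raw_key: str) -> Optional[dict]:
    """Return the tenant record for a raw key, or None if it is unknown."""
    digest = hash_key(raw_key)
    for stored_hash, tenant in TENANTS.items():
        if hmac.compare_digest(stored_hash, digest):
            return tenant
    return None


tenant = lookup_tenant("team-b-secret")
unknown = lookup_tenant("not-a-real-key")
```

In the service, require_api_key would call something like lookup_tenant, raise 403 when it returns None, and otherwise return the tenant record so the route handler can apply its rate limit.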
View the complete series: AI in Production: 30 Real-World Use Cases with Claude.