Ship a Production AI Microservice with FastAPI and Claude

Series
AI in Production: 30 Real-World Use Cases with Claude

Part 30 of 30

TL;DR

  • A production AI microservice built on FastAPI needs more than a working endpoint: it needs API-key auth, Pydantic validation, structured logging, caching, streaming, and a clean Dockerfile before any real team can use it.
  • Wrapping Claude with FastAPI gives you typed request/response contracts, automatic OpenAPI docs, and async support that matters when token generation takes 2 to 10 seconds per request.
  • Prompt caching cuts repeated-context costs by up to 90%. A simple in-process TTL cache on top of that eliminates redundant Anthropic API calls for identical inputs within a time window.
  • Streaming via Server-Sent Events (SSE) delivers tokens to the client as they are generated, so long generations show progress immediately instead of leaving the caller waiting on a silent connection.
  • Structured JSON logging with request IDs lets you correlate every Anthropic API call back to the originating HTTP request, which is the minimum you need for production debugging.
  • The full project in this article is under 500 lines across six files and runs locally with a single docker compose up.

Why a Microservice Wrapper Makes Sense

By the time you reach the end of a 30-part series on running AI in production, you have seen Claude do code review, log triage, contract analysis, ticket routing, SQL generation, and a dozen other tasks. Each of those articles showed a focused Python script or notebook. That is fine for a proof of concept, but no engineering team ships a script to production. They ship a service.

A production AI microservice built on FastAPI solves several problems that accumulate as usage grows. First, your callers need a stable contract. If you change your prompt template, they should not have to change their code. A typed REST API with Pydantic schemas gives you that contract. Second, you need auth. Every call to Anthropic costs money, and you cannot let anonymous callers rack up your bill. Third, you need observability. When a request returns a bad answer, you need to know what prompt went in, which model was called, how many tokens were used, and what the latency was. A raw script gives you none of that.

FastAPI is a strong fit here. It is async-native, generates OpenAPI documentation automatically, integrates cleanly with Pydantic v2, and its dependency injection system makes adding auth middleware a matter of a few lines. The Anthropic Python SDK is also async-compatible, so you get proper concurrency without threading gymnastics.

This article builds the whole thing from scratch: project layout, every source file, a Dockerfile, a docker-compose for local development, and a curl-based smoke test. It is the capstone of this series, pulling together patterns from earlier articles: streaming (Part 26), prompt caching (Part 4), observability (Part 28), and guardrails (Part 25).

Project Layout and Architecture

Before writing any code, agree on a structure that a new engineer can navigate without a tour. This service follows a standard FastAPI layout with a few additions for AI-specific concerns.

claude-microservice/
├── app/
│   ├── __init__.py
│   ├── main.py          # FastAPI app factory, lifespan, middleware
│   ├── config.py        # Settings via pydantic-settings
│   ├── auth.py          # API-key dependency
│   ├── cache.py         # In-process TTL cache
│   ├── claude.py        # Anthropic client wrapper
│   ├── models.py        # Pydantic request/response schemas
│   ├── routers/
│   │   ├── __init__.py
│   │   ├── completions.py   # POST /v1/completions
│   │   └── stream.py        # POST /v1/completions/stream (SSE)
│   └── logging_config.py    # structlog setup
├── tests/
│   ├── __init__.py
│   ├── conftest.py
│   └── test_completions.py
├── .env.example
├── Dockerfile
├── docker-compose.yml
└── requirements.txt

[Figure 1 diagram: Client (curl / app) → HTTPS → FastAPI (auth check, Pydantic validation, request ID logging, cache lookup, rate limit). On a cache hit the response comes from the in-process TTL cache; on a cache miss the Claude wrapper (with prompt caching) makes an API call to the Anthropic API (claude-sonnet-4-6). Each stage emits structlog JSON lines.]

Figure 1: Request flow through the FastAPI microservice. Auth and validation happen first; the cache is checked before any Anthropic API call is made; structlog emits a JSON line at every stage.

All Dependencies and Configuration

requirements.txt

fastapi==0.111.0
uvicorn[standard]==0.29.0
anthropic==0.25.1
pydantic==2.7.1
pydantic-settings==2.2.1
structlog==24.1.0
python-dotenv==1.0.1
httpx==0.27.0
pytest==8.2.0
pytest-asyncio==0.23.6

To install the runtime dependencies directly without pinned versions:

pip install fastapi "uvicorn[standard]" anthropic pydantic pydantic-settings structlog python-dotenv httpx

.env.example

# Copy to .env and fill in real values. Never commit .env.
ANTHROPIC_API_KEY=sk-ant-...
SERVICE_API_KEY=your-chosen-service-api-key
DEFAULT_MODEL=claude-sonnet-4-6
DEFAULT_MAX_TOKENS=1024
CACHE_TTL_SECONDS=300
LOG_LEVEL=INFO
ENVIRONMENT=development

The SERVICE_API_KEY is what your downstream callers pass in the X-API-Key header. It is separate from your Anthropic key. In a real team you would have multiple keys stored in a secrets manager and checked against a database, but a single env var is sufficient for this article without obscuring the important parts.
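
The multi-key setup mentioned above might look like the following minimal sketch. It assumes keys are stored as SHA-256 digests mapped to a caller name, so the raw keys never sit in the lookup table. The `KEY_DIGESTS` table, the example keys, the caller names, and `identify_caller` are all hypothetical names for illustration; a real deployment would load the digests from a secrets manager or database rather than hard-coding them.

```python
import hashlib
import hmac
from typing import Optional


def _digest(key: str) -> str:
    """Hash an API key so only digests are stored and compared."""
    return hashlib.sha256(key.encode("utf-8")).hexdigest()


# Hypothetical key table (digest -> caller name). Hard-coded only for the demo.
KEY_DIGESTS = {
    _digest("team-a-example-key"): "team-a",
    _digest("team-b-example-key"): "team-b",
}


def identify_caller(presented_key: str) -> Optional[str]:
    """Return the caller name for a valid key, or None if no key matches."""
    presented = _digest(presented_key)
    match = None
    for stored, caller in KEY_DIGESTS.items():
        # compare_digest takes the same time whether or not the values match
        if hmac.compare_digest(presented, stored):
            match = caller
    return match


valid = identify_caller("team-a-example-key")
invalid = identify_caller("wrong-key")
print(valid, invalid)
```

Returning the caller name rather than a bare boolean gives the logging layer something to attach to each request, which becomes useful once more than one team shares the service.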

app/config.py

from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import Literal


class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", extra="ignore")

    anthropic_api_key: str
    service_api_key: str
    default_model: str = "claude-sonnet-4-6"
    default_max_tokens: int = 1024
    cache_ttl_seconds: int = 300
    log_level: str = "INFO"
    environment: Literal["development", "staging", "production"] = "development"


# Singleton: import this wherever you need settings
settings = Settings()

Pydantic-settings reads from environment variables and from a .env file. If ANTHROPIC_API_KEY is missing, the service refuses to start with a clear validation error. No silent failures.

Auth, Caching, and Logging Infrastructure

app/auth.py

import secrets

from fastapi import HTTPException, Security, status
from fastapi.security import APIKeyHeader

from app.config import settings

# auto_error=False: the missing-header case is handled explicitly below
API_KEY_HEADER = APIKeyHeader(name="X-API-Key", auto_error=False)


async def require_api_key(api_key: str | None = Security(API_KEY_HEADER)) -> str:
    """FastAPI dependency that enforces API-key authentication."""
    if not api_key:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="X-API-Key header missing",
        )
    # Constant-time comparison avoids leaking key contents through response timing
    if not secrets.compare_digest(
        api_key.encode(), settings.service_api_key.encode()
    ):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid API key",
        )
    return api_key

app/cache.py

import hashlib
import json
import time
from typing import Any

from app.config import settings


class TTLCache:
    """
    Simple in-process TTL cache keyed on a hash of the request payload.

    Not suitable for multi-replica deployments without replacement by Redis.
    For a single-replica service or dev/staging this works without extra infra.
    """

    def __init__(self, ttl_seconds: int = 300):
        self._store: dict[str, tuple[Any, float]] = {}
        self._ttl = ttl_seconds

    @staticmethod
    def make_key(obj: dict) -> str:
        serialized = json.dumps(obj, sort_keys=True, ensure_ascii=True)
        return hashlib.sha256(serialized.encode()).hexdigest()

    def get(self, key: str) -> Any | None:
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() > expires_at:
            del self._store[key]
            return None
        return value

    def set(self, key: str, value: Any) -> None:
        self._store[key] = (value, time.monotonic() + self._ttl)

    def invalidate(self, key: str) -> None:
        self._store.pop(key, None)

    def size(self) -> int:
        return len(self._store)


# Module-level singleton used by routers
cache = TTLCache(ttl_seconds=settings.cache_ttl_seconds)
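
To make the key scheme concrete, here is a small standalone sketch that repeats the `make_key` logic from the class above and checks two properties: field order does not change the key, and any change to a field does. The sample payloads are made up for illustration.

```python
import hashlib
import json


def make_key(obj: dict) -> str:
    """Same scheme as TTLCache.make_key: canonical JSON, then SHA-256."""
    serialized = json.dumps(obj, sort_keys=True, ensure_ascii=True)
    return hashlib.sha256(serialized.encode()).hexdigest()


a = {"messages": [{"role": "user", "content": "hi"}], "max_tokens": 200, "temperature": 0.7}
# Same content as `a`, with keys in a different order at both nesting levels
b = {"temperature": 0.7, "max_tokens": 200, "messages": [{"content": "hi", "role": "user"}]}
# One field changed
c = {**a, "max_tokens": 201}

same_key = make_key(a) == make_key(b)
different_key = make_key(a) != make_key(c)
print(same_key, different_key)
```

Because every request field participates in the key, two requests that differ only in temperature are cached separately. It also means that with a temperature above 0, a cache hit returns the earlier sample rather than a fresh one, which is a trade-off worth deciding on deliberately.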

app/logging_config.py

import logging
import sys

import structlog


def configure_logging(level: str = "INFO") -> None:
    """
    Set up structlog to emit structured JSON lines.
    Every log event gets a timestamp and level automatically.
    """
    logging.basicConfig(
        format="%(message)s",
        stream=sys.stdout,
        level=getattr(logging, level.upper(), logging.INFO),
    )

    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            structlog.stdlib.add_log_level,
            structlog.stdlib.add_logger_name,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer(),
        ],
        wrapper_class=structlog.make_filtering_bound_logger(
            getattr(logging, level.upper(), logging.INFO)
        ),
        context_class=dict,
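        # stdlib LoggerFactory: add_logger_name needs a logger with a .name
        # attribute, which PrintLogger does not provide
        logger_factory=structlog.stdlib.LoggerFactory(),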
        cache_logger_on_first_use=True,
    )


def get_logger(name: str) -> structlog.BoundLogger:
    return structlog.get_logger(name)

Key idea: Binding a request_id to structlog’s context variables at the start of each request means every log line emitted anywhere in that request’s call stack automatically carries the same ID. You do not pass a logger object around; the context propagates through the async context automatically. This is the same pattern used in Part 28’s observability article, but applied directly inside the FastAPI request lifecycle.
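
The mechanism underneath is Python's standard `contextvars` module, which structlog's contextvars helpers build on. The sketch below uses only the standard library to show the propagation; `request_id_var`, `log`, `call_model`, and `handle` are hypothetical names for illustration, not part of the service.

```python
import asyncio
import contextvars

request_id_var: contextvars.ContextVar[str] = contextvars.ContextVar(
    "request_id", default="-"
)


def log(event: str) -> str:
    # No request_id parameter: the value is read from the current context
    return f"request_id={request_id_var.get()} event={event}"


async def call_model() -> str:
    await asyncio.sleep(0.01)  # yield so the two requests interleave
    return log("anthropic_request")


async def handle(request_id: str) -> str:
    request_id_var.set(request_id)  # bind once at the start of the request
    return await call_model()


async def main() -> list[str]:
    # gather() wraps each coroutine in a task with its own copy of the context
    return await asyncio.gather(handle("req-A"), handle("req-B"))


lines = asyncio.run(main())
print(lines)
```

Each task keeps its own ID even though the two requests interleave on one event loop, which is why the middleware clears and rebinds the context at the start of every request.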

Pydantic Models

app/models.py

from __future__ import annotations

from typing import Annotated, Literal

from pydantic import BaseModel, Field, field_validator


class Message(BaseModel):
    role: Literal["user", "assistant"]
    content: str = Field(..., min_length=1, max_length=32_000)


class CompletionRequest(BaseModel):
    messages: list[Message] = Field(..., min_length=1, max_length=50)
    system: str | None = Field(
        default=None,
        max_length=100_000,
        description="Optional system prompt. Supports prompt caching for long contexts.",
    )
    model: str | None = Field(
        default=None,
        description="Override the default model. Must be a valid Anthropic model id.",
    )
    max_tokens: Annotated[int, Field(ge=1, le=8096)] = 1024
    temperature: Annotated[float, Field(ge=0.0, le=1.0)] = 0.7
    cache_system: bool = Field(
        default=False,
        description="When True, the system prompt is wrapped with cache_control for prompt caching.",
    )

    @field_validator("messages")
    @classmethod
    def last_message_must_be_user(cls, v: list[Message]) -> list[Message]:
        if v and v[-1].role != "user":
            raise ValueError("The last message must have role 'user'.")
        return v


class UsageInfo(BaseModel):
    input_tokens: int
    output_tokens: int
    cache_creation_input_tokens: int = 0
    cache_read_input_tokens: int = 0


class CompletionResponse(BaseModel):
    request_id: str
    model: str
    content: str
    usage: UsageInfo
    cached: bool = False
    latency_ms: float

The Claude Wrapper

app/claude.py

This is where the Anthropic SDK lives. The wrapper handles three things: building the API call correctly, applying prompt caching when requested, and logging SDK exceptions before re-raising them so the routers can convert them into HTTP errors that FastAPI can serialize.

import time
from typing import AsyncIterator

import anthropic

from app.config import settings
from app.logging_config import get_logger
from app.models import CompletionRequest, CompletionResponse, UsageInfo

log = get_logger(__name__)

# One client instance shared across all requests.
# The SDK manages its own connection pool internally.
# Note: this is the synchronous client; see Common Pitfalls for the trade-off.
_client = anthropic.Anthropic(api_key=settings.anthropic_api_key)


def _build_system_block(request: CompletionRequest) -> str | list | None:
    """
    Return the system prompt in the right shape for the SDK call.
    If cache_system is True, wrap the text in a cache_control block.
    This tells the Anthropic API to cache the system prompt tokens
    so that subsequent calls with the same system text pay only
    cache_read_input_tokens instead of full input_tokens.
    """
    if not request.system:
        return None
    if not request.cache_system:
        return request.system
    return [
        {
            "type": "text",
            "text": request.system,
            "cache_control": {"type": "ephemeral"},
        }
    ]


def _messages_payload(request: CompletionRequest) -> list[dict]:
    return [{"role": m.role, "content": m.content} for m in request.messages]


async def complete(
    request: CompletionRequest,
    request_id: str,
) -> CompletionResponse:
    """
    Non-streaming completion. Returns a CompletionResponse with full content,
    token usage, and latency.
    """
    model = request.model or settings.default_model
    system = _build_system_block(request)
    messages = _messages_payload(request)

    log.info(
        "anthropic_request",
        request_id=request_id,
        model=model,
        message_count=len(messages),
        cache_system=request.cache_system,
    )

    start = time.perf_counter()
    try:
        kwargs: dict = dict(
            model=model,
            max_tokens=request.max_tokens,
            temperature=request.temperature,  # forward the caller's setting
            messages=messages,
        )
        if system is not None:
            kwargs["system"] = system

        msg = _client.messages.create(**kwargs)

    except anthropic.AuthenticationError as exc:
        log.error("anthropic_auth_error", request_id=request_id, detail=str(exc))
        raise  # re-raise; the router converts this to an HTTP 500
    except anthropic.RateLimitError as exc:
        log.warning("anthropic_rate_limit", request_id=request_id, detail=str(exc))
        raise
    except anthropic.APIError as exc:
        log.error("anthropic_api_error", request_id=request_id, detail=str(exc))
        raise

    latency_ms = (time.perf_counter() - start) * 1000

    text = msg.content[0].text

    usage = UsageInfo(
        input_tokens=msg.usage.input_tokens,
        output_tokens=msg.usage.output_tokens,
        cache_creation_input_tokens=getattr(
            msg.usage, "cache_creation_input_tokens", 0
        ),
        cache_read_input_tokens=getattr(msg.usage, "cache_read_input_tokens", 0),
    )

    log.info(
        "anthropic_response",
        request_id=request_id,
        model=model,
        input_tokens=usage.input_tokens,
        output_tokens=usage.output_tokens,
        cache_read=usage.cache_read_input_tokens,
        latency_ms=round(latency_ms, 1),
    )

    return CompletionResponse(
        request_id=request_id,
        model=msg.model,
        content=text,
        usage=usage,
        cached=False,
        latency_ms=round(latency_ms, 1),
    )


def stream_complete(
    request: CompletionRequest,
    request_id: str,
) -> AsyncIterator[str]:
    """
    Yields text chunks from the Anthropic streaming API.
    Caller is responsible for wrapping in SSE formatting.
    """
    model = request.model or settings.default_model
    system = _build_system_block(request)
    messages = _messages_payload(request)

    log.info(
        "anthropic_stream_start",
        request_id=request_id,
        model=model,
    )

    async def _generate() -> AsyncIterator[str]:
        kwargs: dict = dict(
            model=model,
            max_tokens=request.max_tokens,
            temperature=request.temperature,  # forward the caller's setting
            messages=messages,
        )
        if system is not None:
            kwargs["system"] = system

        try:
            with _client.messages.stream(**kwargs) as stream:
                for text in stream.text_stream:
                    yield text
        except anthropic.APIError as exc:
            log.error("anthropic_stream_error", request_id=request_id, detail=str(exc))
            raise

    return _generate()

Routers: Completions and Streaming

app/routers/completions.py

import uuid

import structlog
from fastapi import APIRouter, Depends, HTTPException, status

import anthropic as anthropic_sdk
from app.auth import require_api_key
from app.cache import cache
from app.claude import complete
from app.logging_config import get_logger
from app.models import CompletionRequest, CompletionResponse

router = APIRouter(prefix="/v1", tags=["completions"])
log = get_logger(__name__)


@router.post(
    "/completions",
    response_model=CompletionResponse,
    summary="Non-streaming completion",
    description="Send a list of messages and receive a single completed response. "
    "Identical requests within the cache TTL window return cached results.",
)
async def create_completion(
    body: CompletionRequest,
    _key: str = Depends(require_api_key),
) -> CompletionResponse:
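    # Reuse the ID bound by the request-logging middleware so the response body,
    # the X-Request-Id header, and every log line share one request ID.
    request_id = (
        structlog.contextvars.get_contextvars().get("request_id")
        or str(uuid.uuid4())
    )

    # Bind request_id into structlog context for this async task
    structlog.contextvars.bind_contextvars(request_id=request_id)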

    # Build a cache key from the full request payload
    cache_key = cache.make_key(body.model_dump())
    cached_result = cache.get(cache_key)
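    if cached_result is not None:
        log.info("cache_hit", request_id=request_id)
        # Return a copy: mutating the stored object would leak this request's
        # ID into the cache entry shared with other requests.
        return cached_result.model_copy(
            update={"cached": True, "request_id": request_id}
        )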

    log.info("cache_miss", request_id=request_id)

    try:
        response = await complete(body, request_id)
    except anthropic_sdk.RateLimitError:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail="Anthropic rate limit reached. Retry after a moment.",
        )
    except anthropic_sdk.AuthenticationError:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Service misconfiguration: invalid Anthropic credentials.",
        )
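    except anthropic_sdk.APIError:
        # Details are already logged in claude.py; keep the client-facing
        # message generic so SDK error text is not exposed to callers.
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail="Upstream AI error. Check logs for request_id.",
        )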

    cache.set(cache_key, response)
    return response

app/routers/stream.py

import json
import uuid

import structlog
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse

from app.auth import require_api_key
from app.claude import stream_complete
from app.logging_config import get_logger
from app.models import CompletionRequest

router = APIRouter(prefix="/v1", tags=["streaming"])
log = get_logger(__name__)


def _sse_line(data: str) -> str:
    """Format a single chunk as a Server-Sent Events line."""
    return f"data: {json.dumps({'text': data})}\n\n"


@router.post(
    "/completions/stream",
    summary="Streaming completion via SSE",
    description="Streams the model response as Server-Sent Events. "
    "Each event carries a 'text' field with the next token chunk. "
    "The stream ends with 'data: [DONE]'.",
)
async def create_streaming_completion(
    body: CompletionRequest,
    _key: str = Depends(require_api_key),
) -> StreamingResponse:
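    # Reuse the ID bound by the request-logging middleware when present
    request_id = (
        structlog.contextvars.get_contextvars().get("request_id")
        or str(uuid.uuid4())
    )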
    structlog.contextvars.bind_contextvars(request_id=request_id)
    log.info("stream_request", request_id=request_id)

    generator = stream_complete(body, request_id)

    async def event_stream():
        async for chunk in generator:
            yield _sse_line(chunk)
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
            "X-Request-Id": request_id,
        },
    )

The FastAPI App Factory

app/main.py

import time
import uuid
from contextlib import asynccontextmanager

import structlog
from fastapi import FastAPI, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse

from app.config import settings
from app.logging_config import configure_logging, get_logger
from app.routers import completions, stream

# Configure logging once at startup, before any other imports emit logs
configure_logging(settings.log_level)
log = get_logger(__name__)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run once at startup; yield; run once at shutdown."""
    log.info(
        "service_startup",
        environment=settings.environment,
        default_model=settings.default_model,
        cache_ttl=settings.cache_ttl_seconds,
    )
    yield
    log.info("service_shutdown")


app = FastAPI(
    title="Claude AI Microservice",
    version="1.0.0",
    description="Production FastAPI wrapper for the Anthropic Claude API.",
    lifespan=lifespan,
    docs_url="/docs",
    redoc_url="/redoc",
)

# CORS: lock this down to your actual frontend domains in production
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"] if settings.environment == "development" else [],
    allow_methods=["POST", "GET"],
    allow_headers=["X-API-Key", "Content-Type"],
)


@app.middleware("http")
async def request_logging_middleware(request: Request, call_next) -> Response:
    """
    Assign a request ID to every incoming request.
    Log the method, path, and response status + latency on the way out.
    """
    request_id = request.headers.get("X-Request-Id") or str(uuid.uuid4())
    structlog.contextvars.clear_contextvars()
    structlog.contextvars.bind_contextvars(
        request_id=request_id,
        method=request.method,
        path=request.url.path,
    )

    start = time.perf_counter()
    response: Response = await call_next(request)
    latency_ms = round((time.perf_counter() - start) * 1000, 1)

    log.info(
        "http_request",
        status=response.status_code,
        latency_ms=latency_ms,
    )

    response.headers["X-Request-Id"] = request_id
    return response


@app.exception_handler(Exception)
async def unhandled_exception_handler(request: Request, exc: Exception) -> JSONResponse:
    log.exception("unhandled_exception", exc_info=exc)
    return JSONResponse(
        status_code=500,
        content={"detail": "An internal error occurred. Check logs for request_id."},
    )


# Health check, no auth required
@app.get("/health", tags=["ops"])
async def health() -> dict:
    return {"status": "ok", "service": "claude-microservice", "version": "1.0.0"}


# Mount routers
app.include_router(completions.router)
app.include_router(stream.router)

Dockerfile and Docker Compose

Dockerfile

FROM python:3.12-slim

# Security: run as non-root
RUN useradd --create-home --shell /bin/bash appuser

WORKDIR /app

# Install deps in a separate layer so rebuilds on code changes are fast
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY app/ ./app/

USER appuser

EXPOSE 8000

# --workers 1 for a single-replica pod; scale replicas at the orchestrator level
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]

docker-compose.yml

services:
  api:
    build: .
    ports:
      - "8000:8000"
    env_file:
      - .env
    volumes:
      # Hot-reload in development: mount source over the image copy
      - ./app:/app/app
    command: >
      uvicorn app.main:app
      --host 0.0.0.0
      --port 8000
      --reload
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
      interval: 15s
      timeout: 5s
      retries: 3

Docker image layer cache (Figure 2 diagram, summarized):

  • python:3.12-slim (base image, rarely rebuilt): cached
  • pip install requirements.txt (rebuilt only when requirements change): cached most of the time
  • COPY app/ (rebuilt on every code change): rebuilt, fast (~2s)
  • CMD uvicorn app.main:app: config only

ANTHROPIC_API_KEY and SERVICE_API_KEY come from .env or a secrets manager, NOT baked into the image.

Figure 2: Docker layer strategy. Separating the pip install layer from the app code layer means routine code changes produce a two-second rebuild, not a full pip install.

Tests

tests/conftest.py

import os

import pytest
from fastapi.testclient import TestClient

# Set env vars before the app is imported so pydantic-settings picks them up
os.environ.setdefault("ANTHROPIC_API_KEY", "test-key-not-real")
os.environ.setdefault("SERVICE_API_KEY", "test-service-key")
os.environ.setdefault("DEFAULT_MODEL", "claude-haiku-4-5")

from app.main import app  # noqa: E402  (must come after env setup)


@pytest.fixture(scope="session")
def client():
    with TestClient(app) as c:
        yield c


@pytest.fixture
def auth_headers():
    return {"X-API-Key": "test-service-key"}

tests/test_completions.py

from unittest.mock import MagicMock, patch

import pytest


def _mock_response(text: str = "Hello from mock Claude"):
    """Build a mock that looks like an anthropic.types.Message."""
    mock = MagicMock()
    mock.model = "claude-haiku-4-5"
    mock.content = [MagicMock(text=text)]
    mock.usage.input_tokens = 20
    mock.usage.output_tokens = 10
    mock.usage.cache_creation_input_tokens = 0
    mock.usage.cache_read_input_tokens = 0
    return mock


class TestHealth:
    def test_health_ok(self, client):
        resp = client.get("/health")
        assert resp.status_code == 200
        assert resp.json()["status"] == "ok"


class TestAuth:
    def test_missing_key_returns_401(self, client):
        resp = client.post("/v1/completions", json={
            "messages": [{"role": "user", "content": "hi"}]
        })
        assert resp.status_code == 401

    def test_wrong_key_returns_403(self, client):
        resp = client.post(
            "/v1/completions",
            json={"messages": [{"role": "user", "content": "hi"}]},
            headers={"X-API-Key": "wrong-key"},
        )
        assert resp.status_code == 403


class TestCompletions:
    @patch("app.claude._client")
    def test_completion_returns_content(self, mock_client, client, auth_headers):
        mock_client.messages.create.return_value = _mock_response("Mocked answer")
        resp = client.post(
            "/v1/completions",
            json={"messages": [{"role": "user", "content": "What is 2+2?"}]},
            headers=auth_headers,
        )
        assert resp.status_code == 200
        data = resp.json()
        assert data["content"] == "Mocked answer"
        assert data["usage"]["input_tokens"] == 20
        assert data["cached"] is False

    @patch("app.claude._client")
    def test_cache_hit_on_second_call(self, mock_client, client, auth_headers):
        mock_client.messages.create.return_value = _mock_response("Cached response")
        payload = {"messages": [{"role": "user", "content": "Unique cache test 12345"}]}
        # First call: miss
        r1 = client.post("/v1/completions", json=payload, headers=auth_headers)
        assert r1.json()["cached"] is False
        # Second call: hit (Anthropic client should NOT be called again)
        r2 = client.post("/v1/completions", json=payload, headers=auth_headers)
        assert r2.json()["cached"] is True
        assert mock_client.messages.create.call_count == 1

    def test_validation_rejects_empty_messages(self, client, auth_headers):
        resp = client.post(
            "/v1/completions",
            json={"messages": []},
            headers=auth_headers,
        )
        assert resp.status_code == 422

    def test_validation_rejects_assistant_last(self, client, auth_headers):
        resp = client.post(
            "/v1/completions",
            json={
                "messages": [
                    {"role": "user", "content": "Hello"},
                    {"role": "assistant", "content": "Hi"},
                ]
            },
            headers=auth_headers,
        )
        assert resp.status_code == 422

Running It End-to-End

Start the service

cp .env.example .env # fill in ANTHROPIC_API_KEY and SERVICE_API_KEY
docker compose up --build

Smoke test with curl

# Non-streaming completion
curl -s -X POST http://localhost:8000/v1/completions \
  -H "Content-Type: application/json" \
  -H "X-API-Key: your-chosen-service-api-key" \
  -d '{
    "messages": [
      {"role": "user", "content": "Explain what a Pydantic validator does in two sentences."}
    ],
    "system": "You are a concise Python educator.",
    "max_tokens": 200,
    "cache_system": true
  }' | python -m json.tool

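Sample response (values are illustrative; Anthropic enforces a minimum cacheable prompt length, so a system prompt this short would in practice report 0 for both cache fields):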

{
  "request_id": "3f8a2b1c-9d4e-4f7a-a123-0b5c6d7e8f90",
  "model": "claude-sonnet-4-6",
  "content": "A Pydantic validator is a function decorated with @field_validator that runs when a model field is assigned, letting you enforce custom constraints or transform values before they are stored. If the validation fails, Pydantic raises a ValidationError with a structured error message pointing to the exact field.",
  "usage": {
    "input_tokens": 42,
    "output_tokens": 61,
    "cache_creation_input_tokens": 18,
    "cache_read_input_tokens": 0
  },
  "cached": false,
  "latency_ms": 1284.7
}

Streaming with curl

curl -N -s -X POST http://localhost:8000/v1/completions/stream \
  -H "Content-Type: application/json" \
  -H "X-API-Key: your-chosen-service-api-key" \
  -d '{
    "messages": [{"role": "user", "content": "Count slowly from 1 to 5."}],
    "max_tokens": 80
  }'

Sample SSE output (one chunk per event; the blank line that separates events is omitted here):

data: {"text": "1"}
data: {"text": "..."}
data: {"text": " 2"}
data: {"text": "..."}
data: {"text": " 3"}
data: {"text": " 4"}
data: {"text": " 5"}
data: [DONE]
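
On the client side, consuming this stream comes down to reading `data:` lines until the `[DONE]` sentinel. The following is a minimal sketch of such a parser for this service's event format; `parse_sse_events` is a hypothetical helper name, and the sample lines stand in for what an HTTP client would deliver.

```python
import json
from typing import Iterable, Iterator


def parse_sse_events(lines: Iterable[str]) -> Iterator[str]:
    """Yield text chunks from 'data: {...}' lines until the [DONE] sentinel."""
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line.startswith("data:"):
            continue  # skip blank separator lines and any non-data fields
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            return
        yield json.loads(payload)["text"]


# Stand-in for lines read from the HTTP response body
sample = [
    'data: {"text": "1"}\n', "\n",
    'data: {"text": "..."}\n', "\n",
    'data: {"text": " 2"}\n', "\n",
    "data: [DONE]\n", "\n",
]

full_text = "".join(parse_sse_events(sample))
print(full_text)
```

In a real client the lines would come from a streaming HTTP response, for example the line iterator of an httpx streamed response, rather than from a list.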

Run tests

pytest tests/ -v

Common Pitfalls

  • Calling the synchronous Anthropic client from async routes: The anthropic.Anthropic() client is synchronous. FastAPI offloads a handler to its thread pool only when the handler is declared with plain def. This service declares its routes with async def, so every call to the sync SDK runs on the event loop and blocks all other requests on that worker until Claude responds. That is tolerable for low traffic on a single uvicorn worker, and the sync version is kept here to keep the code readable, but the correct production fix is to switch to anthropic.AsyncAnthropic() and await client.messages.create(...).
  • In-process cache breaks with multiple replicas: The TTLCache in this article is a Python dict. It does not survive process restarts and is not shared across pods. Replace it with Redis (use redis-py async client) before deploying more than one replica.
  • Not setting max_tokens: max_tokens is a required parameter of the Messages API, and the SDK supplies no default for it. If you omit it, the call fails. Set a conservative default in config and let callers override it up to a reasonable ceiling.
  • Returning raw Anthropic errors to callers: SDK exception messages sometimes include your model name or account context. Always catch at the router level and return a sanitized HTTP error, as shown in completions.py.
  • Forgetting X-Accel-Buffering: no on streaming: If your service sits behind nginx (common on Bluehost, on VPS setups, and in many Kubernetes ingress configs), nginx buffers SSE by default. Without X-Accel-Buffering: no in the streaming response headers, clients see one big flush at the end instead of incremental tokens. The stream.py router sets this header already.
  • Caching streaming responses: The streaming endpoint in this article does not cache. Caching a streaming response would require buffering the whole response first, which defeats the purpose. If you need caching and streaming, cache the full response on first call and return it non-streamed on cache hit, or stream from cache by replaying the buffered text in chunks.
  • CORS wildcard in production: The app factory sets allow_origins=["*"] in development. The production branch returns an empty list, which rejects all cross-origin requests. Fill in your actual frontend domains before deploying.
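
The first pitfall can be reproduced without the Anthropic SDK. This sketch uses `time.sleep` as a stand-in for a synchronous SDK call (an assumption for illustration; `blocking_sdk_call`, `heartbeat`, and `measure` are hypothetical names). A background heartbeat task records how often the event loop gets to run while the call is in flight, first with the call made directly inside the coroutine and then offloaded with `asyncio.to_thread`.

```python
import asyncio
import time


def blocking_sdk_call() -> str:
    """Stand-in for a synchronous SDK call that takes 0.2 s."""
    time.sleep(0.2)
    return "done"


async def heartbeat(ticks: list) -> None:
    # Records a timestamp every 20 ms, but only if the event loop is free
    for _ in range(5):
        await asyncio.sleep(0.02)
        ticks.append(time.perf_counter())


async def measure(offload: bool) -> int:
    ticks: list = []
    hb = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat task start
    start = time.perf_counter()
    if offload:
        await asyncio.to_thread(blocking_sdk_call)  # runs in a worker thread
    else:
        blocking_sdk_call()  # runs on the event loop and stalls it
    end = time.perf_counter()
    await hb
    # Count heartbeats that fired while the call was in flight
    return sum(1 for t in ticks if start <= t <= end)


ticks_blocked = asyncio.run(measure(offload=False))
ticks_offloaded = asyncio.run(measure(offload=True))
print(ticks_blocked, ticks_offloaded)
```

With the direct call, no heartbeat fires until the call returns, which is exactly what other requests experience on a blocked worker. Offloading to a thread is a stopgap that keeps the loop responsive; the async client remains the cleaner fix.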

Cost and Latency Reference

| Model | Input cost (per 1M tokens) | Output cost (per 1M tokens) | Typical p50 latency (first token) | Best for |
| --- | --- | --- | --- | --- |
| claude-haiku-4-5 | $0.80 | $4.00 | ~200ms | Classification, routing, high-volume short tasks |
| claude-sonnet-4-6 | $3.00 | $15.00 | ~400ms | Most production tasks: summarization, analysis, code |
| claude-opus-4-8 | $15.00 | $75.00 | ~700ms | Complex reasoning, ambiguous judgment calls |

With prompt caching enabled (cache_system: true in the request body), a 10,000-token system prompt that would cost $0.03 per call on Sonnet costs about $0.003 on a cache hit, because cached reads are billed at roughly a tenth of the base input price. If your service has a fixed system prompt that does not change between calls, enabling caching is the single highest-impact cost optimization available. See Part 4: Prompt Caching with Claude for the full analysis.
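
The arithmetic behind that comparison can be sketched in a few lines. The Sonnet input price comes from the table above; the two multipliers are assumptions (cache reads at 10% of the base input price, cache writes at 125%) and should be checked against current Anthropic pricing. The helper names are illustrative, and the model assumes every call after the first hits the cache.

```python
SONNET_INPUT_PRICE = 3.00      # dollars per 1M input tokens, from the table above
CACHE_READ_MULTIPLIER = 0.10   # assumption: cached reads cost 10% of base input
CACHE_WRITE_MULTIPLIER = 1.25  # assumption: the first (cache-writing) call costs 125%


def input_cost(tokens: int, price_per_million: float, multiplier: float = 1.0) -> float:
    """Dollar cost of `tokens` input tokens at a per-million-token price."""
    return tokens * price_per_million * multiplier / 1_000_000


def system_prompt_cost(tokens: int, calls: int, cached: bool,
                       price_per_million: float = SONNET_INPUT_PRICE) -> float:
    """Total system-prompt cost over `calls` requests, input tokens only."""
    if calls <= 0:
        return 0.0
    if not cached:
        return calls * input_cost(tokens, price_per_million)
    # One cache write, then every later call is assumed to be a cache hit.
    first = input_cost(tokens, price_per_million, CACHE_WRITE_MULTIPLIER)
    rest = (calls - 1) * input_cost(tokens, price_per_million, CACHE_READ_MULTIPLIER)
    return first + rest


if __name__ == "__main__":
    per_call = input_cost(10_000, SONNET_INPUT_PRICE)
    per_hit = input_cost(10_000, SONNET_INPUT_PRICE, CACHE_READ_MULTIPLIER)
    print(f"uncached call: ${per_call:.4f}, cache hit: ${per_hit:.4f}")
    print(f"1,000 calls uncached: ${system_prompt_cost(10_000, 1000, cached=False):.2f}")
    print(f"1,000 calls cached:   ${system_prompt_cost(10_000, 1000, cached=True):.2f}")
```

The cache write on the first call is slightly more expensive than an uncached call, so caching pays off from the second call onward under these assumptions.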

| Scenario | Recommended model | Caching? | Expected input cost per 1K calls (est.) |
| --- | --- | --- | --- |
| Short Q&A, 100-token input | claude-haiku-4-5 | No | ~$0.08 |
| Document analysis, 4K-token system | claude-sonnet-4-6 | Yes | ~$1.20 (vs ~$12 without cache) |
| Multi-step reasoning, 8K+ context | claude-opus-4-8 | Yes | ~$12 (vs ~$120 without cache) |
| Classification pipeline, 50-token input | claude-haiku-4-5 | No | ~$0.04 |

For model routing decisions at a finer grain, see Part 27: Cut AI Costs: Model Routing and Batching with Claude. For tracing and monitoring the latency numbers in your own service, see Part 28: Observability for LLM Apps.

What to Add Before Going to Production

The service as written covers the foundations. Here is what a production hardening pass would add, roughly in priority order:

  1. AsyncAnthropic client: Replace anthropic.Anthropic() with anthropic.AsyncAnthropic() and update claude.py to use await. This stops each Anthropic call from blocking the event loop inside the async def route handlers.
  2. Redis-backed cache: Drop in redis.asyncio and replace the TTLCache class with a Redis get/set wrapper using the same SHA-256 key scheme.
  3. Rate limiting per caller: Use slowapi (a FastAPI-compatible wrapper around limits) to cap requests per API key per minute. Without this, a single misbehaving caller can consume your entire Anthropic quota.
  4. Multiple API keys with a database: Store keys in a Postgres table with a last_used_at column and per-key rate limit config. The require_api_key dependency becomes a database lookup.
  5. OpenTelemetry spans: Add the OTEL Python SDK and emit a span per Anthropic call with token counts and latency as attributes. Ship to Grafana Tempo or Honeycomb. The approach is covered in detail in Part 28.
  6. Prompt injection defense: If the service accepts arbitrary user input that gets concatenated into the prompt, add the input sanitization layer described in Part 25: Guardrails and Prompt Injection Defense.
  7. Structured output endpoint: Add a POST /v1/structured route that accepts a JSON Schema in the request body, builds a tool definition from it, passes tool_choice={"type":"tool","name":"output"} to Claude, and returns the parsed object. See Part 3: Structured Output from Claude for the pattern.
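
For item 7, the request-building and response-parsing halves can be sketched without the SDK. The helper names below are hypothetical, and the response is assumed to be the dict-shaped content list of a Messages API reply (the SDK returns objects with the same fields). The check that the schema is of type object reflects the tool input_schema requirement as I understand it; treat it as an assumption.

```python
from typing import Any

TOOL_NAME = "output"  # matches the tool_choice name used in item 7


def build_structured_request(schema: dict[str, Any],
                             description: str = "Return the result as structured data.") -> dict[str, Any]:
    """Turn a caller-supplied JSON Schema into tools/tool_choice arguments."""
    if schema.get("type") != "object":
        raise ValueError("schema must be a JSON Schema of type 'object'")
    return {
        "tools": [{
            "name": TOOL_NAME,
            "description": description,
            "input_schema": schema,
        }],
        # Force the model to answer through the tool instead of free text.
        "tool_choice": {"type": "tool", "name": TOOL_NAME},
    }


def extract_structured_output(content: list[dict[str, Any]]) -> dict[str, Any]:
    """Pull the parsed object out of the first matching tool_use block."""
    for block in content:
        if block.get("type") == "tool_use" and block.get("name") == TOOL_NAME:
            return block["input"]
    raise ValueError("response contained no tool_use block for the output tool")


if __name__ == "__main__":
    schema = {
        "type": "object",
        "properties": {"sentiment": {"type": "string"}},
        "required": ["sentiment"],
    }
    extra_kwargs = build_structured_request(schema)
    # Hand-written example of what a reply's content list might look like.
    fake_content = [{"type": "tool_use", "id": "toolu_1", "name": "output",
                     "input": {"sentiment": "positive"}}]
    print(extra_kwargs["tool_choice"], extract_structured_output(fake_content))
```

The returned dict is meant to be merged into the keyword arguments of the existing messages.create call; validating the extracted object against the caller's schema before returning it is a sensible next step that is left out here.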

Frequently Asked Questions

Can I deploy this on a shared host like Bluehost?

Not easily. Shared hosts typically do not allow you to run persistent Python processes on port 8000. This service is designed for a VPS, a container platform (Fly.io, Railway, Render, AWS ECS, GCP Cloud Run), or a Kubernetes cluster. Cloud Run is a particularly good fit: it scales to zero when idle, so you pay nothing while the service is not being called, and it handles HTTPS and container orchestration for you.

Why use FastAPI instead of Flask or Django REST Framework?

FastAPI generates an OpenAPI schema from your Pydantic models automatically, which means your callers always have up-to-date docs at /docs. Its dependency injection system (the Depends() mechanism) makes auth, database connections, and shared clients clean to wire up. It is also async-native, which matters here because Anthropic API calls take 1 to 10 seconds and you want to handle concurrent requests without spawning threads. Flask can be made async, but it is not the default. Django REST is heavier than needed for a focused microservice.

How do I handle long-running requests that exceed my reverse proxy’s timeout?

Set your nginx or load balancer’s proxy_read_timeout to at least 120 seconds for the completions endpoint. For requests you expect to take even longer, use the streaming endpoint instead. SSE keeps the connection alive with periodic data events, so the proxy sees activity and does not time out. The X-Accel-Buffering: no header in stream.py is required to prevent nginx from buffering the stream before forwarding it.

Is the in-process cache safe for concurrent requests?

Yes. Async route handlers run on a single event-loop thread and only switch at await points, and in CPython the GIL serializes dict access in any case, so the TTLCache will not corrupt data under concurrent async requests. The race condition to watch for is two concurrent requests with the same cache key both missing the cache, both calling Anthropic, and both writing the result. The second write overwrites the first with an equivalent value, which is harmless apart from the duplicated API cost. If you need exactly-once Anthropic calls (for cost accounting), use Redis with a SET NX PX (set if not exists with TTL) pattern to acquire a lock before calling Anthropic.
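
Within a single process, the duplicate-miss race can also be closed without Redis by taking a per-key asyncio.Lock and re-checking the cache after acquiring it. The sketch below is an in-process analogue of the lock pattern, not the Redis version and not the article's TTLCache: SingleFlightCache is a hypothetical class, expiry is omitted for brevity, and fake_upstream stands in for the Anthropic call.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable


class SingleFlightCache:
    """Cache that lets only one coroutine per key compute a missing value."""

    def __init__(self) -> None:
        self._values = {}
        # One lock per cache key, created lazily on first use.
        self._locks = defaultdict(asyncio.Lock)

    async def get_or_compute(self, key: str,
                             compute: Callable[[], Awaitable[str]]) -> str:
        if key in self._values:
            return self._values[key]
        async with self._locks[key]:
            # Another coroutine may have filled the cache while we waited.
            if key in self._values:
                return self._values[key]
            value = await compute()
            self._values[key] = value
            return value


async def demo(n: int = 5):
    """Fire n concurrent requests for one key; return (upstream calls, results)."""
    calls = 0

    async def fake_upstream() -> str:
        nonlocal calls
        calls += 1
        await asyncio.sleep(0.05)  # simulate model latency
        return "answer"

    cache = SingleFlightCache()
    results = await asyncio.gather(
        *(cache.get_or_compute("same-key", fake_upstream) for _ in range(n))
    )
    return calls, results


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

This only deduplicates calls inside one process. Across replicas the lock has to live in shared storage, which is where the Redis approach comes in.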

How do I use tool use or function calling through this service?

Add a tools field to CompletionRequest as a list of tool definitions (following the Anthropic tool schema), pass them through to the _client.messages.create call, and add logic to handle msg.stop_reason == "tool_use". The full pattern is covered in Part 2: Tool Use with Claude. For a microservice, you would typically run the tool execution server-side and continue the conversation loop internally, returning the final text to the caller.
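
A minimal sketch of that internal loop, under stated assumptions: call_model is a hypothetical callable that wraps the messages.create call and returns a dict with stop_reason and content, tool_impls is a hypothetical name-to-function registry, and responses are treated as plain dicts instead of SDK objects. Error handling for unknown tools or failing tool code is left out.

```python
from typing import Any, Callable

Message = dict[str, Any]


def run_tool_loop(call_model: Callable[[list[Message]], dict[str, Any]],
                  tool_impls: dict[str, Callable[..., Any]],
                  messages: list[Message],
                  max_turns: int = 5) -> str:
    """Call the model, execute requested tools server-side, return final text."""
    history = list(messages)  # copy so the caller's list is not mutated
    for _ in range(max_turns):
        response = call_model(history)
        if response["stop_reason"] != "tool_use":
            # Final answer: join the text blocks and return them.
            return "".join(b["text"] for b in response["content"] if b["type"] == "text")
        # Echo the assistant turn, then answer each tool_use with a tool_result.
        history.append({"role": "assistant", "content": response["content"]})
        results = []
        for block in response["content"]:
            if block["type"] != "tool_use":
                continue
            output = tool_impls[block["name"]](**block["input"])
            results.append({"type": "tool_result",
                            "tool_use_id": block["id"],
                            "content": str(output)})
        history.append({"role": "user", "content": results})
    raise RuntimeError("tool loop did not finish within max_turns")


def fake_model(history: list[Message]) -> dict[str, Any]:
    """Scripted stand-in for Claude: asks for one tool call, then answers."""
    if len(history) == 1:
        return {"stop_reason": "tool_use",
                "content": [{"type": "tool_use", "id": "toolu_1",
                             "name": "add", "input": {"a": 2, "b": 3}}]}
    result = history[-1]["content"][0]["content"]
    return {"stop_reason": "end_turn",
            "content": [{"type": "text", "text": f"The sum is {result}."}]}


if __name__ == "__main__":
    question = [{"role": "user", "content": "What is 2 + 3?"}]
    print(run_tool_loop(fake_model, {"add": lambda a, b: a + b}, question))
```

The max_turns cap matters in a service: without it, a model that keeps requesting tools would hold the request open and keep spending tokens.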

What is the right max_tokens ceiling to enforce in the Pydantic validator?

Claude Sonnet 4.6 and Haiku 4.5 support up to 8096 output tokens per call; Opus 4.8 supports up to 4096 in some configurations. Setting the ceiling at 8096 in the validator (as this article does) is a safe default. If your use case only ever needs short answers, lower the ceiling to 512 or 1024 to prevent runaway generation from a misconfigured caller.

How do I add authentication for multiple tenants with different rate limits?

Create a tenants table in Postgres with columns for api_key_hash (store the SHA-256 of the key, not the key itself), rpm_limit, and monthly_token_budget. Change require_api_key to hash the incoming key and look it up. Inject the tenant object into the route handler via Depends. Track token usage in a separate usage_events table. This is a few hundred lines of additional code on top of what is already here.
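
The hash-and-look-up step can be sketched with the standard library alone. Everything here is illustrative: the Tenant dataclass mirrors the columns named above, the TENANTS dict stands in for the Postgres table, and the sample key and limits are made up.

```python
import hashlib
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Tenant:
    name: str
    api_key_hash: str          # SHA-256 hex digest, never the raw key
    rpm_limit: int
    monthly_token_budget: int


def hash_api_key(raw_key: str) -> str:
    """Return the SHA-256 hex digest stored in place of the raw key."""
    return hashlib.sha256(raw_key.encode("utf-8")).hexdigest()


# In-memory stand-in for the tenants table, indexed by api_key_hash.
TENANTS = {
    t.api_key_hash: t
    for t in [
        Tenant("acme", hash_api_key("acme-secret-key"), rpm_limit=60,
               monthly_token_budget=5_000_000),
    ]
}


def find_tenant(raw_key: str) -> Optional[Tenant]:
    """Hash the presented key and look the tenant up; None means reject."""
    return TENANTS.get(hash_api_key(raw_key))


if __name__ == "__main__":
    print(find_tenant("acme-secret-key"))
    print(find_tenant("wrong-key"))
```

In the service, require_api_key would call something like find_tenant, raise a 401 when it returns None, and otherwise return the tenant so the route handler receives it through Depends.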

View the complete series: AI in Production: 30 Real-World Use Cases with Claude.
