Summarize Alerts and On-Call Noise with Claude: AI Alert Summarization for On-Call Teams

Series
AI in Production: 30 Real-World Use Cases with Claude

Part 16 of 30

TL;DR

  • AI alert summarization groups and deduplicates raw monitoring noise into a concise on-call digest, reducing mean time to acknowledge (MTTA) by cutting the number of actionable items an engineer must read.
  • The POC in this article takes a raw JSON alert stream, groups it by service, collapses near-duplicate alerts into single entries, and asks Claude to write a plain-English summary of what changed, with severity triage.
  • Prompt caching holds the system instructions constant across repeated digest runs. This cuts input costs when the digest runs every 1-5 minutes during an incident, provided the cached prefix is long enough to qualify for caching.
  • Claude Haiku 4.5 is the right model tier for this task: high volume, low latency, low cost. The summaries are short and tightly constrained, so the cheaper model's output is usually close to Sonnet's at a fraction of the price.
  • The structured output pattern (forcing a JSON schema via tool use) makes the digest machine-readable, so it can feed a Slack bot, a PagerDuty webhook, or a status page without extra parsing.
  • Common traps: flushing the alert buffer too infrequently (you miss the storm), sending raw alert JSON that is 40k tokens (you burn money on noise), and not deduplicating before the API call (Claude has to do it the hard way).

Why On-Call Teams Drown in Alert Noise

A mid-size production system at a growing startup fires, on a bad day, anywhere from 50 to several hundred alerts per hour. These alerts come from Prometheus, Datadog, CloudWatch, Grafana, or a mix of all four. They repeat. They overlap. Three different monitors fire for the same root cause: a CPU alert fires, then a latency alert fires two minutes later because CPU was the cause, then a dependent service alert fires because the latency degraded it. The on-call engineer gets paged for all three.

AI alert summarization addresses this directly: instead of reading 47 individual alert lines, the engineer reads one paragraph per affected service that says what changed, when it started, and what the current status is. That shift from raw data to a human-readable summary is exactly what Claude is good at, and it is where the business case is clearest.

What This Costs Without Automation

PagerDuty’s State of Digital Operations report found that high-volume alert noise is the top reason on-call engineers experience burnout. More concretely: if an engineer spends 3 minutes triaging each alert and receives 60 alerts during a 2-hour incident window, that is 3 hours of reading. If Claude can collapse those 60 alerts into 6 service-grouped summaries, the same triage takes 10 minutes. The cost of the API calls is trivially small compared to even one missed escalation.

Related work in this series

Part 15, Incident Response Copilot: Speed Up On-Call with Claude, covers using Claude to draft runbook steps and coordinate response communication during an active incident. This article is the upstream counterpart: it handles the raw alert stream before a human even decides whether to declare an incident. Part 7, Log Triage with AI: Cluster and Explain Errors Using Claude, covers a similar problem for log lines rather than alert events.

Architecture of the AI Alert Summarization Pipeline

The design has three stages: ingest, preprocess, and summarize. The preprocess stage does the mechanical work that you should never ask an LLM to do: deduplication by alert fingerprint, grouping by service, and trimming the payload to only the fields that matter. The summarize stage sends the cleaned, grouped data to Claude and gets back a structured digest.

[Figure: AI Alert Summarization Pipeline. Alert stream (Prometheus, Datadog, CloudWatch, Grafana; raw, noisy) -> Preprocess in Python, no API cost (1. deduplicate, 2. group by service, 3. trim payload, 4. sort by severity) -> Claude API (Haiku 4.5, structured output via tool use, system block cached) -> Digest (JSON output to a Slack bot, PagerDuty, and a status page)]

Figure 1. The three-stage pipeline: raw alert ingest, deterministic preprocessing in Python, then Claude for natural-language summarization with structured output.

Why preprocess before calling the API

Sending 200 raw alert objects to Claude is wasteful. Each Prometheus alert JSON typically includes labels, annotations, a generator URL, and timestamps, and most of that is noise for summarization. Preprocessing strips each alert to four essential fields (service name, alert name, severity, and message), plus a timestamp for ordering. That can take a 200-alert payload from 40k tokens down to 3k tokens, roughly a 13x reduction in input tokens before Claude sees a single byte. The mock stream in the POC below is already in this trimmed shape; with a live source, trimming is a step you add.
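Because the POC's mock alerts are already trimmed, here is a minimal sketch of the trimming step for a live source. It assumes alerts have already been mapped to flat dicts. `KEEP_FIELDS`, `trim_alert`, and `payload_size` are hypothetical names, the extra keys in the sample alert are illustrative, and character count is only a rough stand-in for tokens.

```python
import json

# Fields kept for summarization: the four essentials plus fired_at, which the
# POC's preprocessor uses for first/last-fired times. Adjust to your alert shape.
KEEP_FIELDS = ("service", "alert_name", "severity", "message", "fired_at")


def trim_alert(alert: dict) -> dict:
    """Return a new dict holding only the whitelisted fields that are present."""
    return {key: alert[key] for key in KEEP_FIELDS if key in alert}


def payload_size(alerts: list[dict]) -> int:
    """Length of the compact JSON encoding: a rough proxy for token count."""
    return len(json.dumps(alerts, separators=(",", ":")))


if __name__ == "__main__":
    # Illustrative alert with the kind of extra metadata a real source attaches
    noisy = {
        "service": "payment-service",
        "alert_name": "HighCPU",
        "severity": "warning",
        "message": "CPU usage at 87% for 5 minutes",
        "fired_at": "2026-06-04T09:00:01Z",
        "generator_url": "http://prometheus.example:9090/graph",
        "labels": {"instance": "10.0.3.17:9100", "job": "node", "pod": "payment-7d9f"},
        "annotations": {"runbook": "https://wiki.example/runbooks/high-cpu"},
    }
    trimmed = trim_alert(noisy)
    print(sorted(trimmed))
    print(payload_size([noisy]), "->", payload_size([trimmed]), "characters")
```

The compact `separators` also matter: `build_user_message` in the POC uses `indent=2`, which is easier to read while debugging but spends tokens on whitespace.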

Deduplication is also mechanical: if the same alert fired 12 times in 5 minutes, it is one event with a count. Claude does not need to infer that; a Python dict keyed on (service, alert_name, severity) handles it in one pass.

Setting Up the Project

Install and environment

pip install anthropic python-dotenv

Create a requirements.txt:

anthropic>=0.28.0
python-dotenv>=1.0.0

Create a .env file (never commit this):

ANTHROPIC_API_KEY=sk-ant-...

The Complete POC: AI Alert Summarization with Claude

The code below is a single self-contained file. It defines a realistic mock alert stream, preprocesses it, sends it to Claude Haiku with structured output via tool use, and prints the digest. Prompt caching is enabled on the system block. Once the static prefix is long enough to qualify, repeated digest runs during an incident (every 1-5 minutes) read it from the cache at a reduced rate instead of paying the full input price each time.

"""
alert_summarizer.py

AI alert summarization pipeline using Claude.
Reads a noisy alert stream, deduplicates and groups by service,
then calls Claude Haiku to produce a plain-English on-call digest
with structured JSON output via tool use.

Usage:
    python alert_summarizer.py

Requirements:
    pip install anthropic python-dotenv
    ANTHROPIC_API_KEY set in .env or environment.
"""

import os
import json
import time
from collections import defaultdict
from datetime import datetime, timezone
from dotenv import load_dotenv
import anthropic

load_dotenv()

# ---------------------------------------------------------------------------
# 1. Mock alert stream
#    In production, replace this with your real alert source:
#    Prometheus Alertmanager webhook, Datadog Events API, CloudWatch SNS, etc.
# ---------------------------------------------------------------------------

RAW_ALERTS = [
    # payment-service: CPU spike fires multiple times (duplicates)
    {"service": "payment-service", "alert_name": "HighCPU", "severity": "warning",
     "message": "CPU usage at 87% for 5 minutes", "fired_at": "2026-06-04T09:00:01Z"},
    {"service": "payment-service", "alert_name": "HighCPU", "severity": "warning",
     "message": "CPU usage at 89% for 6 minutes", "fired_at": "2026-06-04T09:01:01Z"},
    {"service": "payment-service", "alert_name": "HighCPU", "severity": "warning",
     "message": "CPU usage at 91% for 7 minutes", "fired_at": "2026-06-04T09:02:01Z"},
    {"service": "payment-service", "alert_name": "HighLatencyP99", "severity": "critical",
     "message": "p99 latency 4200ms, SLA is 500ms", "fired_at": "2026-06-04T09:02:30Z"},
    {"service": "payment-service", "alert_name": "ErrorRateSpike", "severity": "critical",
     "message": "Error rate 12% over last 5 minutes (baseline 0.2%)", "fired_at": "2026-06-04T09:03:00Z"},

    # auth-service: one real alert, one informational
    {"service": "auth-service", "alert_name": "TokenValidationErrors", "severity": "critical",
     "message": "JWT validation failures: 340 errors/min", "fired_at": "2026-06-04T09:03:10Z"},
    {"service": "auth-service", "alert_name": "TokenValidationErrors", "severity": "critical",
     "message": "JWT validation failures: 355 errors/min", "fired_at": "2026-06-04T09:04:10Z"},
    {"service": "auth-service", "alert_name": "PodRestarted", "severity": "info",
     "message": "auth-service pod restarted 1 time", "fired_at": "2026-06-04T09:04:15Z"},

    # database: slow queries + connection pool
    {"service": "postgres-primary", "alert_name": "SlowQueryCount", "severity": "warning",
     "message": "Queries > 2s: 45 in last minute (baseline 2)", "fired_at": "2026-06-04T09:02:45Z"},
    {"service": "postgres-primary", "alert_name": "ConnectionPoolNearLimit", "severity": "warning",
     "message": "Connection pool 92% full (184/200)", "fired_at": "2026-06-04T09:03:15Z"},
    {"service": "postgres-primary", "alert_name": "ReplicationLag", "severity": "critical",
     "message": "Replica lag 45 seconds, alert threshold 30s", "fired_at": "2026-06-04T09:04:00Z"},

    # cdn: unrelated issue
    {"service": "cdn-edge", "alert_name": "OriginErrorRate", "severity": "warning",
     "message": "Origin 5xx rate 3.2% (baseline 0.1%)", "fired_at": "2026-06-04T09:01:50Z"},
    {"service": "cdn-edge", "alert_name": "OriginErrorRate", "severity": "warning",
     "message": "Origin 5xx rate 3.8%", "fired_at": "2026-06-04T09:03:50Z"},

    # worker: queue backup
    {"service": "job-worker", "alert_name": "QueueDepthHigh", "severity": "warning",
     "message": "Queue depth 8400 jobs, up from 200 at 08:50", "fired_at": "2026-06-04T09:03:30Z"},
    {"service": "job-worker", "alert_name": "QueueDepthHigh", "severity": "warning",
     "message": "Queue depth 11200 jobs", "fired_at": "2026-06-04T09:04:30Z"},
    {"service": "job-worker", "alert_name": "WorkerProcessingRate", "severity": "warning",
     "message": "Processing rate dropped from 400/min to 80/min", "fired_at": "2026-06-04T09:04:00Z"},
]


# ---------------------------------------------------------------------------
# 2. Preprocessing: deduplicate + group by service
# ---------------------------------------------------------------------------

def preprocess_alerts(raw_alerts: list[dict]) -> dict[str, list[dict]]:
    """
    Deduplicate alerts by (service, alert_name, severity) and group by service.
    Returns a dict keyed by service name; each value is a list of unique alert
    dicts with an added 'count' field showing how many times it fired.
    """
    # Step 1: bucket by fingerprint to find duplicates
    buckets: dict[tuple, list[dict]] = defaultdict(list)
    for alert in raw_alerts:
        fp = (alert["service"], alert["alert_name"], alert["severity"])
        buckets[fp].append(alert)

    # Step 2: collapse each bucket to one representative entry
    deduplicated: list[dict] = []
    for (service, alert_name, severity), instances in buckets.items():
        # Pick the latest instance as the representative
        latest = max(instances, key=lambda a: a["fired_at"])
        deduplicated.append({
            "service": service,
            "alert_name": alert_name,
            "severity": severity,
            "message": latest["message"],
            "first_fired": min(instances, key=lambda a: a["fired_at"])["fired_at"],
            "last_fired": latest["fired_at"],
            "count": len(instances),
        })

    # Step 3: group by service, sort by severity (critical first)
    severity_order = {"critical": 0, "warning": 1, "info": 2}
    by_service: dict[str, list[dict]] = defaultdict(list)
    for alert in deduplicated:
        by_service[alert["service"]].append(alert)

    for service in by_service:
        by_service[service].sort(key=lambda a: severity_order.get(a["severity"], 9))

    return dict(by_service)


# ---------------------------------------------------------------------------
# 3. Build the user message content
# ---------------------------------------------------------------------------

def build_user_message(grouped_alerts: dict[str, list[dict]]) -> str:
    """
    Serialise the cleaned alert groups into a compact JSON string for Claude.
    """
    digest_time = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    payload = {
        "digest_generated_at": digest_time,
        "services": grouped_alerts,
    }
    return json.dumps(payload, indent=2)


# ---------------------------------------------------------------------------
# 4. Structured output schema (tool use forces JSON structure)
# ---------------------------------------------------------------------------

DIGEST_TOOL = {
    "name": "emit_on_call_digest",
    "description": (
        "Emit a structured on-call digest from alert data. "
        "Call this tool with the full digest; do not write prose outside the tool call."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "overall_status": {
                "type": "string",
                "enum": ["critical", "degraded", "warning", "ok"],
                "description": "Highest severity across all services"
            },
            "summary": {
                "type": "string",
                "description": "One to two sentence plain-English executive summary of the current state"
            },
            "services": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "service": {"type": "string"},
                        "status": {
                            "type": "string",
                            "enum": ["critical", "degraded", "warning", "ok"]
                        },
                        "what_changed": {
                            "type": "string",
                            "description": "Plain-English description of what is different from normal"
                        },
                        "top_alert": {
                            "type": "string",
                            "description": "The single most important alert for this service"
                        },
                        "alert_count": {"type": "integer"},
                        "first_alert_at": {"type": "string"}
                    },
                    "required": ["service", "status", "what_changed", "top_alert", "alert_count", "first_alert_at"]
                }
            },
            "recommended_action": {
                "type": "string",
                "description": "One sentence on where the on-call engineer should look first"
            }
        },
        "required": ["overall_status", "summary", "services", "recommended_action"]
    }
}


# ---------------------------------------------------------------------------
# 5. System prompt (cached)
# ---------------------------------------------------------------------------

SYSTEM_PROMPT = (
    "You are an on-call assistant for a software platform. "
    "You receive pre-grouped, deduplicated monitoring alerts for multiple services. "
    "Your job is to summarise the current state for an on-call engineer who is waking up "
    "to a page. Be factual and specific. Do not speculate about root causes unless the "
    "evidence is clear from the alerts themselves. Use the count field to note if an alert "
    "is firing repeatedly. Always call the emit_on_call_digest tool with your response; "
    "never write prose outside the tool call."
)


# ---------------------------------------------------------------------------
# 6. Call Claude with prompt caching + structured output
# ---------------------------------------------------------------------------

def call_claude(user_message: str) -> dict:
    """
    Send the grouped alert payload to Claude Haiku.
    Uses prompt caching on the system block (cost saving for repeated runs)
    and forces structured output via tool_choice.
    Returns the digest as a Python dict.
    """
    client = anthropic.Anthropic()

    try:
        response = client.messages.create(
            model="claude-haiku-4-5",
            max_tokens=1024,
            system=[
                {
                    "type": "text",
                    "text": SYSTEM_PROMPT,
                    "cache_control": {"type": "ephemeral"},
                }
            ],
            tools=[DIGEST_TOOL],
            tool_choice={"type": "tool", "name": "emit_on_call_digest"},
            messages=[
                {
                    "role": "user",
                    "content": user_message,
                }
            ],
        )
    except anthropic.APIError as exc:
        print(f"[ERROR] Claude API call failed: {exc}")
        raise

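    # A forced tool call that stops at max_tokens can carry truncated input
    if response.stop_reason == "max_tokens":
        raise ValueError("Digest truncated at max_tokens; raise max_tokens or shrink the payload")

    # Extract the tool result
    for block in response.content:
        if block.type == "tool_use" and block.name == "emit_on_call_digest":
            digest = dict(block.input)
            # Attach token usage for cost visibility. The cache fields can be
            # None when caching did not apply, so coerce them to 0.
            usage = response.usage
            digest["_usage"] = {
                "input_tokens": usage.input_tokens,
                "output_tokens": usage.output_tokens,
                "cache_creation_input_tokens": getattr(usage, "cache_creation_input_tokens", None) or 0,
                "cache_read_input_tokens": getattr(usage, "cache_read_input_tokens", None) or 0,
            }
            return digest

    raise ValueError("Claude did not call the emit_on_call_digest tool as expected")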


# ---------------------------------------------------------------------------
# 7. Format and print the digest
# ---------------------------------------------------------------------------

STATUS_ICONS = {
    "critical": "[CRIT]",
    "degraded": "[DEGR]",
    "warning": "[WARN]",
    "ok":       "[ OK ]",
}


def print_digest(digest: dict) -> None:
    status_icon = STATUS_ICONS.get(digest.get("overall_status", "ok"), "[????]")
    print()
    print("=" * 70)
    print(f"  ON-CALL DIGEST  {status_icon} OVERALL: {digest.get('overall_status', '').upper()}")
    print("=" * 70)
    print()
    print(f"  {digest.get('summary', '')}")
    print()
    print(f"  ACTION: {digest.get('recommended_action', '')}")
    print()
    print("-" * 70)

    for svc in digest.get("services", []):
        icon = STATUS_ICONS.get(svc.get("status", "ok"), "[????]")
        print(f"  {icon}  {svc['service']}")
        print(f"         What changed : {svc['what_changed']}")
        print(f"         Top alert    : {svc['top_alert']}")
        print(f"         Alert count  : {svc['alert_count']}  (first: {svc['first_alert_at']})")
        print()

    usage = digest.get("_usage", {})
    if usage:
        print("-" * 70)
        print(f"  Token usage: {usage['input_tokens']} in / {usage['output_tokens']} out | "
              f"cache_created={usage['cache_creation_input_tokens']} "
              f"cache_read={usage['cache_read_input_tokens']}")
    print("=" * 70)
    print()


# ---------------------------------------------------------------------------
# 8. Main
# ---------------------------------------------------------------------------

def main():
    print(f"Processing {len(RAW_ALERTS)} raw alerts...")

    grouped = preprocess_alerts(RAW_ALERTS)
    unique_count = sum(len(v) for v in grouped.values())
    print(f"After deduplication: {unique_count} unique alerts across {len(grouped)} services")

    user_msg = build_user_message(grouped)
    print(f"Sending {len(user_msg)} characters to Claude Haiku...\n")

    start = time.monotonic()
    digest = call_claude(user_msg)
    elapsed = time.monotonic() - start
    print(f"Claude responded in {elapsed:.2f}s")

    print_digest(digest)

    # Also write the raw JSON digest to disk for downstream consumers (Slack bot, etc.)
    with open("digest_latest.json", "w") as fh:
        json.dump(digest, fh, indent=2)
    print("Full JSON digest written to digest_latest.json")


if __name__ == "__main__":
    main()

Sample run output

With a real API key and the mock alert stream above, running python alert_summarizer.py produces output similar to this:

Processing 16 raw alerts...
After deduplication: 11 unique alerts across 5 services
Sending 1847 characters to Claude Haiku...

Claude responded in 1.34s

======================================================================
  ON-CALL DIGEST  [CRIT] OVERALL: CRITICAL
======================================================================

  The platform is experiencing a cascading failure affecting the payment
  service (CPU, latency, and error rate all in breach), auth service
  (JWT validation failures at 10x normal), and the postgres primary
  (replication lag critical). Job processing has fallen to 20% of normal
  throughput, likely a downstream effect of the database issues.

  ACTION: Start with postgres-primary replication lag, as it is likely
  the root cause driving payment-service latency, auth failures, and
  the job worker slowdown.

----------------------------------------------------------------------
  [CRIT]  payment-service
         What changed : CPU spiked to 91% and has not recovered; p99
                        latency jumped from SLA 500ms to 4200ms; error
                        rate hit 12% vs 0.2% baseline.
         Top alert    : ErrorRateSpike - 12% error rate, 60x baseline
         Alert count  : 5  (first: 2026-06-04T09:00:01Z)

  [CRIT]  auth-service
         What changed : JWT validation failures at 355/min, up from
                        near zero; one pod restart observed.
         Top alert    : TokenValidationErrors - 355 JWT failures/min
         Alert count  : 3  (first: 2026-06-04T09:03:10Z)

  [CRIT]  postgres-primary
         What changed : Slow query count 22x normal, connection pool
                        at 92%, and replica lag exceeded 30s threshold.
         Top alert    : ReplicationLag - 45s lag, threshold 30s
         Alert count  : 3  (first: 2026-06-04T09:02:45Z)

  [WARN]  cdn-edge
         What changed : Origin 5xx rate climbed from 0.1% to 3.8%,
                        consistent with upstream service degradation.
         Top alert    : OriginErrorRate - 3.8% origin errors
         Alert count  : 2  (first: 2026-06-04T09:01:50Z)

  [WARN]  job-worker
         What changed : Queue depth grew from 200 to 11200 jobs while
                        processing rate dropped 80%.
         Top alert    : QueueDepthHigh - 11200 queued jobs
         Alert count  : 3  (first: 2026-06-04T09:03:30Z)

----------------------------------------------------------------------
  Token usage: 987 in / 312 out | cache_created=0 cache_read=0
======================================================================

Full JSON digest written to digest_latest.json

Key idea: The “what changed” note is what makes this output genuinely useful. It is not just a list of alert names; it is a concise description of the delta from normal. That is the difference between an engineer who reads the page cold and immediately has context, and one who has to open three dashboards to understand the situation.

Model Choice and Cost for AI Alert Summarization

Alert summarization is a volume task. You might run it every minute during an active incident. The model tier decision matters economically, but it also affects latency: a 3-second API call is acceptable for a batch report, but not for a live on-call page where the engineer is waiting.

| Model | Typical latency (this task) | Input cost per 1M tokens | Output cost per 1M tokens | Verdict for alert summarization |
|---|---|---|---|---|
| claude-haiku-4-5 | 1.0 to 1.8s | $1.00 | $5.00 | Best choice: fast, cheap, output quality is sufficient |
| claude-sonnet-4-6 | 2.5 to 4.0s | $3.00 | $15.00 | Use if you also want runbook steps in the same call |
| claude-opus-4-8 | 5.0 to 9.0s | $15.00 | $75.00 | Overkill for summarization; use for deep incident analysis |

At about 1,000 input and 300 output tokens per call (typical for this payload size) and a 1-minute cadence, Haiku costs roughly $0.0025 per digest run ($0.001 for input plus $0.0015 for output). Over a 2-hour incident (120 runs), that is about $0.30. Compare that to the cost of a 30-minute war room call for context-setting and the economics are obvious.

Prompt caching payoff

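The cache_control marker on the system block caches the entire prompt prefix up to that point, which includes the tool definition as well as the system prompt. Two caveats apply. First, Anthropic only caches prefixes above a minimum length (at least 1,024 tokens, and higher on some models, including Haiku-class ones; check the prompt caching docs for your model). The POC's short system prompt plus one tool schema is likely to fall below that. Until you grow the static prefix, for example with a runbook index or a service ownership map, the usage output will report no cache writes or reads. Second, cache hits are discounted, not free. With the default 5-minute TTL, cache writes are billed at 1.25x the base input price and cache reads at 0.1x, and each hit refreshes the TTL. During a sustained incident with digest runs every minute, you recover the write premium on the second call. See Part 4: Prompt Caching with Claude: Cut Token Costs by 90 Percent for a deeper treatment of this pattern.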
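Whether caching actually applies is easy to check from the two cache fields on each response's usage object. The helper below is a minimal sketch; `cache_status` is a hypothetical name, not part of the SDK. It accepts `response.usage` or any object with the same attribute names. If every call reports "none", the cached prefix is probably below the minimum length.

```python
from types import SimpleNamespace


def cache_status(usage) -> str:
    """
    Classify one response's caching outcome from its usage object.
    Missing or None cache fields are treated as 0.
    """
    created = getattr(usage, "cache_creation_input_tokens", None) or 0
    read = getattr(usage, "cache_read_input_tokens", None) or 0
    if read > 0:
        return "hit"    # prefix served from cache at the reduced read rate
    if created > 0:
        return "write"  # prefix written to cache on this call
    return "none"       # caching did not apply, e.g. prefix below the minimum length


if __name__ == "__main__":
    # Stand-in usage objects; in the POC you would pass response.usage
    print(cache_status(SimpleNamespace(cache_creation_input_tokens=0, cache_read_input_tokens=0)))     # none
    print(cache_status(SimpleNamespace(cache_creation_input_tokens=4200, cache_read_input_tokens=0)))  # write
    print(cache_status(SimpleNamespace(cache_creation_input_tokens=0, cache_read_input_tokens=4200)))  # hit
```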

Structured Output: Why Tool Use Instead of Asking for JSON

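You could ask Claude to “respond in JSON” in the system prompt. That works most of the time. But “most of the time” is not acceptable when this output is going to feed a Slack bot or a webhook. Forcing the tool with tool_choice changes the failure modes: the response always contains a tool_use block, and its input arrives already parsed as a dictionary, so there are no markdown fences or stray prose to strip. It is not a hard schema guarantee, though. The model can still occasionally omit a field or return an unexpected value, and a response cut off at max_tokens (stop_reason "max_tokens") can carry incomplete input. Validate the digest in code and retry cleanly when the check fails.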

For a full walkthrough of this pattern, see Part 3: Structured Output from Claude: Reliable JSON for Real Apps.
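A lightweight validation pass catches the cases tool use does not rule out. This is a minimal sketch with no third-party dependencies. `validate_digest` is a hypothetical helper, and its field lists are copied from the POC's DIGEST_TOOL schema, so keep the two in sync if you extend the schema.

```python
# Allowed status values, copied from the DIGEST_TOOL enum in the POC
STATUSES = {"critical", "degraded", "warning", "ok"}
TOP_LEVEL_REQUIRED = ("overall_status", "summary", "services", "recommended_action")
SERVICE_REQUIRED = ("service", "status", "what_changed", "top_alert", "alert_count", "first_alert_at")


def validate_digest(digest: dict) -> list[str]:
    """Return a list of problems; an empty list means the digest looks usable."""
    problems = []
    for key in TOP_LEVEL_REQUIRED:
        if key not in digest:
            problems.append(f"missing top-level field: {key}")
    if digest.get("overall_status") not in STATUSES:
        problems.append(f"bad overall_status: {digest.get('overall_status')!r}")

    services = digest.get("services")
    if not isinstance(services, list):
        problems.append("services is not a list")
        return problems

    for i, svc in enumerate(services):
        if not isinstance(svc, dict):
            problems.append(f"services[{i}] is not an object")
            continue
        for key in SERVICE_REQUIRED:
            if key not in svc:
                problems.append(f"services[{i}] missing field: {key}")
        if svc.get("status") not in STATUSES:
            problems.append(f"services[{i}] bad status: {svc.get('status')!r}")
        # bool is a subclass of int in Python, so exclude it explicitly
        count = svc.get("alert_count")
        if not isinstance(count, int) or isinstance(count, bool):
            problems.append(f"services[{i}] alert_count is not an integer")
    return problems
```

One reasonable policy is to retry the Claude call once when `validate_digest` returns problems. If the retry also fails, post the deterministic grouped alerts instead of a digest, so the page still goes out.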

Extending the output schema

The schema in the POC is minimal. In production you might add:

  • runbook_url: a string field Claude fills with the matching runbook link if you include a runbook index in the system prompt
  • probable_root_cause: a nullable string for cases where the evidence clearly points to one service
  • resolved_services: a list of services where all alerts have resolved since the last digest
  • escalation_required: a boolean set to true when overall_status == "critical" and the incident has lasted more than N minutes
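The fields above can be added to the existing tool definition without touching the POC. This is a minimal sketch with hypothetical names (`EXTRA_PROPERTIES`, `extend_schema`). It assumes the API accepts a standard JSON Schema type union such as `["string", "null"]` for the nullable field. The new properties are left out of `required`, so older prompts keep working.

```python
import copy

# Hypothetical extra properties for the digest tool's input_schema
EXTRA_PROPERTIES = {
    "runbook_url": {
        "type": "string",
        "description": "Matching runbook link from the runbook index in the system prompt",
    },
    "probable_root_cause": {
        # Type union so the model can return null when evidence is ambiguous
        "type": ["string", "null"],
        "description": "Service name only if the alerts clearly point to it; otherwise null",
    },
    "resolved_services": {
        "type": "array",
        "items": {"type": "string"},
        "description": "Services whose alerts have all resolved since the last digest",
    },
    "escalation_required": {
        "type": "boolean",
        "description": "True when overall_status is critical and the incident has lasted more than N minutes",
    },
}


def extend_schema(tool: dict, extra: dict = EXTRA_PROPERTIES) -> dict:
    """Return a deep copy of a tool definition with extra optional properties added."""
    extended = copy.deepcopy(tool)
    extended["input_schema"]["properties"].update(copy.deepcopy(extra))
    return extended


# Usage with the POC: tools=[extend_schema(DIGEST_TOOL)]
```

A design note on `escalation_required`: the rule is fully deterministic, so you may prefer to compute it in Python from overall_status and the incident start time rather than asking the model to fill it.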

Integrating the Digest with Slack and PagerDuty

The digest_latest.json file the script writes is the integration seam. A small wrapper around the main function can push it to Slack:

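import os
import requests

# Block Kit limits: header text max 150 characters, section text max 3000
HEADER_MAX = 150
SECTION_MAX = 3000


def post_to_slack(digest: dict, webhook_url: str) -> None:
    status = digest.get("overall_status", "unknown").upper()
    # Slack rejects empty section text, so fall back to a placeholder
    summary = digest.get("summary") or "(no summary)"
    action = digest.get("recommended_action") or "(none)"

    blocks = [
        {"type": "header", "text": {"type": "plain_text",
            "text": f"On-Call Digest: {status}"[:HEADER_MAX]}},
        {"type": "section", "text": {"type": "mrkdwn", "text": summary[:SECTION_MAX]}},
        {"type": "section", "text": {"type": "mrkdwn",
            "text": f"*Action:* {action}"[:SECTION_MAX]}},
        {"type": "divider"},
    ]

    for svc in digest.get("services", []):
        blocks.append({
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": (
                    f"*{svc['service']}* ({svc['status'].upper()})\n"
                    f"{svc['what_changed']}\n"
                    f"_Top alert: {svc['top_alert']}_"
                )[:SECTION_MAX],
            }
        })

    # "text" is the plain fallback Slack shows in notifications
    resp = requests.post(
        webhook_url,
        json={"text": f"On-Call Digest: {status}", "blocks": blocks},
        timeout=5,
    )
    # Surface HTTP errors (bad webhook URL, malformed blocks) instead of failing silently
    resp.raise_for_status()


# Usage (add to main() after print_digest):
# webhook = os.environ["SLACK_WEBHOOK_URL"]
# post_to_slack(digest, webhook)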

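For PagerDuty, you would post to the Events API v2, using the overall_status field to set the event severity and putting the summary in the event body. PagerDuty accepts only critical, error, warning, and info as severities, so the digest's degraded and ok statuses need a mapping. The recommended_action maps well to the PagerDuty custom_details field.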
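A minimal sketch of that event follows, using only the standard library. The status-to-severity mapping, the `source` value, and the `dedup_key` default are assumptions; `build_pagerduty_event` and `send_pagerduty_event` are hypothetical helpers. The routing key comes from an Events API v2 integration on your PagerDuty service.

```python
import json
import urllib.request

# Events API v2 accepts only these severities; this mapping is an assumption
PD_SEVERITY = {"critical": "critical", "degraded": "error", "warning": "warning", "ok": "info"}
PD_ENQUEUE_URL = "https://events.pagerduty.com/v2/enqueue"


def build_pagerduty_event(digest: dict, routing_key: str, dedup_key: str = "alert-digest") -> dict:
    """Build an Events API v2 trigger event from a digest dict."""
    return {
        "routing_key": routing_key,
        "event_action": "trigger",
        # Reusing one dedup_key folds repeated digests into the same PagerDuty alert
        "dedup_key": dedup_key,
        "payload": {
            # PagerDuty caps payload.summary at 1024 characters
            "summary": (digest.get("summary") or "On-call digest")[:1024],
            "source": "alert-summarizer",
            "severity": PD_SEVERITY.get(digest.get("overall_status"), "warning"),
            "custom_details": {
                "recommended_action": digest.get("recommended_action", ""),
                "services": digest.get("services", []),
            },
        },
    }


def send_pagerduty_event(event: dict) -> int:
    """POST the event and return the HTTP status code; raises HTTPError on 4xx/5xx."""
    req = urllib.request.Request(
        PD_ENQUEUE_URL,
        data=json.dumps(event).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=5) as resp:
        return resp.status
```

When overall_status returns to ok, you could send an event with `"event_action": "resolve"` and the same `dedup_key` instead of triggering an info event.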

[Figure: Buffered digest loop, fanning out to multiple consumers. Prometheus, Datadog, CloudWatch, and Grafana feed an alert buffer (1-5 min); Claude Haiku 4.5 produces a structured digest that goes to Slack #oncall, PagerDuty, a status page, and digest_latest.json]

Figure 2. The buffered digest loop. Alerts accumulate in a buffer for 1-5 minutes, then one Claude call produces a digest that fans out to multiple consumers.

Running the Digest in a Loop

In production, you want the digest to run on a schedule, not once. A cron job or systemd timer that invokes the script works. Alternatively, use a long-running loop like the one below, which rewrites digest_latest.json on every run and posts to Slack only when the overall status changes:

"""
alert_digest_loop.py

Runs the alert summarizer every N seconds and posts to Slack
only when the overall status changes from the last run.
Replace fetch_live_alerts() with your real alert source.
"""

import os
import time
import json
from alert_summarizer import preprocess_alerts, build_user_message, call_claude, RAW_ALERTS
# post_to_slack() is the Slack snippet above; import it from wherever you saved it

def fetch_live_alerts() -> list[dict]:
    """
    Replace this with a real call to Prometheus Alertmanager API,
    Datadog Events API, or your alerting platform's webhook receiver.
    """
    return RAW_ALERTS  # placeholder


def run_loop(interval_seconds: int = 60) -> None:
    last_status = None

    while True:
        try:
            raw = fetch_live_alerts()
            grouped = preprocess_alerts(raw)
            msg = build_user_message(grouped)
            digest = call_claude(msg)

            current_status = digest.get("overall_status")
            with open("digest_latest.json", "w") as fh:
                json.dump(digest, fh, indent=2)

            if current_status != last_status:
                print(f"[STATUS CHANGE] {last_status} -> {current_status}")
                # post_to_slack(digest, os.environ["SLACK_WEBHOOK_URL"])
                last_status = current_status
            else:
                print(f"[{time.strftime('%H:%M:%S')}] Status unchanged: {current_status}")

        except Exception as exc:
            print(f"[ERROR] Digest run failed: {exc}")

        time.sleep(interval_seconds)


if __name__ == "__main__":
    run_loop(interval_seconds=60)
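The loop above re-reads the full alert list on every run. The 1-5 minute buffer from Figure 2 can be sketched as a small sliding-window store. `AlertBuffer` and `parse_ts` are hypothetical helpers, and the sketch assumes `fired_at` values in the POC's UTC ISO 8601 format.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def parse_ts(ts: str) -> datetime:
    """Parse a UTC ISO 8601 timestamp such as '2026-06-04T09:00:01Z'."""
    # fromisoformat() before Python 3.11 rejects a trailing 'Z', so normalize it
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


class AlertBuffer:
    """Hold alerts and expose the ones whose fired_at falls inside a sliding window."""

    def __init__(self, window_minutes: float = 2):
        self.span = timedelta(minutes=window_minutes)
        self._alerts: list[dict] = []

    def add(self, alert: dict) -> None:
        """Record an alert, e.g. from a webhook receiver."""
        self._alerts.append(alert)

    def snapshot(self, now: Optional[datetime] = None) -> list[dict]:
        """Evict alerts older than the window and return a copy of the rest."""
        now = now or datetime.now(timezone.utc)
        cutoff = now - self.span
        self._alerts = [a for a in self._alerts if parse_ts(a["fired_at"]) >= cutoff]
        return list(self._alerts)


# Usage in the loop: buffer = AlertBuffer(window_minutes=2); have
# fetch_live_alerts() return buffer.snapshot()
```

Two design caveats. Keying on `fired_at` means an alert that fired once and is still active ages out of the window, so you may want to key on receive time instead. Also, if the webhook receiver runs on another thread, guard `add` and `snapshot` with a lock.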

Common Pitfalls in AI Alert Summarization

| Pitfall | Symptom | Fix |
|---|---|---|
| Sending raw alert JSON without trimming | Input tokens spike to 30k+, cost is 10-20x expected | Strip to service, alert_name, severity, message before the API call |
| No deduplication before the API call | Claude re-summarizes 12 identical HighCPU alerts as 12 events | Bucket by (service, alert_name, severity) in Python, send count instead |
| Buffer window too long (10+ minutes) | Digest arrives stale; incident already escalated before engineer sees it | Use 1-2 minute windows during active incidents, 5 minutes during quiet hours |
| Asking for JSON in prose, not via tool use | Occasional markdown fences or extra prose breaks downstream parsing | Use tool_choice to force the emit_on_call_digest tool |
| Retrying on every API error without backoff | Rate limit errors turn into a thundering herd of retries | Retry only rate-limit, connection, and 5xx errors, with exponential backoff and jitter (the Anthropic Python SDK already retries twice by default; tune with max_retries) |
| Not logging token usage | No visibility into costs; surprise billing at end of month | Log response.usage.input_tokens and output_tokens on every call |
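If the SDK's built-in retries are not enough, an outer wrapper with exponential backoff and full jitter keeps retries spread out. This is a minimal, library-agnostic sketch. `retry_with_backoff` is a hypothetical helper, and the `sleep` parameter exists only so the delays can be observed in tests.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    fn: Callable[[], T],
    retry_on: tuple = (Exception,),
    max_attempts: int = 4,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """
    Call fn(), retrying on the given exception types with exponential backoff
    and full jitter. Re-raises the last error once max_attempts is reached.
    """
    for attempt in range(max_attempts):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts - 1:
                raise
            # Full jitter: sleep a random time up to the capped exponential delay
            delay = min(max_delay, base_delay * (2 ** attempt))
            sleep(random.uniform(0, delay))
    raise RuntimeError("max_attempts must be at least 1")


# Usage with the POC (these exception classes exist in the anthropic package):
# digest = retry_with_backoff(
#     lambda: call_claude(user_msg),
#     retry_on=(anthropic.RateLimitError, anthropic.APIConnectionError,
#               anthropic.InternalServerError),
# )
```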

Handling alert floods

During a major outage you might receive 500 or more alerts in a 5-minute window. Even after deduplication, the payload can be large. Add a hard cap in the preprocessing step: if any service has more than 10 unique alert types, keep the top 10 by severity and add a note that N additional alert types were suppressed. This prevents token blowout while preserving the most actionable information.

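MAX_ALERTS_PER_SERVICE = 10


def cap_per_service(grouped: dict[str, list[dict]], cap: int = MAX_ALERTS_PER_SERVICE) -> dict[str, list[dict]]:
    """
    Keep at most `cap` alert types per service. Call this on the output of
    preprocess_alerts(), which already sorts each service critical-first,
    so slicing keeps the most severe alert types.
    """
    result = {}
    for service, alerts in grouped.items():
        if len(alerts) > cap:
            # Slicing copies the list, so the input dict is not mutated
            result[service] = alerts[:cap]
            # Add a synthetic alert so Claude can mention the suppression
            result[service].append({
                "service": service,
                "alert_name": "DigestSuppressed",
                "severity": "info",
                "message": f"{len(alerts) - cap} additional alert types suppressed",
                "count": 1,
            })
        else:
            result[service] = alerts
    return result


# Usage: grouped = cap_per_service(preprocess_alerts(raw_alerts))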

Connecting this to the broader incident workflow

The alert digest is the front door of an incident workflow. Once the engineer acknowledges the page and sees the digest, they might want runbook steps (see Part 15: Incident Response Copilot) or want to query their logs for corroborating evidence (see Part 7: Log Triage with AI). These three tools (alert summarization, incident copilot, log triage) form a natural stack: summarize first, then respond, then dig into logs.

Cost and Latency Summary

To put concrete numbers on the economics: a typical digest call in this POC uses about 1000 input tokens and 300 output tokens with Claude Haiku 4.5. At $1 and $5 per million input and output tokens, that is $0.001 + $0.0015 = $0.0025 per call. Running every minute for a 4-hour incident window is 240 calls, or about $0.60 total. Once the static prefix is long enough to be cached, calls after the first read it at 10% of the base input price. With the POC's short system prompt, caching does not apply, and the figures above are the full cost.
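The same arithmetic as a small calculator, so you can plug in the token counts from your own usage logs. This is a sketch: the prices and cache multipliers are assumptions based on Anthropic's published Haiku 4.5 pricing and 5-minute cache rates, so check the current pricing page before relying on them. `call_cost` and `incident_cost` are hypothetical names.

```python
# Assumed list prices for Claude Haiku 4.5, USD per million tokens
INPUT_PER_MTOK = 1.00
OUTPUT_PER_MTOK = 5.00
CACHE_WRITE_MULT = 1.25  # 5-minute cache writes
CACHE_READ_MULT = 0.10   # cache hits


def call_cost(input_tokens: int, output_tokens: int,
              cache_write_tokens: int = 0, cache_read_tokens: int = 0) -> float:
    """Dollar cost of one call; input_tokens excludes cached tokens, as in response.usage."""
    per_input_token = INPUT_PER_MTOK / 1_000_000
    return (
        input_tokens * per_input_token
        + cache_write_tokens * per_input_token * CACHE_WRITE_MULT
        + cache_read_tokens * per_input_token * CACHE_READ_MULT
        + output_tokens * OUTPUT_PER_MTOK / 1_000_000
    )


def incident_cost(calls: int, input_tokens: int, output_tokens: int) -> float:
    """Total for `calls` identical, uncached digest runs."""
    return calls * call_cost(input_tokens, output_tokens)


if __name__ == "__main__":
    print(f"per call: ${call_cost(1000, 300):.4f}")              # per call: $0.0025
    print(f"4h at 1/min: ${incident_cost(240, 1000, 300):.2f}")  # 4h at 1/min: $0.60
```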

For teams that want even tighter costs, see Part 27: Cut AI Costs: Model Routing and Batching with Claude. The model routing pattern there can dynamically switch from Haiku to Sonnet when the alert picture is complex enough to warrant deeper analysis, and back to Haiku for routine digests.
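Part 27 covers routing in depth. As a rough sketch of the idea only, a router can look at the grouped alerts before the call and escalate to Sonnet when the picture is large or several services are critical. The model aliases and both thresholds here are assumptions for illustration, not tuned values; check Anthropic's models documentation for current model IDs.

```python
# Assumed model aliases; verify against Anthropic's current models list.
HAIKU = "claude-haiku-4-5"
SONNET = "claude-sonnet-4-5"


def choose_model(grouped: dict[str, list[dict]],
                 critical_services_threshold: int = 3,
                 alert_types_threshold: int = 40) -> str:
    """Pick Sonnet for complex alert pictures, Haiku for routine digests.

    Both thresholds are illustrative defaults, not recommendations.
    """
    # Count services that have at least one critical alert
    critical_services = sum(
        1 for alerts in grouped.values()
        if any(a.get("severity") == "critical" for a in alerts)
    )
    # Total deduplicated alert types across all services
    total_types = sum(len(alerts) for alerts in grouped.values())
    if (critical_services >= critical_services_threshold
            or total_types > alert_types_threshold):
        return SONNET
    return HAIKU
```

Because the decision runs on the already-deduplicated groups, it costs nothing extra and can be logged alongside each digest to tune the thresholds later.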

Frequently Asked Questions

Can I use this with Prometheus Alertmanager directly?

Yes. Alertmanager supports webhook receivers. You point a webhook at an endpoint that runs the preprocessing and Claude call, then posts the digest to Slack. Replace the RAW_ALERTS list in the POC with the alerts parsed from the JSON body Alertmanager sends to your webhook. Each entry in that body's alerts array carries labels.alertname, labels.severity (a common labeling convention, not a field Alertmanager guarantees), and annotations.summary; map these, plus whichever label identifies the service, to the four fields the preprocessor expects.
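A minimal sketch of that mapping, assuming the preprocessor's fields are service, alert_name, severity, and message. The `service` label name, the fallback to `job`, and the default severity of `warning` for unlabeled alerts are all assumptions to adapt to your own labeling. The alert's status is carried along as well, since it is useful for tracking resolution.

```python
from typing import Any


def from_alertmanager(payload: dict[str, Any],
                      service_label: str = "service") -> list[dict]:
    """Flatten an Alertmanager webhook body into the preprocessor's alert format.

    Assumptions: the service is identified by `service_label` (falling back to
    the `job` label), and alerts without a severity label default to "warning".
    """
    alerts = []
    for a in payload.get("alerts", []):
        labels = a.get("labels", {})
        annotations = a.get("annotations", {})
        alerts.append({
            "service": labels.get(service_label) or labels.get("job") or "unknown",
            "alert_name": labels.get("alertname", "UnnamedAlert"),
            "severity": labels.get("severity", "warning"),
            # Prefer the short summary; fall back to the longer description
            "message": annotations.get("summary") or annotations.get("description", ""),
            "status": a.get("status", "firing"),  # "firing" or "resolved"
        })
    return alerts
```

The output of this function can be passed straight into the existing grouping and deduplication steps in place of RAW_ALERTS.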

How do I handle alert resolution (alerts that fire then clear)?

Each alert in the Alertmanager webhook payload carries a status of firing or resolved, and webhook receivers send resolved notifications by default (send_resolved: true). Keep that status field on your alert objects in the preprocessor, and add a resolved section to the output schema. In the Claude prompt, instruct it to call out which services recovered since the last digest. This gives the on-call engineer a clear signal that a problem is trending toward resolution without them having to check dashboards.
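One way to compute the resolved section deterministically is to partition the batch by status and compare against the services the previous digest reported as firing. This is a sketch with hypothetical helper names. It assumes each alert dict has `service` and `status` keys and that you persist `previously_firing` between digest runs.

```python
def split_by_status(alerts: list[dict]) -> tuple[list[dict], list[dict]]:
    """Partition alerts into (firing, resolved); a missing status counts as firing."""
    firing = [a for a in alerts if a.get("status", "firing") != "resolved"]
    resolved = [a for a in alerts if a.get("status") == "resolved"]
    return firing, resolved


def recovered_services(alerts: list[dict], previously_firing: set[str]) -> list[str]:
    """Services the last digest reported as firing that now have only resolved alerts.

    A service with both firing and resolved alerts in this batch is not
    considered recovered.
    """
    firing, resolved = split_by_status(alerts)
    still_firing = {a["service"] for a in firing}
    newly_resolved = {a["service"] for a in resolved}
    return sorted((newly_resolved - still_firing) & previously_firing)
```

Passing this list to Claude as an explicit input, rather than asking it to infer recovery from raw alerts, keeps the "recovered" claim grounded in data the preprocessor has already checked.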

My team uses OpsGenie, not PagerDuty. Does this work?

The digest output is plain JSON, so it works with any alerting platform that has an API. OpsGenie has an Alerts API that accepts POST requests; map overall_status to OpsGenie priority (critical maps to P1, warning to P3), and put the summary in the alert's description, since OpsGenie caps the message field at 130 characters. The Slack integration from the POC also works alongside OpsGenie without any changes.
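A sketch of the payload builder, assuming the digest has `overall_status` and `summary` keys. The P5 fallback for statuses other than critical and warning, the alias value, and the tag are assumptions. The message is truncated to OpsGenie's 130-character limit, and the full summary goes in the description. The resulting dict would be sent as the JSON body of a POST to OpsGenie's `/v2/alerts` endpoint with a `GenieKey` authorization header.

```python
# Mapping from the article: critical -> P1, warning -> P3.
# The fallback for any other status ("ok", "info", ...) is an assumption.
PRIORITY_BY_STATUS = {"critical": "P1", "warning": "P3"}
FALLBACK_PRIORITY = "P5"
OPSGENIE_MESSAGE_LIMIT = 130  # OpsGenie caps the alert message at 130 characters


def to_opsgenie_alert(digest: dict) -> dict:
    """Build an OpsGenie create-alert body from a digest dict."""
    status = digest.get("overall_status", "ok")
    summary = digest.get("summary", "")
    message = f"[{status.upper()}] {summary}"
    return {
        "message": message[:OPSGENIE_MESSAGE_LIMIT],
        "description": summary,  # the full digest text lives here
        "priority": PRIORITY_BY_STATUS.get(status, FALLBACK_PRIORITY),
        # Hypothetical alias: OpsGenie deduplicates open alerts with the same
        # alias, so repeated digests update one alert instead of opening many.
        "alias": "alert-digest",
        "tags": ["ai-digest"],
    }
```

Using a fixed alias is a design choice: it keeps a single OpsGenie alert per incident window rather than one per digest run, which matches the goal of reducing noise.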

How do I prevent Claude from hallucinating root causes?

The system prompt in the POC explicitly says: “Do not speculate about root causes unless the evidence is clear from the alerts themselves.” That constraint, combined with the structured output schema (which has no free-form root cause field unless you add one), keeps the output factual. If you do add a probable_root_cause field, make it nullable and instruct Claude to set it to null if the evidence is ambiguous. In practice, Haiku is conservative enough that hallucinated root causes are rare when the prompt is explicit.
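If you add the field, one sketch is to make it both required and nullable in the tool's JSON Schema, so Claude must explicitly choose null rather than silently omitting it. The tool name, the enum values, and the post-processing word list below are illustrative assumptions; align the other properties with the schema you already use. The tool would be passed as `tools=[DIGEST_TOOL]` with `tool_choice={"type": "tool", "name": "emit_digest"}` to force structured output.

```python
# Illustrative tool definition; the name and enum values are assumptions.
DIGEST_TOOL = {
    "name": "emit_digest",
    "description": "Return the on-call alert digest.",
    "input_schema": {
        "type": "object",
        "properties": {
            "overall_status": {"type": "string", "enum": ["critical", "warning", "ok"]},
            "summary": {"type": "string"},
            "probable_root_cause": {
                "type": ["string", "null"],
                "description": "Set to null unless the alerts themselves make the cause clear.",
            },
        },
        # Required but nullable: the model must state null explicitly.
        "required": ["overall_status", "summary", "probable_root_cause"],
    },
}

# Hedge words that should be treated as "no root cause" (assumed list).
_NON_ANSWERS = {"", "unknown", "unclear", "n/a", "none"}


def normalize_root_cause(tool_input: dict) -> dict:
    """Coerce placeholder strings like 'unknown' to None so consumers see one null form."""
    cause = tool_input.get("probable_root_cause")
    if isinstance(cause, str) and cause.strip().lower() in _NON_ANSWERS:
        return {**tool_input, "probable_root_cause": None}
    return tool_input
```

The normalization step is a belt-and-braces check: even when the prompt asks for null, a model may occasionally write "unknown", and downstream consumers should only have to handle one representation of "no cause".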

What if the same service has both critical and warning alerts?

The preprocessor sorts alerts within each service bucket by severity (critical first). The schema’s status field for the service should reflect the highest severity present. In the Claude prompt you can add the instruction: “Set the service status to the highest severity alert within that service.” This ensures consistent severity escalation without ambiguity.
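You can also compute each service's status before the API call, then pass it in or overwrite Claude's value with it, so escalation never depends on model judgment. A minimal sketch follows; the helper names are hypothetical, and treating unrecognized severities as `info` is an assumption.

```python
SEVERITY_ORDER = ["critical", "warning", "info"]  # most to least severe


def service_status(alerts: list[dict]) -> str:
    """Highest severity present in one service's alerts; 'ok' when there are none.

    Alerts with an unrecognized severity fall back to 'info' (an assumption).
    """
    if not alerts:
        return "ok"
    present = {a.get("severity") for a in alerts}
    for level in SEVERITY_ORDER:
        if level in present:
            return level
    return "info"


def annotate_status(grouped: dict[str, list[dict]]) -> dict[str, str]:
    """Map each service to its deterministic status."""
    return {service: service_status(alerts) for service, alerts in grouped.items()}
```

Keeping this logic in code means the prompt instruction becomes a consistency check rather than the only source of truth for escalation.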

Can I add historical context to the digest (e.g., was this service noisy last week)?

Yes, and this is a natural extension. Store a rolling 7-day count of alert firings per (service, alert_name) in a lightweight SQLite or Redis store. Include a baseline_weekly_count field in each alert object before you send it to Claude. The model can then say “this alert has fired 3 times this week but 45 times in the last hour, indicating an acute spike.” This is the same pattern covered in Part 10: RAG with Claude and pgvector, applied to time-series alert metadata instead of documents.

Does this scale to a microservices architecture with 50 services?

Yes, with one adjustment: group services into domains (data plane, control plane, edge, background jobs) and either run one Claude call per domain in parallel or add a domain-level hierarchy to the output schema. Sending 50 service summaries in one call is technically feasible (it fits comfortably in Haiku’s context window), but the output becomes harder to read. A two-level digest (domain overview + per-service details on demand) matches how on-call engineers actually triage large systems.
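A sketch of the one-call-per-domain option. The service-to-domain mapping is hypothetical (in practice it might come from a service catalog or an alert label), and `summarize` stands in for whatever function wraps your Claude call. A thread pool suits this work because each call spends its time waiting on the network.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable

# Hypothetical mapping; services not listed land in "unassigned".
DOMAIN_OF = {
    "api-gateway": "edge",
    "cdn-edge": "edge",
    "orders-db": "data plane",
    "scheduler": "control plane",
    "email-worker": "background jobs",
}

Grouped = dict[str, list[dict]]


def group_by_domain(grouped: Grouped,
                    domain_of: dict[str, str] = DOMAIN_OF) -> dict[str, Grouped]:
    """Nest per-service alert groups under their domain."""
    domains: dict[str, Grouped] = {}
    for service, alerts in grouped.items():
        domain = domain_of.get(service, "unassigned")
        domains.setdefault(domain, {})[service] = alerts
    return domains


def digest_by_domain(grouped: Grouped,
                     summarize: Callable[[str, Grouped], dict],
                     max_workers: int = 4) -> dict[str, dict]:
    """Run one summarize call per domain in parallel and collect the results."""
    domains = group_by_domain(grouped)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {name: pool.submit(summarize, name, services)
                   for name, services in domains.items()}
        return {name: f.result() for name, f in futures.items()}
```

The per-domain results can then be rendered as the top level of a two-level digest, with per-service details fetched only when an engineer expands a domain.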
