Building a Real-Time AI Processing Pipeline with Claude API and Apache Kafka
Learn how to integrate the Claude API into Apache Kafka event streams using production-grade patterns. Implement smart buffering, model routing, and Dead Letter Queues to run large-scale real-time AI analysis at low cost.
Imagine your e-commerce platform receives hundreds of user reviews every minute. You want to detect spam, abuse, and fake reviews in real time — not in the next batch run, but within seconds of submission. But calling the Claude API for each individual event would cost tens of thousands of dollars per month and offer no latency guarantees when traffic spikes.
This is the tension that a Claude API × Apache Kafka pipeline is designed to resolve. By combining Kafka's event buffering capabilities with Claude's deep language understanding, you can achieve high accuracy, low latency, and low cost simultaneously — goals that initially seem contradictory.
In this guide, we'll use an e-commerce review moderation system as our running example and walk through production-ready design patterns with complete, working Python code. By the end, you'll have a blueprint you can adapt to any high-volume AI processing workload — user behavior analysis, content classification, fraud detection, or real-time document processing.
Why Not Batch Jobs or Polling?
Most teams default to one of two approaches: scheduled batch processing or periodic polling. Both hit fundamental ceilings in production.
The batch processing problem: By definition, batch results are stale. If a fake review campaign hits your platform at 2 PM and your next batch runs at 3 PM, that's 60 minutes of unmoderated content. More critically, batch jobs create bursty, unpredictable loads on both your infrastructure and the Claude API. A batch that normally takes 5 minutes could take 30 if it overlaps with a traffic spike.
The polling problem: Polling a database or API endpoint at fixed intervals is wasteful in both directions. Poll too frequently to achieve low latency, and you're calling the API unnecessarily when no new data exists. Poll infrequently to reduce wasted calls, and latency suffers. There's no sweet spot when event volume is variable — which it always is in production.
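Where Kafka changes the equation: Events are published to a Kafka topic the moment they occur, and Consumers receive them almost immediately. Kafka Consumers do call poll(), but it is a long poll against the broker: an idle Consumer simply waits for new data instead of repeatedly querying a database that has nothing new. Kafka's Consumer Group abstraction provides built-in horizontal scaling. Spin up more Consumer instances and the topic's partitions are automatically redistributed among them. Offset management adds a recovery guarantee, as long as offsets are committed only after processing. If a Consumer crashes mid-batch, no event is lost, because the uncommitted offsets cause those events to be redelivered when the Consumer restarts (or to another Consumer after a rebalance).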
The critical architectural insight for Claude API integration is that Kafka acts as a buffering and flow-control layer. When a traffic spike occurs, Kafka absorbs the inbound events, and the Consumer works through them at a rate consistent with the Claude API's rate limits. Because unread events simply wait in the topic, you don't need complex backpressure logic. The events all get processed; they just queue up briefly during spikes.
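Kafka only absorbs a spike if the Consumer itself doesn't outrun the API. One simple way to pace requests is a token bucket sized to your rate limit. The sketch below illustrates the idea under some assumptions. TokenBucket is a hypothetical helper, not part of the Anthropic SDK or kafka-python, and the rate you configure should come from your own account's limits.

```python
import time
from typing import Callable


class TokenBucket:
    """Allow `rate_per_sec` requests on average, with bursts up to `capacity`."""

    def __init__(self, rate_per_sec: float, capacity: int,
                 clock: Callable[[], float] = time.monotonic):
        self.rate = rate_per_sec
        self.capacity = capacity
        self.clock = clock            # injectable so tests can use a fake clock
        self.tokens = float(capacity)  # start with a full bucket
        self.updated = clock()

    def _refill(self) -> None:
        # Add tokens for the time elapsed since the last refill, up to capacity
        now = self.clock()
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now

    def try_acquire(self) -> bool:
        """Take one token if available; False means the caller should wait."""
        self._refill()
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

    def seconds_until_available(self) -> float:
        """How long to sleep before the next token exists."""
        self._refill()
        return max(0.0, (1 - self.tokens) / self.rate)
```

In the async pipeline, a caller would check try_acquire() before each messages.create call. When it returns False, the caller would await asyncio.sleep(bucket.seconds_until_available()) and then retry. The injectable clock exists only so the behavior can be tested deterministically.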
Overall Architecture: Three Layers
Before diving into code, it helps to understand the system at a high level.
```
[Event Sources]      [Kafka Layer]                [AI Processing Layer]    [Output Layer]

E-commerce site  →   review-events          →     Consumer Group      →    Database
Mobile apps          (Kafka Topic)                (Claude API)             Slack alerts
Admin systems                                     + Smart Buffering        Webhooks

                     review-dlq          ←        Failed events            Admin UI
                     (Dead Letter Q)
                     review-ai-results   ←        Analysis results
```
Topic design matters more than it seems. We use three topics:
review-events: Raw incoming events (unparsed, unprocessed)
review-ai-results: Enriched events after Claude API analysis
review-dlq: Events that failed processing after max retries
Separating raw events from results means downstream consumers (the database writer, the notification service) can work from a clean, enriched stream without knowing anything about the AI processing internals.
Consumer Group design enables true parallelism. Because different Consumer Groups maintain independent offsets, the same events can flow through completely separate processing pipelines simultaneously. We implement review-moderator (spam and abuse detection) and review-analyzer (sentiment analysis and summarization) as separate groups. Each processes every review independently, at its own pace, without blocking the other.
Partition count determines your maximum parallelism ceiling. A topic with 6 partitions can have at most 6 Consumer instances in a group doing useful work. For a production deployment, start with 12 partitions to give yourself room to scale.
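A simplified sketch shows why the partition count caps parallelism. It uses range-style assignment: each partition goes to exactly one Consumer in the group, so Consumers beyond the partition count receive nothing. Kafka's real assignors (range, round-robin, sticky, cooperative-sticky) differ in detail, and assign_partitions and idle_consumers are hypothetical helpers for illustration only.

```python
def assign_partitions(num_partitions: int, consumers: list[str]) -> dict[str, list[int]]:
    """Split partitions 0..N-1 into contiguous ranges, one range per consumer."""
    assignment: dict[str, list[int]] = {c: [] for c in consumers}
    per_consumer, extra = divmod(num_partitions, len(consumers))
    start = 0
    for i, consumer in enumerate(sorted(consumers)):
        # The first `extra` consumers each take one additional partition
        count = per_consumer + (1 if i < extra else 0)
        assignment[consumer] = list(range(start, start + count))
        start += count
    return assignment


def idle_consumers(assignment: dict[str, list[int]]) -> list[str]:
    """Consumers that were assigned no partitions and therefore do no work."""
    return sorted(c for c, parts in assignment.items() if not parts)


group = [f"consumer-{i}" for i in range(8)]
print(idle_consumers(assign_partitions(6, group)))  # ['consumer-6', 'consumer-7']
```

With 6 partitions, the seventh and eighth Consumers sit idle. With 12 partitions, the same eight Consumers would all have work.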
Step 1: The Base Consumer
The base Consumer handles the mechanics of Kafka interaction: deserializing messages and managing offsets. Note enable_auto_commit=False. It is the most important setting in the entire configuration.
```python
# consumer_base.py
import asyncio
import json
import logging
from dataclasses import dataclass
from typing import AsyncGenerator

from kafka import KafkaConsumer
from kafka.errors import KafkaError

logger = logging.getLogger(__name__)


@dataclass
class ReviewEvent:
    event_id: str
    product_id: str
    user_id: str
    review_text: str
    rating: int
    timestamp: str
    source: str  # "web" | "mobile" | "api"


class ReviewEventConsumer:
    def __init__(
        self,
        bootstrap_servers: list[str],
        topic: str,
        group_id: str,
        max_poll_records: int = 50,
    ):
        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            auto_offset_reset='latest',
            enable_auto_commit=False,  # ← Critical: manual commit only
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            max_poll_records=max_poll_records,
            # Kafka's default of 5 minutes, set explicitly because
            # Claude API calls can take several seconds
            max_poll_interval_ms=300000,
        )
        self.topic = topic
        self.max_poll_records = max_poll_records
        self.running = False

    async def consume(self) -> AsyncGenerator[list[ReviewEvent], None]:
        """Yield event batches as an async generator."""
        self.running = True
        loop = asyncio.get_running_loop()
        try:
            while self.running:
                # poll() is blocking: run it in an executor to keep the event loop free
                raw_messages = await loop.run_in_executor(
                    None,
                    lambda: self.consumer.poll(
                        timeout_ms=1000, max_records=self.max_poll_records
                    ),
                )
                if not raw_messages:
                    await asyncio.sleep(0.1)
                    continue

                events = []
                for tp, messages in raw_messages.items():
                    for msg in messages:
                        try:
                            events.append(ReviewEvent(**msg.value))
                        except (KeyError, TypeError) as e:
                            # Logged and skipped here; in production, forward the
                            # raw message to the DLQ instead of dropping it
                            logger.error(f"Malformed event at offset {msg.offset}: {e}")

                if events:
                    yield events

                # Commit only after the caller's loop body has handled this batch
                await loop.run_in_executor(None, self.consumer.commit)
        except KafkaError as e:
            logger.error(f"Kafka consumer error: {e}")
            raise
        finally:
            self.consumer.close()

    def stop(self):
        self.running = False
```
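Why does enable_auto_commit=False matter so much? With auto-commit enabled, the Consumer commits offsets on a timer (every 5 seconds by default, per auto_commit_interval_ms). Those commits cover whatever poll() has already returned, whether or not those messages have been processed. Suppose the Claude API call that follows times out or throws an exception after such a commit. The Consumer Group has already moved past the event and will never see it again. Manual commit moves the "success" signal to after processing, giving you at-least-once delivery.

One caveat applies to the full pipeline in Step 5. SmartBuffer hands batches to background tasks, so consume() can commit offsets for events that are still sitting in the buffer or waiting on Claude. A crash in that window loses those events. Graceful shutdown covers planned restarts, and caught errors still reach the DLQ. Strict at-least-once delivery, however, requires committing an offset only after its batch's Claude call (or DLQ write) has completed.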
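A toy simulation makes the difference concrete. It models one partition as a list, crashes the Consumer while it is processing offset 2, and restarts it from the committed offset. Committing before processing stands in for the worst case of auto-commit, where the timer fires after poll() but before processing finishes. run_consumer and deliver_with_restart are hypothetical names for this illustration.

```python
from typing import Optional


def run_consumer(log: list[str], committed: int, crash_at: Optional[int],
                 commit_before_processing: bool) -> tuple[list[str], int]:
    """Process `log` from offset `committed`, crashing while processing `crash_at`.

    Returns (messages fully processed, committed offset at exit).
    """
    processed: list[str] = []
    for offset in range(committed, len(log)):
        if commit_before_processing:
            committed = offset + 1  # offset recorded before the work is done
        if offset == crash_at:
            return processed, committed  # crash: this message is never finished
        processed.append(log[offset])
        if not commit_before_processing:
            committed = offset + 1  # manual commit only after success
    return processed, committed


def deliver_with_restart(log: list[str], commit_before_processing: bool) -> list[str]:
    """First run crashes at offset 2; the restart resumes from the committed offset."""
    first, committed = run_consumer(log, 0, crash_at=2,
                                    commit_before_processing=commit_before_processing)
    second, _ = run_consumer(log, committed, crash_at=None,
                             commit_before_processing=commit_before_processing)
    return first + second


reviews = ["r0", "r1", "r2", "r3"]
print(deliver_with_restart(reviews, commit_before_processing=True))   # ['r0', 'r1', 'r3']
print(deliver_with_restart(reviews, commit_before_processing=False))  # ['r0', 'r1', 'r2', 'r3']
```

The flip side of at-least-once delivery is duplicates. If a Consumer crashes after processing a message but before committing it, that message is processed again after the restart. Downstream writes should therefore be idempotent, for example keyed on event_id.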
Step 2: Smart Buffering to Optimize API Costs
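The single biggest cost lever in this architecture is batching. Sending 20 reviews in a single Claude API request is substantially cheaper than making 20 individual calls. The reason is that the system prompt, which might be 300 tokens, is billed once per API call regardless of batch size. The savings apply to input tokens only, and their size depends on how long the reviews are compared with the system prompt. The shorter the reviews, the larger the share of each individual call that is pure prompt overhead. Output tokens are billed per review either way.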
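A quick way to sanity-check the savings for your own data is to count input tokens both ways. This sketch ignores per-message framing overhead and output tokens. The 300-token prompt and the per-review sizes are illustrative assumptions, and input_tokens and batching_savings are hypothetical helpers.

```python
def input_tokens(n_reviews: int, system_prompt_tokens: int,
                 tokens_per_review: int, batched: bool) -> int:
    """Input tokens billed for n reviews: one call per review, or one call total."""
    if batched:
        # The system prompt is sent once; every review is appended to it
        return system_prompt_tokens + n_reviews * tokens_per_review
    # Every individual call repeats the full system prompt
    return n_reviews * (system_prompt_tokens + tokens_per_review)


def batching_savings(n_reviews: int, system_prompt_tokens: int,
                     tokens_per_review: int) -> float:
    """Ratio of individual-call input tokens to batched input tokens."""
    individual = input_tokens(n_reviews, system_prompt_tokens, tokens_per_review, False)
    batched = input_tokens(n_reviews, system_prompt_tokens, tokens_per_review, True)
    return individual / batched


for per_review in (100, 20):
    ratio = batching_savings(20, 300, per_review)
    print(f"{per_review}-token reviews: {ratio:.1f}x fewer input tokens when batched")
```

With 100-token reviews, 20 individual calls bill 8,000 input tokens versus 2,300 batched, about 3.5x. With 20-token reviews the ratio rises to about 9x. It can never exceed the batch size itself.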
The challenge is deciding when to flush the buffer. Flush too eagerly and you lose the batching benefit. Hold too long and latency suffers. The solution is a three-trigger buffer.
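```python
# smart_buffer.py
import asyncio
import time
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class BufferConfig:
    max_size: int = 20          # Flush when N events accumulate
    max_wait_sec: float = 3.0   # Flush when the oldest buffered event is N seconds old
    max_tokens: int = 15000     # Flush when estimated token count reaches this


class SmartBuffer:
    """
    Three-trigger buffer: flush on count, time, or token limit.
    Each trigger serves a different traffic condition.
    """

    def __init__(self, config: BufferConfig,
                 flush_callback: Callable[[list], Awaitable[None]]):
        self.config = config
        self.flush_callback = flush_callback
        self._buffer: list = []
        self._estimated_tokens: int = 0
        self._oldest_at: float = 0.0  # monotonic time the oldest buffered event arrived
        self._lock = asyncio.Lock()
        self._tasks: set = set()  # strong refs so in-flight flushes aren't garbage-collected
        self._timer_task: Optional[asyncio.Task] = None

    def _estimate_tokens(self, text: str) -> int:
        # Rough heuristic: ~1 token per 3-4 characters of English text,
        # plus per-item overhead for the JSON wrapper (id, rating, etc.)
        return len(text) // 3 + 50

    async def add(self, event) -> None:
        # Start the time trigger lazily, once an event loop is running
        if self._timer_task is None:
            self._timer_task = asyncio.create_task(self._timer_loop())
        async with self._lock:
            if not self._buffer:
                self._oldest_at = time.monotonic()
            self._buffer.append(event)
            self._estimated_tokens += self._estimate_tokens(
                getattr(event, 'review_text', str(event))
            )
            if (
                len(self._buffer) >= self.config.max_size
                or self._estimated_tokens >= self.config.max_tokens
            ):
                self._flush()

    async def _timer_loop(self) -> None:
        """Time trigger: runs on its own so a quiet buffer still gets flushed."""
        interval = min(0.5, self.config.max_wait_sec / 2)
        while True:
            await asyncio.sleep(interval)
            async with self._lock:
                if (self._buffer and
                        time.monotonic() - self._oldest_at >= self.config.max_wait_sec):
                    self._flush()

    def _flush(self) -> None:
        """Hand the current batch to flush_callback. Caller must hold self._lock."""
        if not self._buffer:
            return
        batch = self._buffer.copy()
        self._buffer.clear()
        self._estimated_tokens = 0
        # Background task: don't block the buffer while the batch is processed
        task = asyncio.create_task(self.flush_callback(batch))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def force_flush(self) -> None:
        """Called during graceful shutdown: process whatever remains."""
        if self._timer_task is not None:
            self._timer_task.cancel()
        async with self._lock:
            self._flush()
        # Wait for in-flight batches so shutdown doesn't cut them off
        if self._tasks:
            await asyncio.gather(*self._tasks, return_exceptions=True)
```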
The three triggers serve distinct traffic scenarios:
Count trigger fires constantly during peak hours. The buffer fills in under a second when traffic is heavy.
Time trigger fires during off-peak hours. Even if only 2 events arrive in 3 seconds, they get processed rather than sitting in the buffer indefinitely.
Token trigger prevents sending an unexpectedly large payload to the API — a safety valve for unusually verbose review batches.
Running this buffer in production, you'll typically see the count trigger dominate during business hours and the time trigger take over overnight. Either way, the maximum latency from event ingestion to Claude API analysis is bounded at max_wait_sec.
Step 3: Model Routing for Precision at Scale
Not every review needs Claude Opus. A five-star review that says "Amazing product!" requires essentially zero reasoning to classify as positive and non-abusive. Routing it through Opus wastes budget that would be better spent on genuinely difficult cases.
The model routing logic should reflect the actual difficulty distribution of your workload:
```python
# model_router.py
import asyncio
import json
import logging
from enum import Enum

import anthropic

logger = logging.getLogger(__name__)


class ModelTier(Enum):
    FAST = "claude-haiku-4-5-20251001"
    BALANCED = "claude-sonnet-4-6"
    DEEP = "claude-opus-4-6"


# USD per million tokens as (input, output). Prices change: check Anthropic's
# pricing page before relying on these figures.
PRICE_PER_MTOK = {
    ModelTier.FAST: (1.00, 5.00),
    ModelTier.BALANCED: (3.00, 15.00),
    ModelTier.DEEP: (15.00, 75.00),
}

MAX_RATE_LIMIT_RETRIES = 3


def select_model(events: list) -> ModelTier:
    """
    Route to the cheapest model that can handle the batch reliably.

    The routing logic should be calibrated to your specific data.
    Run a sample of your events through all three models, compare accuracy,
    and adjust the thresholds until the cost/accuracy tradeoff meets your SLA.
    """
    avg_length = sum(len(e.review_text) for e in events) / len(events)
    avg_rating = sum(e.rating for e in events) / len(events)

    # Short, unambiguous signal → Haiku handles confidently
    if avg_length < 50 and (avg_rating <= 1.5 or avg_rating >= 4.5):
        return ModelTier.FAST
    # Long text or ambiguous rating requires careful reasoning
    if avg_length > 200 or (1.8 <= avg_rating <= 3.2):
        return ModelTier.DEEP
    return ModelTier.BALANCED


class ReviewModerator:
    def __init__(self):
        # Async client: a blocking call here would stall the whole event loop
        self.client = anthropic.AsyncAnthropic()

    async def analyze_batch(self, events: list, attempt: int = 0) -> list[dict]:
        """
        Analyze a batch of reviews in a single Claude API call.
        Returns a list of analysis results, one per input event.
        """
        model_tier = select_model(events)
        reviews_input = [
            {"id": e.event_id, "text": e.review_text, "rating": e.rating}
            for e in events
        ]
        system_prompt = """You are an e-commerce content moderation AI.
For each review in the array, determine:
- is_spam: Unsolicited promotion, meaningless filler, or bot-generated content (true/false)
- is_abusive: Personal attacks, hate speech, harassment, or threats (true/false)
- is_fake: Unnatural rating patterns or suspiciously template-like language suggesting manipulation (true/false)
- sentiment: Overall emotional polarity ("positive" | "neutral" | "negative")
- confidence: Your confidence in the judgment, from 0.0 to 1.0
- reason: If any flag is true, a brief explanation under 60 characters. Null if no flags.

Return ONLY a JSON object in this exact format, no other text:
{"results": [{"id": "...", "is_spam": false, "is_abusive": false, "is_fake": false, "sentiment": "positive", "confidence": 0.95, "reason": null}]}"""

        try:
            response = await self.client.messages.create(
                model=model_tier.value,
                max_tokens=2048,
                system=system_prompt,
                messages=[{
                    "role": "user",
                    "content": f"Moderate these reviews:\n\n{json.dumps(reviews_input, indent=2)}",
                }],
            )
            result_text = response.content[0].text.strip()
            result = json.loads(result_text)

            # Log cost for the monitoring dashboard
            cost = self._calculate_cost(
                model_tier, response.usage.input_tokens, response.usage.output_tokens
            )
            logger.info(
                f"Batch completed: n={len(events)}, model={model_tier.name}, "
                f"in={response.usage.input_tokens}, out={response.usage.output_tokens}, "
                f"cost=${cost:.5f}"
            )
            return result.get("results", [])

        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse Claude response: {e!r}")
            logger.debug(f"Raw response: {response.content[0].text[:500]}")
            raise  # Caller will route to DLQ
        except anthropic.RateLimitError:
            # The SDK already retries 429s a few times on its own (max_retries);
            # this is a slower outer retry with a hard cap instead of unbounded recursion
            if attempt >= MAX_RATE_LIMIT_RETRIES:
                raise  # Caller will route to DLQ
            logger.warning("Rate limited: backing off 60s and retrying")
            await asyncio.sleep(60)
            return await self.analyze_batch(events, attempt + 1)
        except anthropic.APITimeoutError:
            logger.error("API timeout on batch")
            raise

    def _calculate_cost(self, tier: ModelTier, input_t: int, output_t: int) -> float:
        in_rate, out_rate = PRICE_PER_MTOK[tier]  # USD per million tokens
        return (input_t * in_rate + output_t * out_rate) / 1_000_000
```
After running this routing in a real workload for a week and comparing results against manual spot-checks, you'll likely find that Haiku handles 55–65% of events with accuracy equivalent to Sonnet. The remaining 35–45% split between Sonnet and Opus. The exact distribution depends heavily on your content — an English-language review platform will route more to Haiku than a platform with complex, multilingual long-form reviews.
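That comparison is easy to script once you have a human-labeled sample. The sketch below assumes you have already run the same reviews through each model and reduced every result to a single flagged / not-flagged boolean (any of is_spam, is_abusive, is_fake). The data layout and agreement_by_model are hypothetical.

```python
def agreement_by_model(
    predictions: dict[str, dict[str, bool]],  # model name -> {event_id: flagged?}
    reference: dict[str, bool],               # event_id -> human label (flagged?)
) -> dict[str, float]:
    """Share of human-labeled events on which each model's flag matches the label."""
    scores: dict[str, float] = {}
    for model, preds in predictions.items():
        # Only score events that both the model and the human reviewed
        shared = [eid for eid in reference if eid in preds]
        matches = sum(preds[eid] == reference[eid] for eid in shared)
        scores[model] = matches / len(shared) if shared else 0.0
    return scores


reference = {"a": True, "b": False, "c": False, "d": True}
predictions = {
    "haiku": {"a": True, "b": False, "c": True, "d": True},
    "sonnet": {"a": True, "b": False, "c": False, "d": True},
}
print(agreement_by_model(predictions, reference))  # {'haiku': 0.75, 'sonnet': 1.0}
```

Run it separately on the events that select_model would send to Haiku. That tells you whether Haiku's agreement on that slice really matches Sonnet's, which is the question the routing thresholds depend on.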
Step 4: Dead Letter Queue for Resilient Error Handling
In any production system, failures are a certainty. Claude API timeouts, malformed model responses, transient network errors — these will all happen. The question is whether they cause data loss or just temporary delays.
A Dead Letter Queue (DLQ) is the standard answer: failed events are preserved in a separate topic for later retry rather than silently dropped.
```python
# dlq_handler.py
import asyncio
import json
import logging
import time
from dataclasses import asdict

from kafka import KafkaProducer

logger = logging.getLogger(__name__)

MAX_RETRIES = 3
BACKOFF_SECONDS = [60, 300, 1800]  # 1 min → 5 min → 30 min, then give up


class DLQHandler:
    def __init__(self, bootstrap_servers: list[str]):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            acks='all',  # Wait for all in-sync replicas: no data loss on broker failure
            retries=3,
        )

    async def send(
        self,
        events: list,
        error_type: str,
        error_message: str,
        retry_count: int = 0,
    ) -> None:
        """Route failed events to the DLQ with retry metadata."""
        loop = asyncio.get_running_loop()
        for event in events:
            dlq_payload = {
                # Every ReviewEvent field, so the retry consumer can resubmit
                # the event to review-events and it still deserializes
                "original_event": asdict(event),
                "failure_info": {
                    "error_type": error_type,
                    "error_message": error_message[:500],  # Truncate long stack traces
                    "retry_count": retry_count,
                    "failed_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
                },
                "retry_policy": {
                    "max_retries": MAX_RETRIES,
                    "exhausted": retry_count >= MAX_RETRIES,
                    "next_retry_at": self._next_retry_time(retry_count),
                },
            }
            await loop.run_in_executor(
                None, lambda p=dlq_payload: self.producer.send("review-dlq", value=p)
            )
        # send() only buffers records; flush() blocks until they are delivered
        await loop.run_in_executor(None, self.producer.flush)
        logger.warning(
            f"Sent {len(events)} events to DLQ "
            f"(type={error_type}, retry={retry_count})"
        )

    def _next_retry_time(self, retry_count: int) -> str:
        """Exponential backoff: 1 min → 5 min → 30 min → give up."""
        if retry_count >= MAX_RETRIES:
            return "exhausted"
        delay = BACKOFF_SECONDS[retry_count]
        return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(time.time() + delay))
```
A separate DLQ retry Consumer polls the review-dlq topic on a schedule, filters for events where next_retry_at has passed, resubmits them to review-events, and increments retry_count. Events with exhausted: true get written to a manual review queue in the database for human inspection.
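The retry Consumer's core decision can be isolated as a pure function. The sketch below assumes the payload layout produced by DLQHandler above. decide_dlq_action and its action names are hypothetical, and the actual produce to review-events and the database write are left out.

One caveat: ReviewEvent has no retry_count field. The count therefore has to travel with the resubmitted event, for example in a Kafka message header, and be passed back to DLQHandler.send(). Otherwise every failure restarts at retry 0 and the retries never exhaust.

```python
from datetime import datetime, timezone

TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"  # matches the DLQ payload timestamps


def decide_dlq_action(payload: dict, now: datetime) -> tuple[str, int]:
    """Return (action, retry_count) for one DLQ record.

    action is "manual_review" (retries exhausted), "not_due" (backoff still
    running), or "resubmit" (send back to review-events with retry_count + 1).
    """
    retry_count = payload["failure_info"]["retry_count"]
    policy = payload["retry_policy"]
    if policy["exhausted"]:
        return "manual_review", retry_count
    # next_retry_at is written in UTC by DLQHandler._next_retry_time()
    due = datetime.strptime(policy["next_retry_at"], TIME_FORMAT).replace(tzinfo=timezone.utc)
    if now < due:
        return "not_due", retry_count
    return "resubmit", retry_count + 1
```

Usage: call decide_dlq_action(payload, datetime.now(timezone.utc)) for each record the retry Consumer reads.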
The exponential backoff (1 minute, 5 minutes, 30 minutes) is important. When the Claude API is experiencing temporary degradation, hammering it with immediate retries makes the situation worse. Backing off gives the service time to recover.
Step 5: The Complete Pipeline
```python
# pipeline.py
import asyncio
import json
import logging
import os
import signal

from consumer_base import ReviewEventConsumer
from dlq_handler import DLQHandler
from model_router import ReviewModerator
from smart_buffer import BufferConfig, SmartBuffer

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(name)s: %(message)s',
)
logger = logging.getLogger(__name__)

KAFKA_SERVERS = os.environ.get(
    "KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"
).split(",")


class ReviewModerationPipeline:
    def __init__(self):
        self.consumer = ReviewEventConsumer(
            bootstrap_servers=KAFKA_SERVERS,
            topic="review-events",
            group_id="review-moderator",
        )
        self.moderator = ReviewModerator()
        self.dlq = DLQHandler(bootstrap_servers=KAFKA_SERVERS)
        self.buffer = SmartBuffer(
            config=BufferConfig(max_size=20, max_wait_sec=3.0, max_tokens=15000),
            flush_callback=self._process_batch,
        )
        self._stats = {"processed": 0, "errors": 0, "dlq_sent": 0}

    async def _process_batch(self, events: list) -> None:
        """Called by SmartBuffer when a batch is ready."""
        try:
            results = await self.moderator.analyze_batch(events)
            flagged = [
                r for r in results
                if r.get("is_spam") or r.get("is_abusive") or r.get("is_fake")
            ]
            if flagged:
                logger.warning(f"Flagged {len(flagged)}/{len(events)} in batch")
                # → Write to moderation queue, send alert webhook
                #   (implementation depends on your stack)
            self._stats["processed"] += len(events)
        except json.JSONDecodeError as e:
            logger.error(f"Model response parse error: {e}")
            self._stats["errors"] += len(events)
            await self.dlq.send(events, "parse_error", str(e))
            self._stats["dlq_sent"] += len(events)
        except Exception as e:
            logger.error(f"Unexpected error in batch: {e}")
            self._stats["errors"] += len(events)
            await self.dlq.send(events, "processing_error", str(e))
            self._stats["dlq_sent"] += len(events)

    async def run(self) -> None:
        logger.info("🚀 Review Moderation Pipeline starting")
        logger.info(f"Kafka: {KAFKA_SERVERS}, Buffer: {self.buffer.config}")

        # SIGTERM/SIGINT only stop polling; the finally block below drains the
        # buffer, so asyncio.run() can't cancel in-flight work by returning early
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, self.consumer.stop)

        try:
            async for events in self.consumer.consume():
                for event in events:
                    await self.buffer.add(event)
        except asyncio.CancelledError:
            logger.info("Pipeline run cancelled")
        finally:
            await self.shutdown()

    async def shutdown(self) -> None:
        """Graceful shutdown: drain the buffer before exiting."""
        logger.info("Shutdown: draining buffer...")
        await self.buffer.force_flush()
        # Give background tasks a moment to complete
        await asyncio.sleep(2.0)
        logger.info(f"Shutdown complete. Final stats: {self._stats}")


if __name__ == "__main__":
    asyncio.run(ReviewModerationPipeline().run())
```
Production Monitoring: Consumer Lag and Cost Tracking
Two metrics should be on your monitoring dashboard from day one.
Consumer Lag is the number of unprocessed messages sitting in Kafka. It's the clearest signal of whether your processing capacity matches your event inflow. A lag of 0 is ideal. A gradually growing lag means you need to scale out — add more Consumer instances. A lag spike followed by recovery is normal (traffic burst handled gracefully).
Recommended alert thresholds:
Lag > 500 for 5 minutes: warning notification
Lag > 5,000 for 2 minutes: page on-call
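Lag itself is simple arithmetic: for each partition, the log end offset minus the group's committed offset. In kafka-python those two numbers come from KafkaConsumer.end_offsets() and KafkaConsumer.committed(), keyed by TopicPartition. Kafka's kafka-consumer-groups.sh --describe prints the same LAG column. The sketch below covers the calculation and the two alert rules above, with partitions keyed by plain integers for brevity. The function names and the sampling scheme are assumptions.

```python
from typing import Optional


def total_lag(end_offsets: dict[int, int], committed: dict[int, Optional[int]]) -> int:
    """Sum over partitions of (log end offset - committed offset).

    A partition with no committed offset yet is counted from offset 0.
    """
    return sum(end - (committed.get(p) or 0) for p, end in end_offsets.items())


def alert_level(samples: list[tuple[float, int]], now: float) -> str:
    """Map lag samples (unix_time, lag), oldest first, to "page", "warn", or "ok"."""

    def sustained(threshold: int, window_sec: float) -> bool:
        start = now - window_sec
        # Only alert if the sample history spans the whole window...
        if not samples or samples[0][0] > start:
            return False
        # ...and every sample inside the window is above the threshold
        in_window = [lag for t, lag in samples if start <= t <= now]
        return bool(in_window) and all(lag > threshold for lag in in_window)

    if sustained(5_000, 120):   # Lag > 5,000 for 2 minutes: page on-call
        return "page"
    if sustained(500, 300):     # Lag > 500 for 5 minutes: warning
        return "warn"
    return "ok"
```

Sampling total_lag once a minute and feeding the recent history to alert_level keeps a single short spike from paging anyone.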
API Cost per 1,000 Events tracks whether model routing is working as intended. A healthy breakdown looks like this in your hourly summary:
If Opus climbs above 10% of your traffic, your routing thresholds may need recalibration — or your event data has shifted toward more ambiguous content. Both are worth investigating.
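The per-tier numbers can be rolled up from the "Batch completed" lines that ReviewModerator logs (model tier, batch size, cost). The sketch below assumes you have already parsed those lines into (tier, events, cost) tuples. hourly_summary is a hypothetical helper, and the 10% Opus threshold comes from the paragraph above.

```python
from collections import Counter


def hourly_summary(batches: list[tuple[str, int, float]]) -> dict:
    """Summarize (tier_name, events_in_batch, cost_usd) records for one hour."""
    events_by_tier: Counter = Counter()
    total_cost = 0.0
    for tier, n_events, cost in batches:
        events_by_tier[tier] += n_events
        total_cost += cost
    total_events = sum(events_by_tier.values())
    if total_events == 0:
        return {"share": {}, "cost_per_1k": 0.0, "opus_alert": False}
    share = {tier: n / total_events for tier, n in events_by_tier.items()}
    return {
        "share": share,                                    # fraction of events per tier
        "cost_per_1k": total_cost / total_events * 1000,   # USD per 1,000 events
        # Routing likely needs recalibration if DEEP (Opus) handles over 10% of events
        "opus_alert": share.get("DEEP", 0.0) > 0.10,
    }
```

Tracking cost_per_1k hour over hour, rather than raw spend, separates "more traffic" from "routing drifted toward expensive models".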
Scaling Strategies
The architecture supports horizontal scaling without code changes, thanks to Kafka's partition-based design.
Scaling Consumers: Start with one Consumer instance. When lag consistently grows beyond your acceptable threshold, add another. A group can have at most as many active Consumer instances as the topic has partitions (with 6 partitions, up to 6 instances); any instances beyond that sit idle. Each active instance processes its assigned partitions independently.
Scaling Claude API throughput: Claude API rate limits apply at the organization level (and can be capped further per workspace), not per API key. Rotating through several keys in the same organization therefore does not raise your ceiling. Size your Consumer count to the limits of your usage tier, and request higher limits from Anthropic if you consistently hit them. Whatever keys you use, keep them out of source code: load them from environment variables or a secrets manager.
Handling traffic spikes: The Kafka buffer absorbs spikes automatically. A 10× traffic spike just means the Consumer lag temporarily grows; events don't get dropped. The Consumer processes them at its normal rate over the following minutes. For most workloads, this is the right tradeoff: a brief delay in analysis rather than the complexity of auto-scaling infrastructure.
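The claim that spikes only delay work holds as long as average processing capacity exceeds average inflow. A toy model makes the arithmetic concrete. The rates below are illustrative assumptions, not measurements, and simulate_lag is a hypothetical helper.

```python
def simulate_lag(arrivals_per_min: list[int], capacity_per_min: int) -> list[int]:
    """Consumer lag at the end of each minute for a fixed processing capacity."""
    lag, history = 0, []
    for arrived in arrivals_per_min:
        # Backlog grows by whatever capacity couldn't cover, never below zero
        lag = max(0, lag + arrived - capacity_per_min)
        history.append(lag)
    return history


# Baseline 500 events/min, a two-minute 10x burst, capacity 1,000 events/min
arrivals = [500] * 2 + [5_000] * 2 + [500] * 16
history = simulate_lag(arrivals, capacity_per_min=1_000)
print(max(history))   # 8000
print(history[-1])    # 0
```

Under these assumptions, the two-minute burst builds a backlog of 8,000 events. That backlog drains at the spare 500 events per minute and clears 16 minutes after the burst ends. If capacity were below the baseline inflow, lag would never return to zero, which is the "gradually growing lag" signal described earlier.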
Common Pitfalls
Pitfall 1: Leaving auto-commit enabled. This is the most dangerous default. With enable_auto_commit=True, the Consumer periodically commits offsets for messages poll() has returned, regardless of whether they have been processed. A Claude API timeout after such a commit means the Consumer Group never sees that event again. Always set enable_auto_commit=False and commit manually after processing.
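Pitfall 2: Oversizing batches. More events per batch sounds efficient, but batches over 30 events increase timeout risk and can cause mid-response truncation. Each JSON result costs output tokens, and at an estimated 60 tokens per result, 30 results already approach the max_tokens=2048 limit set in Step 3. A truncated reply is invalid JSON and ends up in the DLQ. Based on real workloads, the 15–25 event range offers the best balance of efficiency and reliability.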
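The truncation risk is easy to budget for. Assume roughly 60 output tokens per result; you can measure your own average as response.usage.output_tokens divided by batch size. The sketch below computes the largest batch that fits under max_tokens with 20% headroom. In the Messages API, a reply cut off by the limit comes back with stop_reason set to "max_tokens", which is worth checking before calling json.loads(). The helper name and the 60-token figure are assumptions.

```python
import math


def max_safe_batch(max_output_tokens: int, tokens_per_result: int,
                   headroom: float = 0.8) -> int:
    """Largest batch whose JSON results fit in max_tokens, keeping some headroom."""
    return math.floor(max_output_tokens * headroom / tokens_per_result)


print(max_safe_batch(2048, 60))        # 27
print(max_safe_batch(2048, 60, 1.0))   # 34
```

With the pipeline's max_tokens=2048, this puts a safe ceiling in the high 20s. That is consistent with the 15–25 event range above.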
Pitfall 3: No JSON error handling on responses. Claude API almost always returns valid JSON when instructed to, but edge cases exist — especially if review text contains characters resembling JSON syntax. Always catch json.JSONDecodeError and route failures to the DLQ rather than letting exceptions propagate.
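A slightly more forgiving parser is a reasonable first line of defense, because models sometimes wrap JSON in a Markdown json code fence even when told not to. The sketch below strips such a fence and checks for the expected results list. Otherwise it raises json.JSONDecodeError, so the existing DLQ routing still applies. parse_model_json is a hypothetical helper.

```python
import json
import re

TICKS = "`" * 3  # a Markdown code fence marker
_FENCED = re.compile(rf"^{TICKS}(?:json)?\s*(.*?)\s*{TICKS}$", re.DOTALL)


def parse_model_json(text: str) -> dict:
    """Parse a model reply, tolerating a surrounding code fence.

    Raises json.JSONDecodeError for anything that isn't {"results": [...]}.
    """
    text = text.strip()
    match = _FENCED.match(text)
    if match:
        text = match.group(1)  # keep only what is inside the fence
    data = json.loads(text)
    if not isinstance(data, dict) or not isinstance(data.get("results"), list):
        # Reuse JSONDecodeError so callers' existing DLQ handling still applies
        raise json.JSONDecodeError("expected an object with a 'results' list", text, 0)
    return data
```

Swapping this in for the bare json.loads() call in analyze_batch would keep the error contract the pipeline already relies on.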
Pitfall 4: No graceful shutdown. In Kubernetes rolling deployments, pods receive SIGTERM before termination. Without a shutdown handler that calls force_flush(), every deployment drops the events currently in the buffer. Implement the signal handler and sleep briefly to let background tasks complete.
Pitfall 5: Not monitoring the DLQ. DLQ events are only useful if you know they exist. Set up a simple alert: if review-dlq has more than 50 unprocessed messages, send a Slack notification. Most DLQ growth indicates a systematic problem with the Claude API integration — catching it early prevents small issues from cascading.
Pitfall 6: Ignoring the max_poll_interval_ms setting. If your Claude API call plus processing takes longer than this timeout, Kafka will assume the Consumer has died and rebalance the partition to another instance. The default is 5 minutes (300000ms), which is generous, but if you're using Opus with large batches, verify your P99 processing time stays well under this threshold.
Next Steps
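The pipeline described here can process roughly 100,000 reviews per day at an estimated Claude API cost of $25–50 (assuming the routing distribution in the monitoring example above). Treat that as a ballpark rather than a quote. The real figure depends on average review length, your routing mix, and current per-model pricing, so check it against the per-batch costs the pipeline logs. Adjusting the model routing thresholds based on your specific data can drive it down further.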
To try it out, run the pipeline with a small buffer (max_size=5, max_wait_sec=1.0) so you can see batching in action quickly. Once the basic flow works, tune the buffer settings to match your actual traffic volume and cost targets.
If Kafka feels like overkill for your current scale, Redis Streams is a lighter-weight alternative that supports the same design patterns with a smaller operational footprint. The core insight — buffer events, batch them intelligently, route to the right model, preserve failures in a DLQ — applies regardless of the message queue technology underneath.
Thank You for Reading
Claude Lab is ad-free, supported entirely by members like you. We publish practical guides daily with implementation code, benchmarks, and production-ready patterns. If you've found it useful, we'd love to have you on board.