kombu
Messaging library for Python — provides a unified high-level interface over AMQP and other message transports (Redis, SQS, MongoDB, SQLite, in-memory). kombu features: Connection for transport-agnostic broker connections, Producer/Consumer for publish-subscribe, Exchange (direct/fanout/topic/headers), Queue with routing keys, SimpleQueue for Redis/memory queues, kombu.serialization for JSON/pickle/msgpack, retry policies, drain_events() loop, acks/nacks for reliability, dead letter queues, task routing for Celery, connection pooling, heartbeats, and failover URLs. Celery's transport layer — usable standalone without Celery.
Score Breakdown
⚙ Agent Friendliness
🔒 Security
Use amqps:// and rediss:// for TLS-encrypted connections. Never use pickle serialization (accept=['pickle']) for messages from untrusted sources — arbitrary code execution risk. Use JSON serialization only. Store broker credentials in environment variables not in code. AMQP vhosts provide namespace isolation.
⚡ Reliability
Best When
Distributed task messaging with broker flexibility — kombu is ideal when you need Celery's transport layer standalone, or when you need to switch between RabbitMQ, Redis, and SQS without code changes.
Avoid When
In-process queuing (use queue.Queue), HTTP webhooks, real-time event streaming (use Kafka), or when Celery itself handles all messaging needs.
Use Cases
- • Agent task queue — from kombu import Connection, Queue, Producer; conn = Connection('redis://localhost:6379//'); queue = Queue('tasks', routing_key='tasks'); with conn.channel() as channel: producer = Producer(channel); producer.publish({'task': 'process', 'data': payload}, routing_key='tasks', declare=[queue]) — publish task; agent distributes work to queue; consumers process tasks independently
- • Agent consumer loop — from kombu import Connection, Queue; def process_message(body, message): handle(body); message.ack(); queue = Queue('tasks'); with Connection('redis://') as conn: with conn.Consumer(queue, callbacks=[process_message], accept=['json']): while True: conn.drain_events(timeout=1) — consume messages; agent worker processes queue messages; message.ack() confirms processing; message.reject() for failures
- • Agent pub/sub with exchange — from kombu import Exchange, Queue; exchange = Exchange('events', type='fanout'); queues = [Queue('monitor', exchange), Queue('logger', exchange)]; producer.publish(event, exchange=exchange) — fanout pattern; agent broadcasts events to multiple subscribers; fanout delivers to all bound queues simultaneously
- • Agent SQS messaging — conn = Connection('sqs://key:secret@'); queue = Queue('my-queue'); producer.publish(message) — AWS SQS transport; agent uses same kombu API with SQS backend; swap connection URL to switch transports; no code changes needed for RabbitMQ vs SQS vs Redis
- • Agent reliable messaging — conn = Connection('amqp://guest:guest@localhost//'); conn.ensure_connection(max_retries=3); producer.publish(msg, retry=True, retry_policy={'max_retries': 3, 'interval_start': 0, 'interval_step': 1}) — retry policy; agent messaging handles transient broker failures; retry_policy configures backoff; ensures_connection handles initial connection failures
Not For
- • Simple in-process queues — kombu adds broker dependency; for in-process queuing use Python's queue.Queue
- • HTTP-based webhooks — kombu uses AMQP/Redis protocols not HTTP; for webhook delivery use requests with retry logic
- • Real-time streaming — kombu is task/message queue not event stream; for streaming use Kafka (confluent-kafka) or Redis Streams
Interface
Authentication
Auth via connection URL: amqp://user:password@host/vhost or redis://:password@host. For SQS: AWS credentials via URL or environment. TLS: amqps:// for AMQP over TLS.
Pricing
kombu is BSD licensed. Free for all use. Broker costs depend on chosen backend (RabbitMQ self-hosted or AWS SQS pay-per-message).
Agent Metadata
Known Gotchas
- ⚠ message.ack() required or message requeued — consumed messages must be explicitly acknowledged; without ack(), message returns to queue when consumer disconnects; agent consumer callbacks must always call message.ack() on success or message.reject(requeue=True) on failure; forgetting ack causes infinite redelivery loop
- ⚠ accept parameter required for JSON deserialization — consumer = conn.Consumer(queue, accept=['json']); without accept list, kombu raises kombu.exceptions.ContentDisallowed for safety; agent consumer code must explicitly list accepted serializations; accept=['json'] for safe deserialization; accept=['pickle'] requires trust in message source
- ⚠ drain_events blocks indefinitely without timeout — conn.drain_events() without timeout parameter blocks forever; agent consumer loops should use timeout: conn.drain_events(timeout=1.0); catches socket.timeout which is normal; pattern: while True: try: conn.drain_events(timeout=1) except socket.timeout: pass — allows clean shutdown checks
- ⚠ Connection pooling required for high-throughput producers — creating new Connection() per message is slow (TCP + AMQP handshake ~10ms each); agent code publishing frequently must use: from kombu import pools; with pools.producers[conn].acquire(block=True) as producer: producer.publish(msg) — reuses connections from pool
- ⚠ Exchange and Queue must be declared before use — producer.publish() to undeclared exchange raises ChannelError; agent code must declare infrastructure: with conn.channel() as channel: exchange.declare(); queue.declare(); bind queue to exchange; or use declare=[queue] in publish() call — kombu declares on first use if passed
- ⚠ SimpleQueue API differs from AMQP Queue — kombu.simple.SimpleQueue is a higher-level wrapper using one connection for both get/put; queue.get(block=True, timeout=1) returns message; queue.put(payload) publishes; message.ack() still required; SimpleQueue designed for Redis/memory transports; not all AMQP features available through SimpleQueue
Full Evaluation Report
Detailed scoring breakdown, competitive positioning, security analysis, and improvement recommendations for kombu.
Scores are editorial opinions as of 2026-03-06.