kombu

Messaging library for Python — provides a unified high-level interface over AMQP and other message transports (Redis, SQS, MongoDB, SQLite, in-memory). Key features: Connection for transport-agnostic broker connections, Producer/Consumer for publishing and consuming messages, Exchange (direct/fanout/topic/headers), Queue with routing keys, SimpleQueue for Redis/memory queues, kombu.serialization for JSON/pickle/msgpack, retry policies, a drain_events() loop, acks/nacks for reliability, dead letter queues, task routing for Celery, connection pooling, heartbeats, and failover URLs. Celery's transport layer — usable standalone without Celery.

Evaluated Mar 06, 2026 · v5.x
Homepage ↗ Repo ↗ Developer Tools python kombu amqp rabbitmq redis messaging queue celery
⚙ Agent Friendliness
61
/ 100
Can an agent use this?
🔒 Security
81
/ 100
Is it safe for agents?
⚡ Reliability
80
/ 100
Does it work consistently?

Score Breakdown

⚙ Agent Friendliness

MCP Quality
--
Documentation
78
Error Messages
75
Auth Simplicity
85
Rate Limits
90

🔒 Security

TLS Enforcement
85
Auth Strength
82
Scope Granularity
80
Dep. Hygiene
82
Secret Handling
78

Use amqps:// and rediss:// URLs for TLS-encrypted connections. Never enable pickle deserialization (accept=['pickle']) for messages from untrusted sources — a pickled payload can execute arbitrary code. Prefer JSON serialization throughout. Store broker credentials in environment variables, not in code. AMQP vhosts provide namespace isolation between applications.

⚡ Reliability

Uptime/SLA
80
Version Stability
82
Breaking Changes
78
Error Recovery
80

Best When

Distributed task messaging with broker flexibility — kombu is ideal when you need Celery's transport layer standalone, or when you need to switch between RabbitMQ, Redis, and SQS without code changes.

Avoid When

In-process queuing (use queue.Queue), HTTP webhooks, real-time event streaming (use Kafka), or when Celery itself handles all messaging needs.

Use Cases

  • Agent task queue — from kombu import Connection, Queue; conn = Connection('redis://localhost:6379//'); queue = Queue('tasks', routing_key='tasks'); producer = conn.Producer(serializer='json'); producer.publish({'task': 'process', 'data': payload}, routing_key='tasks', declare=[queue]) — publish task; agent distributes work to queue; consumers process tasks independently
  • Agent consumer loop — import socket; from kombu import Connection, Queue; def process_message(body, message): handle(body); message.ack(); queue = Queue('tasks'); with Connection('redis://') as conn: with conn.Consumer(queue, callbacks=[process_message], accept=['json']): while True: try: conn.drain_events(timeout=1) except socket.timeout: pass — consume messages; agent worker processes queue messages; message.ack() confirms processing; message.reject() for failures
  • Agent pub/sub with exchange — from kombu import Exchange, Queue; exchange = Exchange('events', type='fanout'); queues = [Queue('monitor', exchange), Queue('logger', exchange)]; producer.publish(event, exchange=exchange) — fanout pattern; agent broadcasts events to multiple subscribers; fanout delivers to all bound queues simultaneously
  • Agent SQS messaging — conn = Connection('sqs://key:secret@'); queue = Queue('my-queue'); producer.publish(message) — AWS SQS transport; agent uses same kombu API with SQS backend; swap connection URL to switch transports; no code changes needed for RabbitMQ vs SQS vs Redis
  • Agent reliable messaging — conn = Connection('amqp://guest:guest@localhost//'); conn.ensure_connection(max_retries=3); producer.publish(msg, retry=True, retry_policy={'max_retries': 3, 'interval_start': 0, 'interval_step': 1}) — retry policy; agent messaging handles transient broker failures; retry_policy configures backoff; ensure_connection handles initial connection failures

Not For

  • Simple in-process queues — kombu adds broker dependency; for in-process queuing use Python's queue.Queue
  • HTTP-based webhooks — kombu uses AMQP/Redis protocols not HTTP; for webhook delivery use requests with retry logic
  • Real-time streaming — kombu is task/message queue not event stream; for streaming use Kafka (confluent-kafka) or Redis Streams

Interface

REST API
No
GraphQL
No
gRPC
No
MCP Server
No
SDK
Yes
Webhooks
No

Authentication

Methods: basic url_credentials
OAuth: No Scopes: No

Auth via connection URL: amqp://user:password@host/vhost or redis://:password@host. For SQS: AWS credentials via URL or environment. TLS: amqps:// for AMQP over TLS.

Pricing

Model: open_source
Free tier: Yes
Requires CC: No

kombu is BSD licensed. Free for all use. Broker costs depend on chosen backend (RabbitMQ self-hosted or AWS SQS pay-per-message).

Agent Metadata

Pagination
none
Idempotent
Partial
Retry Guidance
Documented

Known Gotchas

  • message.ack() required or message requeued — consumed messages must be explicitly acknowledged; without ack(), message returns to queue when consumer disconnects; agent consumer callbacks must always call message.ack() on success or message.reject(requeue=True) on failure; forgetting ack causes infinite redelivery loop
  • accept parameter required for JSON deserialization — consumer = conn.Consumer(queue, accept=['json']); without accept list, kombu raises kombu.exceptions.ContentDisallowed for safety; agent consumer code must explicitly list accepted serializations; accept=['json'] for safe deserialization; accept=['pickle'] requires trust in message source
  • drain_events blocks indefinitely without timeout — conn.drain_events() without timeout parameter blocks forever; agent consumer loops should use timeout: conn.drain_events(timeout=1.0); catches socket.timeout which is normal; pattern: while True: try: conn.drain_events(timeout=1) except socket.timeout: pass — allows clean shutdown checks
  • Connection pooling required for high-throughput producers — creating new Connection() per message is slow (TCP + AMQP handshake ~10ms each); agent code publishing frequently must use: from kombu import pools; with pools.producers[conn].acquire(block=True) as producer: producer.publish(msg) — reuses connections from pool
  • Exchange and Queue must be declared before use — producer.publish() to an undeclared exchange raises ChannelError; agent code must declare infrastructure: with conn.channel() as channel: exchange.declare(); queue.declare(); bind queue to exchange; or pass declare=[queue] to publish(), which declares the queue before the first send
  • SimpleQueue API differs from AMQP Queue — kombu.simple.SimpleQueue is a higher-level wrapper using one connection for both get/put; queue.get(block=True, timeout=1) returns message; queue.put(payload) publishes; message.ack() still required; SimpleQueue designed for Redis/memory transports; not all AMQP features available through SimpleQueue

Full Evaluation Report

Detailed scoring breakdown, competitive positioning, security analysis, and improvement recommendations for kombu.

$99

Scores are editorial opinions as of 2026-03-06.
