asyncpg
High-performance async PostgreSQL client for Python — speaks the PostgreSQL binary protocol directly for maximum throughput. asyncpg features: asyncpg.connect() and asyncpg.create_pool() for connection management; await conn.fetch(sql)/fetchrow()/fetchval()/execute() for queries; parameterized queries with $1, $2 placeholders; transaction support via conn.transaction(); COPY FROM/TO for bulk data; prepared statements; custom type codecs; LISTEN/NOTIFY pub/sub; and connection pooling with min_size/max_size. Significantly faster than psycopg2 for async PostgreSQL workloads.
Score Breakdown
⚙ Agent Friendliness
🔒 Security
PostgreSQL driver. Always use parameterized queries ($1, $2) — never format SQL strings. SSL in production: await asyncpg.connect(dsn, ssl='require'). Connection URL contains credentials — use environment variables. COPY is powerful but validate source data.
⚡ Reliability
Best When
High-performance async PostgreSQL access in FastAPI or asyncio applications — asyncpg is 3-5x faster than psycopg2 for async workloads.
Avoid When
ORM needed (use SQLAlchemy+asyncpg), non-PostgreSQL databases, synchronous code, or when psycopg3's sync+async flexibility is preferred.
Use Cases
- • Agent async PostgreSQL — import asyncpg; conn = await asyncpg.connect('postgresql://user:pass@host/db'); rows = await conn.fetch('SELECT * FROM users WHERE status = $1', 'active'); for row in rows: process(dict(row)); await conn.close() — query; agent queries PostgreSQL with parameterized queries; row is a Record with dict-like access; note: a Connection is not an async context manager, so close it explicitly with await conn.close()
- • Agent connection pool — pool = await asyncpg.create_pool('postgresql://...', min_size=5, max_size=20); async with pool.acquire() as conn: result = await conn.fetchrow('SELECT * FROM items WHERE id = $1', item_id) — pooled; agent uses connection pool for concurrent FastAPI/asyncio application; acquire() checks out connection
- • Agent transaction — async with conn.transaction(): await conn.execute('INSERT INTO orders ...'); await conn.execute('UPDATE inventory ...') — atomic; agent wraps multiple operations in transaction; automatic rollback on exception; nested transactions create savepoints
- • Agent bulk COPY — import io; data = io.BytesIO(b'1,Alice\n2,Bob\n'); await conn.copy_to_table('users', source=data, format='csv', columns=['id', 'name']) — bulk insert; agent loads large datasets into PostgreSQL; COPY is much faster than individual INSERTs; source must yield bytes (use BytesIO, not StringIO); also copy_from_table for bulk export
- • Agent listen/notify — await conn.add_listener('channel', lambda conn, pid, ch, payload: handle(payload)); while True: await asyncio.sleep(1) — pub/sub; agent subscribes to PostgreSQL NOTIFY for real-time event streaming between processes; add_listener issues LISTEN on the channel automatically, so no explicit LISTEN statement is needed
Not For
- • ORM usage — asyncpg is a low-level driver; for ORM use SQLAlchemy async with asyncpg as backend
- • MySQL/SQLite — asyncpg is PostgreSQL-specific; for MySQL use aiomysql; for SQLite use aiosqlite
- • Synchronous code — asyncpg is async-only; for sync PostgreSQL use psycopg2 or psycopg3
Interface
Authentication
PostgreSQL authentication via connection URL. SSL/TLS via ssl= parameter. SCRAM-SHA-256 supported.
Pricing
asyncpg is Apache 2.0 licensed. Free for all use.
Agent Metadata
Known Gotchas
- ⚠ Placeholders use $1 $2 not %s — asyncpg uses positional $1, $2, $3 placeholders not psycopg2-style %s or %(name)s; conn.fetch('SELECT * FROM t WHERE id = $1 AND status = $2', id_val, 'active') — positional args; agent code migrating from psycopg2: replace all %s with $1, $2, etc.; named params not supported
- ⚠ fetch() returns list of Records not dicts — rows = await conn.fetch(sql) returns list of asyncpg.Record; Record is dict-like but not dict; dict(row) converts; row['column'] for access; row[0] for index access; agent code: use dict(row) to convert for JSON serialization; [dict(r) for r in rows] for list
- ⚠ Pool must be created at startup, not per request — await asyncpg.create_pool() is expensive; create once at application startup; agent FastAPI: create pool in lifespan context manager; store on app.state.pool; acquire per request with pool.acquire(); close pool at shutdown: await pool.close()
- ⚠ Transaction context manager auto-rollbacks — async with conn.transaction(): if exception raised inside, transaction auto-rolls back; no explicit rollback needed; nested: async with conn.transaction() creates savepoint; agent code: wrap database writes in transaction(); handle exceptions outside transaction block for partial success
- ⚠ Pooled connections must be released — async with pool.acquire() as conn: automatically releases the connection back to the pool on exit; without the context manager, use conn = await pool.acquire(); try: ... finally: await pool.release(conn) — explicit release pattern; agent code: prefer async with pool.acquire()
- ⚠ listen/notify requires dedicated connection — listeners block connection; agent code: use separate connection (not from pool) for LISTEN: conn = await asyncpg.connect(...); await conn.add_listener('channel', callback); keep connection alive; pool connections are not suitable for persistent listeners
Alternatives
Scores are editorial opinions as of 2026-03-06.