Python Asyncio Patterns: From Callbacks to Structured Concurrency
Learn how to write high-performance async Python code using asyncio — covering coroutines, task groups, async context managers, and real-world HTTP client patterns.
Python’s asyncio library enables cooperative multitasking — a single thread efficiently handles thousands of concurrent I/O-bound operations. This guide progresses from the basics to production patterns.
The Event Loop Model
Unlike threading (OS-managed), asyncio uses a single-threaded event loop that switches tasks only at await points. No GIL contention, minimal context-switch overhead.
import asyncio
async def greet(name: str, delay: float) -> str:
await asyncio.sleep(delay) # Yields control; event loop runs other tasks
return f"Hello, {name}!"
async def main() -> None:
# Run both concurrently — total time ~1s, not 2s
results = await asyncio.gather(
greet("Alice", 1.0),
greet("Bob", 0.8),
)
print(results) # ['Hello, Alice!', 'Hello, Bob!']
asyncio.run(main())
Task Groups (Python 3.11+)
TaskGroup is the modern, structured-concurrency replacement for gather. It cancels sibling tasks automatically if one raises an exception.
import asyncio
import httpx
async def fetch(client: httpx.AsyncClient, url: str) -> dict:
response = await client.get(url, timeout=10.0)
response.raise_for_status()
return response.json()
async def fetch_all(urls: list[str]) -> list[dict]:
results: list[dict] = []
async with httpx.AsyncClient() as client:
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch(client, url)) for url in urls]
# All tasks finished (or one raised, cancelling siblings)
return [t.result() for t in tasks]
Async Context Managers
Any class implementing __aenter__ / __aexit__ works as an async context manager — perfect for managing async resources like database connections.
import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def managed_connection(dsn: str):
conn = await create_db_connection(dsn) # hypothetical async connect
try:
yield conn
finally:
await conn.close()
async def main():
async with managed_connection("postgresql://localhost/mydb") as conn:
rows = await conn.fetch("SELECT id, name FROM users LIMIT 10")
for row in rows:
print(row["id"], row["name"])
Producer / Consumer with asyncio.Queue
Queues decouple producers from consumers, enabling backpressure control.
import asyncio
import random
async def producer(queue: asyncio.Queue[int], n: int) -> None:
for i in range(n):
item = random.randint(1, 100)
await queue.put(item)
print(f"Produced: {item}")
await asyncio.sleep(0.1)
# Signal completion with sentinel
await queue.put(-1)
async def consumer(queue: asyncio.Queue[int]) -> None:
total = 0
while True:
item = await queue.get()
if item == -1:
break
total += item
queue.task_done()
print(f"Consumer total: {total}")
async def main() -> None:
queue: asyncio.Queue[int] = asyncio.Queue(maxsize=5) # Backpressure at 5 items
async with asyncio.TaskGroup() as tg:
tg.create_task(producer(queue, 10))
tg.create_task(consumer(queue))
asyncio.run(main())
Common Pitfalls
# ✗ WRONG — blocks the entire event loop
import time
async def bad():
time.sleep(2) # Blocking call inside async function!
# ✓ CORRECT — run blocking calls in a thread pool
async def good():
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, time.sleep, 2)
# ✗ WRONG — fire-and-forget loses exceptions silently
async def fire_and_forget():
asyncio.create_task(risky_coro()) # Exception disappears into the void
# ✓ CORRECT — always await tasks or attach exception handlers
async def safe():
task = asyncio.create_task(risky_coro())
task.add_done_callback(lambda t: t.exception() and print(f"Error: {t.exception()}"))
Performance Comparison
import asyncio
import time
async def async_benchmark(n: int) -> float:
start = time.perf_counter()
await asyncio.gather(*[asyncio.sleep(0.01) for _ in range(n)])
return time.perf_counter() - start
# 1000 concurrent "tasks" (each 10ms) completes in ~10ms total
# Sequential would take 10 seconds
print(asyncio.run(async_benchmark(1000))) # ~0.010s
Asyncio shines for I/O-bound workloads. For CPU-bound work, use multiprocessing or ProcessPoolExecutor instead.