0.4
Part 0 / Foundations · The async patterns the rest of the guide assumes

Async Python: the patterns that show up in every agent loop.

Every code example in this guide uses async Python. async def, await, asyncio.gather — these aren't accidental. They're the shape that agent code takes, because agents spend most of their time waiting on I/O (model calls, tool calls, network requests), and async gives you concurrency without threads. This chapter assumes you can read async Python. Its job is to surface the patterns that matter for agent code specifically — parallel tool dispatch, timeouts, structured cancellation, error isolation, backpressure — and the pitfalls that trip up otherwise-fine code when it scales. Shorter than the other chapters in this part on purpose: you've absorbed most of the patterns by example already. Here they're named.

STEP 1

Why agents are async: the I/O wait pattern.

An agent doing real work spends most of its wall-clock time waiting. Model calls take 1-5 seconds. Tool calls take 100ms-2s. Database queries, web fetches, file reads — all wait. The agent's CPU work (parsing JSON, validating schemas, deciding what to do next) is milliseconds. The runtime cost is overwhelmingly I/O wait.

This is the shape async is designed for. While one operation is waiting on I/O, the event loop can run other operations. You don't get more CPU work done — but you stop blocking on each individual wait. A multi-tool agent turn that does 5 sequential tool calls at 400ms each takes 2 seconds; the same agent dispatching them concurrently takes ~400ms. Same total work; very different wall-clock latency.

The concurrency-vs-parallelism distinction

One source of confusion worth resolving up front. Async Python gives you concurrency (multiple tasks making progress without blocking each other) but not parallelism (multiple tasks executing simultaneously on multiple CPUs). The GIL still applies; Python code runs on one thread at a time.

For agents, this is fine — almost all the time you want to overlap is I/O wait time, where the actual work is happening on someone else's server (the model provider's GPU, the database, the remote API). Your Python process is just waiting for the response. While it waits, the event loop runs other tasks. The concurrency model exactly matches the workload.

The cases where parallelism (true multi-CPU execution) matters for agents are rare: heavy local CPU work like running a large embedding model in-process, or batched local inference. For those, you reach for multiprocessing or accelerated libraries. For everything else (tool dispatch, model calls, retrieval, parallel sub-agents), async concurrency is the right fit.

What async buys you in concrete numbers

An agent that does the following in a single turn:

  • 3 web searches (400ms each in parallel possible)
  • 5 web fetches on returned URLs (600ms each in parallel possible)
  • 1 synthesis model call (3 seconds)

Serial execution: 0.4×3 + 0.6×5 + 3 = 7.2 seconds.

Concurrent execution with async: max(0.4, 0.4, 0.4) + max(0.6, 0.6, 0.6, 0.6, 0.6) + 3 = 0.4 + 0.6 + 3 = 4 seconds.

That's roughly a 44% latency reduction (3.2 of 7.2 seconds) with zero changes to the underlying work and zero added compute cost, just from running independent operations concurrently. For interactive agents where latency directly shapes user experience, this matters; for autonomous agents that run for minutes, it compounds across many turns.
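The arithmetic can be checked with a small simulation. This is a sketch, not a benchmark: fake_io is a hypothetical stand-in that only sleeps, and the durations are the illustrative figures from the list, scaled down 10× so it runs quickly.

```python
import asyncio
import time

SCALE = 0.1  # run at one tenth of the illustrative durations


async def fake_io(seconds: float) -> float:
    """Hypothetical stand-in for a search, fetch, or model call."""
    await asyncio.sleep(seconds * SCALE)
    return seconds


async def serial_turn() -> float:
    start = time.perf_counter()
    for _ in range(3):
        await fake_io(0.4)   # searches, one after another
    for _ in range(5):
        await fake_io(0.6)   # fetches, one after another
    await fake_io(3.0)       # synthesis
    return time.perf_counter() - start


async def concurrent_turn() -> float:
    start = time.perf_counter()
    await asyncio.gather(*(fake_io(0.4) for _ in range(3)))  # searches overlap
    await asyncio.gather(*(fake_io(0.6) for _ in range(5)))  # fetches overlap
    await fake_io(3.0)       # synthesis still has to wait for the fetches
    return time.perf_counter() - start


if __name__ == "__main__":
    serial = asyncio.run(serial_turn())
    concurrent = asyncio.run(concurrent_turn())
    print(f"serial ~{serial / SCALE:.1f}s, concurrent ~{concurrent / SCALE:.1f}s")
```

The stages stay sequential with respect to each other (fetches need the search results, synthesis needs the fetches); only the work inside each stage overlaps.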

The three async patterns this guide uses

Three patterns appear repeatedly in the rest of the guide. Naming them explicitly:

Pattern 1: parallel tool dispatch. When a model emits multiple tool-use blocks in one turn, dispatch them concurrently via asyncio.gather. Chapter 0.3 introduced this; chapter 2.2 made it a latency lever; chapter 4.4 generalized it to multi-agent coordination.

Pattern 2: bounded sub-agent execution. When the orchestrator spawns sub-agents (chapters 4.3 and 4.4), each runs concurrently with a wall-clock timeout. asyncio.wait_for caps individual runtimes; asyncio.gather dispatches them together.

Pattern 3: streaming with concurrent work. The agent streams output to the user (chapter 2.4) while continuing to do background work: generating the response while logging spans, updating dashboards, queueing follow-up tool calls. Async lets these happen concurrently without blocking the stream.

All three build on the same primitives (tasks scheduled on one event loop, most often via asyncio.gather with appropriate error handling); their differences are in what they coordinate.
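A minimal sketch of Pattern 2, assuming a hypothetical fake_subagent that sleeps instead of doing real work. Each sub-agent gets its own wait_for cap, and gather collects the outcomes in order; the timeout value and the spec list are illustrative only.

```python
import asyncio


async def fake_subagent(name: str, seconds: float) -> str:
    """Hypothetical sub-agent that sleeps, then reports."""
    await asyncio.sleep(seconds)
    return f"{name} done"


async def run_bounded(coro, timeout_s: float):
    """Await one sub-agent, giving up after timeout_s seconds."""
    return await asyncio.wait_for(coro, timeout=timeout_s)


async def run_all(timeout_s: float = 0.2):
    specs = [("fast", 0.01), ("slow", 5.0), ("medium", 0.05)]
    # return_exceptions=True keeps one timeout from hiding the other results
    return await asyncio.gather(
        *(run_bounded(fake_subagent(n, s), timeout_s) for n, s in specs),
        return_exceptions=True,
    )
```

Here the slow sub-agent comes back as a TimeoutError object in its slot while the other two slots hold their normal results.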

Question
If async is just concurrency, not parallelism, why use it instead of threads?

Three reasons that matter for agent code specifically. First, async is dramatically lighter weight per "task": running 100 concurrent operations as coroutines costs trivially more memory than running 5, whereas threads have real OS overhead per thread. Second, async is debuggable in a way threaded code isn't: control switches between tasks only at await points, so there's no preemptive thread interleaving to reason about. Stack traces and logs read naturally. Third, async composes with the Python ecosystem for HTTP, database, and AI SDKs: the libraries you'd use in an agent (anthropic, openai, httpx, asyncpg) have async-native APIs.

The case for threads is mostly historical or about CPU-bound work. For agent loops, async is the right default by a wide margin.

Question
Can I mix synchronous and asynchronous code in the same agent?

Yes, with friction. Synchronous calls inside an async function block the event loop — while the sync call runs, no other coroutine makes progress. For an agent doing many concurrent things, this is a serious problem; a 200ms sync database call freezes all 10 of your concurrent sub-agents for 200ms.

The fix: wrap sync calls in asyncio.to_thread so they run in a thread pool without blocking the event loop. await asyncio.to_thread(slow_sync_func, *args) returns the function's result once the thread finishes. Use this for sync libraries you can't replace. For genuinely CPU-bound pure-Python work, a thread still contends for the GIL, so a process pool is usually the better fit.

Don't sprinkle sync calls casually into async code without this wrapping — the symptoms are subtle (mysterious latency, mostly-fine until under load) and the fix is mechanical once you know to look.
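One way to see the difference is to run a heartbeat next to a blocking call. In this sketch, slow_sync_call is a hypothetical stand-in for a sync library call, and the tick counts are only a rough signal of whether the event loop got to run.

```python
import asyncio
import time


def slow_sync_call(seconds: float) -> str:
    """Hypothetical blocking call, e.g. a sync database driver."""
    time.sleep(seconds)
    return "rows"


async def heartbeat(ticks: list, interval: float = 0.01) -> None:
    """Records a timestamp each time the event loop gets to run it."""
    while True:
        ticks.append(time.perf_counter())
        await asyncio.sleep(interval)


async def count_ticks(use_thread: bool) -> int:
    ticks: list = []
    hb = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat start
    before = len(ticks)
    if use_thread:
        await asyncio.to_thread(slow_sync_call, 0.2)  # loop stays free
    else:
        slow_sync_call(0.2)                            # loop is frozen
    during = len(ticks) - before
    hb.cancel()
    return during
```

Called with use_thread=False, the heartbeat records nothing during the call; with use_thread=True it keeps ticking. asyncio.to_thread requires Python 3.9 or later.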

STEP 2

The four patterns that show up in every agent.

Four async patterns appear in nearly every production agent codebase. Each is short; each has subtleties that matter when you ship.

Pattern 1: parallel dispatch with asyncio.gather

The workhorse. asyncio.gather takes N coroutines and runs them concurrently, returning when all have completed (or one has raised).

# The pattern from chapter 0.3, expanded

async def dispatch_tools(tool_use_blocks):
    # Build a list of coroutines (not running them yet)
    coros = [run_one_tool(block) for block in tool_use_blocks]
    # gather runs them concurrently; returns when all finish
    results = await asyncio.gather(*coros)
    return results

async def run_one_tool(block):
    return await HANDLERS[block.name](**block.input)

Three subtleties:

The list comprehension creates coroutines, not results. [run_one_tool(b) for b in blocks] evaluates each run_one_tool(b) expression, which returns a coroutine object without executing it. The execution happens inside gather. This is correct; it's also the source of confusion when someone thinks "the list comprehension already ran them in parallel."

By default, gather raises on the first exception. If one tool fails, that exception propagates to the caller immediately; the other coroutines are not cancelled and keep running, but their results are no longer delivered to you. Often not what you want: usually you want all tools to complete (or fail) so the agent can deal with the results individually.

The return_exceptions=True flag changes this. asyncio.gather(*coros, return_exceptions=True) returns results or exception objects in the same list. Exceptions don't propagate; you handle them per-result. This is almost always what you want for tool dispatch — one failed tool shouldn't kill the others.
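A small experiment makes both modes observable. The two tools here are hypothetical stand-ins; the finished list records which ones ran to completion.

```python
import asyncio


async def flaky_tool():
    await asyncio.sleep(0.01)
    raise RuntimeError("tool failed")


async def steady_tool(finished: list):
    await asyncio.sleep(0.05)
    finished.append("steady")
    return "ok"


async def default_mode():
    finished: list = []
    caught = None
    try:
        await asyncio.gather(flaky_tool(), steady_tool(finished))
    except RuntimeError as exc:
        caught = str(exc)          # the first exception reaches the caller
    await asyncio.sleep(0.1)       # the sibling is still running; let it finish
    return caught, finished


async def capture_mode():
    finished: list = []
    # Exceptions come back as values, in the same order as the inputs
    return await asyncio.gather(
        flaky_tool(), steady_tool(finished), return_exceptions=True
    )
```

In default mode the caller sees only the RuntimeError, even though the steady tool goes on to finish; in capture mode the result list holds the exception object in slot 0 and "ok" in slot 1.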

# The production version

async def dispatch_tools_safe(tool_use_blocks):
    coros = [run_one_tool(block) for block in tool_use_blocks]
    results = await asyncio.gather(*coros, return_exceptions=True)

    # results is a list where some elements may be exception objects.
    # BaseException also covers CancelledError from a cancelled tool.
    tool_results = []
    for block, result in zip(tool_use_blocks, results):
        if isinstance(result, BaseException):
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": block.id,
                "content": f"Error: {result}",
                "is_error": True,
            })
        else:
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": block.id,
                "content": result,
            })
    return tool_results

This is the shape every production agent's tool dispatcher takes. The variations are minor — different error formatting, different result post-processing — but the core pattern (gather with exception capture, per-result handling) is stable across agents.

Pattern 2: timeouts with asyncio.wait_for

Any I/O operation should have a timeout. Model calls, tool calls, network requests — all can hang. Without a timeout, your agent waits forever; with a timeout, you get a clean cancellation and a clear error.

# Wrap any awaitable with a timeout

async def run_with_timeout(coro, timeout_s: float):
    try:
        return await asyncio.wait_for(coro, timeout=timeout_s)
    except asyncio.TimeoutError:
        return {"status": "timeout", "elapsed_s": timeout_s}

# Applied to a tool call
result = await run_with_timeout(
    HANDLERS["slow_web_fetch"](url=long_url),
    timeout_s=10.0,
)

Three rules of thumb for picking timeouts:

  • Model calls: 60-120 seconds. Some models with extended thinking can run for a minute or more on hard problems. Set the timeout to 2× the expected P99 latency.
  • Web fetches and external APIs: 10-30 seconds. Anything legitimate happens fast; longer than that and the request is broken.
  • Database / internal services: 1-5 seconds. Internal infra should be fast.

Timeouts cascade: an agent's total run timeout should be larger than the sum of all per-step timeouts (plus some margin). If a single model call can take 60s and your agent does up to 10 of them, your total budget should be at least 700-800 seconds, with retries factored in.
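A sketch of the ladder, with a per-step cap nested inside a total-run cap. fake_model_call is a hypothetical step that only sleeps, and the durations are scaled down for illustration.

```python
import asyncio


async def fake_model_call(seconds: float) -> str:
    """Hypothetical step; sleeps instead of calling a model."""
    await asyncio.sleep(seconds)
    return "response"


async def run_steps(step_durations, per_step_s: float):
    outputs = []
    for seconds in step_durations:
        # Each step gets its own cap
        outputs.append(
            await asyncio.wait_for(fake_model_call(seconds), timeout=per_step_s)
        )
    return outputs


async def run_agent(step_durations, per_step_s: float, total_s: float):
    try:
        # The outer cap bounds the whole run, whatever the steps do
        return await asyncio.wait_for(
            run_steps(step_durations, per_step_s), timeout=total_s
        )
    except asyncio.TimeoutError:
        # Raised by either the per-step cap or the total cap
        return {"status": "timeout"}
```

This version does not distinguish which cap fired; a real agent would likely want to record that, and to retry a timed-out step before giving up on the run.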

What happens on timeout: the awaited operation is cancelled mid-flight. For pure async code (network requests, asyncio.sleep), this is clean: the operation stops. For operations wrapping sync work in to_thread, cancellation is messier: your await returns promptly with a timeout, but the thread keeps running until the sync function finishes on its own. Plan accordingly: don't rely on timeout to actually stop sync work, only to stop your code waiting on it.

Pattern 3: structured concurrency with TaskGroup

Python 3.11+ added asyncio.TaskGroup as the modern replacement for raw gather in many cases. It enforces a stricter discipline: all tasks spawned within the group are guaranteed to complete (or be cancelled) before the async with block exits.

async def run_subagents(task_specs: list):
    results = {}
    async with asyncio.TaskGroup() as tg:
        for spec in task_specs:
            task = tg.create_task(run_subagent(spec))
            results[spec.id] = task

    # Outside the `async with`, every task is guaranteed done
    return {sid: task.result() for sid, task in results.items()}

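The benefit of TaskGroup over gather: failure handling is explicit. When a task raises, the group cancels the remaining tasks, waits for them to finish, and raises an ExceptionGroup containing every exception that occurred (not just the first). Nothing keeps running unobserved after the block exits. The structured-concurrency discipline also makes the lifecycle clear: you can't accidentally leak a task by forgetting to await it.

For new agent code on Python 3.11+, prefer TaskGroup over raw gather when the tasks should succeed or fail together. When siblings should survive each other's failures (tool dispatch, independent sub-agents), either catch exceptions inside each task or keep gather with return_exceptions=True, which also remains the right pattern for codebases that need 3.10 compatibility.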

Pattern 4: streaming concurrently with background work

The streaming pattern from chapter 2.4: yield tokens to the client as they arrive, but do other work concurrently (logging, observability, follow-up dispatch).

async def stream_with_logging(messages):
    # Start the model stream
    async with client.messages.stream(
        model="claude-sonnet-4-5",
        max_tokens=4096,
        messages=messages,
    ) as stream:
        full_text = []
        async for text in stream.text_stream:
            full_text.append(text)
            # Yield to client as tokens arrive
            yield {"type": "token", "text": text}

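        # After the stream completes, collect the final message
        final = await stream.get_final_message()

        # asyncio.create_task schedules the logging without awaiting it, so
        # it runs concurrently on the event loop. Keep a reference to the
        # task (background_tasks is a module-level set) so it isn't
        # garbage-collected before it finishes.
        task = asyncio.create_task(log_run_async(
            messages=messages,
            response=final,
            tokens=final.usage.output_tokens,
        ))
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

        yield {"type": "done", "usage": final.usage}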

The asyncio.create_task pattern lets you fire off work that doesn't need to block the response. Common uses: writing observability spans, updating dashboards, queueing follow-up actions. The caller doesn't wait for the logging to finish; the logging happens on the event loop's own schedule.

The trap: tasks created with create_task can be silently dropped if the event loop ends before they complete. For critical background work (writing audit logs that must persist), gather them explicitly before exiting the agent's context, or use a structured concurrency pattern. For best-effort work (a nice-to-have dashboard update), fire-and-forget is fine.

One specific bug that hits every team eventually: holding a reference to the task is required for it to survive garbage collection. The event loop keeps only weak references to tasks, so if you write asyncio.create_task(my_coro()) without storing the result, Python may garbage-collect the task before it finishes, with no error. The fix: keep the task in a list or set you own. background_tasks.add(task) with task.add_done_callback(background_tasks.discard) is the pattern the asyncio documentation recommends.
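The reference-holding pattern fits in a small helper. The names spawn_background and drain_background are hypothetical; the set-plus-discard-callback idiom is the part that matters.

```python
import asyncio

background_tasks: set = set()


def spawn_background(coro) -> asyncio.Task:
    """Start a task and keep a strong reference until it finishes."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    # Drop the reference once the task is done, so the set doesn't grow
    task.add_done_callback(background_tasks.discard)
    return task


async def drain_background() -> None:
    """Wait for outstanding background work, e.g. before shutdown."""
    if background_tasks:
        await asyncio.gather(*background_tasks, return_exceptions=True)
```

Calling drain_background() before the agent's context exits also covers the dropped-at-shutdown case for work that must persist.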

Question
Should I use semaphores to limit concurrency, or is gather enough?

For tool dispatch within a single turn (5-10 concurrent operations), raw gather is fine — the rate limits on your downstream services typically tolerate that level of concurrency comfortably. For larger fan-out (50+ concurrent operations against rate-limited APIs), you need a semaphore to cap concurrency.

The pattern:

sem = asyncio.Semaphore(10)  # max 10 concurrent

async def bounded(coro):
    async with sem:
        return await coro

results = await asyncio.gather(*[bounded(c) for c in coros])

The semaphore caps the number of operations actually running at any moment; the rest wait. Use this when your downstream has rate limits you'd otherwise blow through.
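To confirm the cap holds, a sketch can count how many jobs are in flight at once. The job body is a hypothetical downstream call that only sleeps.

```python
import asyncio


async def bounded_fanout(n_jobs: int, limit: int) -> int:
    """Run n_jobs fake calls with at most `limit` in flight; return the peak."""
    sem = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def job():
        nonlocal in_flight, peak
        async with sem:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # hypothetical downstream call
            in_flight -= 1

    await asyncio.gather(*(job() for _ in range(n_jobs)))
    return peak
```

With 50 jobs and a limit of 10, the peak never exceeds 10. Creating the semaphore inside the coroutine, rather than at import time, also sidesteps event-loop binding problems on older Python versions.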

Question
When does asyncio.run vs creating an event loop manually matter?

For most agent code: asyncio.run(main()) at your entry point and nothing else. It creates the event loop, runs your coroutine, cleans up. This is the right default.

Manual loop management (asyncio.get_event_loop(), loop.run_until_complete) is the older API and has subtle gotchas. Reach for it only when integrating async code into a framework that manages the event loop itself (some web frameworks, some testing setups). Otherwise, asyncio.run is sufficient.

STEP 3

The five pitfalls that bite every team.

Async Python has well-known pitfalls — patterns that pass code review, work in tests, and break under production load. Five of them affect agent code specifically often enough to be worth naming.

Pitfall 1: forgetting await

The error: calling an async function without awaiting its result.

# Bug — calls the function but doesn't wait for the result
result = run_subagent(spec)        # result is a coroutine, not the answer
process(result)                    # processing a coroutine — bug

# Correct
result = await run_subagent(spec)  # result is the answer
process(result)

Modern type checkers (mypy, pyright) and IDE warnings catch this most of the time. Without them, the failure mode is a RuntimeWarning that's easy to miss, and downstream code processing a coroutine object instead of a result. Run type-checking; it's not optional in async-heavy code.
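What the bug looks like at runtime can be shown with a short check. run_subagent here is a hypothetical stand-in that returns a small dict.

```python
import asyncio
import inspect


async def run_subagent(spec: str) -> dict:
    """Hypothetical sub-agent call."""
    await asyncio.sleep(0)
    return {"spec": spec, "status": "ok"}


async def main():
    forgotten = run_subagent("a")        # no await: this is a coroutine object
    is_coro = inspect.iscoroutine(forgotten)
    forgotten.close()                    # tidy up so no "never awaited" warning

    awaited = await run_subagent("a")    # the actual result
    return is_coro, awaited
```

Without the close() call, Python would emit the "coroutine ... was never awaited" RuntimeWarning when the object is garbage-collected, which is the only runtime hint the bug gives.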

Pitfall 2: serializing what should be parallel

Looking parallel but actually serial:

# Bug — these run sequentially even though they look parallel
results = []
for spec in task_specs:
    result = await run_subagent(spec)   # awaits each before continuing
    results.append(result)

# Correct — actually concurrent
results = await asyncio.gather(*[
    run_subagent(spec) for spec in task_specs
])

The first version is async but sequential — each await blocks before the next iteration starts. You get all the overhead of async with none of the concurrency benefit. The second version dispatches all work concurrently. The wall-clock difference for 5 sub-agents at 3 seconds each: 15 seconds vs ~3 seconds.

The pattern to internalize: if work is independent, batch it into gather; if it's sequential, await in order. Don't accidentally serialize independent work by awaiting inside a loop.

Pitfall 3: blocking the event loop with sync work

A synchronous call inside an async function blocks the entire event loop:

async def process_document(doc_path: str):
    # Bug — open() and read() are sync; they block the event loop
    with open(doc_path) as f:
        content = f.read()    # blocks; nothing else makes progress
    return await process_content_async(content)

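# Better: wrap sync I/O to run in a thread
def _read_text(path: str) -> str:
    with open(path) as f:      # closes the file promptly
        return f.read()

async def process_document(doc_path: str):
    content = await asyncio.to_thread(_read_text, doc_path)
    return await process_content_async(content)

# Alternative: an async file API such as aiofiles (which runs the
# file operations in a thread pool for you)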
async def process_document(doc_path: str):
    async with aiofiles.open(doc_path) as f:
        content = await f.read()
    return await process_content_async(content)

The symptoms of accidental blocking: agent latency that scales with concurrent load (because each agent's sync call blocks all the others), or mysterious slowdowns under traffic spikes. The diagnostic: add logging to your event-loop iterations and watch for long gaps. The fix is mechanical once identified.
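One way to watch for those gaps is a lag monitor: a task that sleeps for a fixed interval and records how late each wake-up was. This is a sketch with hypothetical names; the blocking call is simulated with time.sleep.

```python
import asyncio
import time


async def monitor_loop_lag(lags: list, interval: float = 0.01) -> None:
    """Sleep for `interval` repeatedly and record how late each wake-up was."""
    while True:
        start = time.perf_counter()
        await asyncio.sleep(interval)
        lags.append(time.perf_counter() - start - interval)


async def demo_blocking(block_s: float = 0.15) -> float:
    lags: list = []
    monitor = asyncio.create_task(monitor_loop_lag(lags))
    await asyncio.sleep(0.03)   # a few healthy iterations
    time.sleep(block_s)         # simulated accidental sync call
    await asyncio.sleep(0.03)   # let the monitor observe the gap
    monitor.cancel()
    return max(lags)
```

In production you would log or export the lag rather than return it. asyncio's debug mode (asyncio.run(main(), debug=True)) offers a built-in variant: it logs callbacks that run longer than loop.slow_callback_duration.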

Common sync calls to watch for: file I/O without aiofiles, sync database libraries (psycopg2 instead of asyncpg), CPU-bound work like JSON parsing of huge documents (move it to a thread, or to a process pool if it holds the GIL), and any third-party library that wasn't written with async in mind.

Pitfall 4: cancellation that doesn't actually cancel

You wrap an operation in wait_for with a timeout. The timeout fires. You expect the operation to stop. It doesn't:

# The operation wraps sync work in a thread
async def slow_op():
    await asyncio.to_thread(very_long_sync_function)

# Timeout fires after 5s, but the thread keeps running
try:
    await asyncio.wait_for(slow_op(), timeout=5.0)
except asyncio.TimeoutError:
    pass
# The sync function is still running in the thread pool!

Cancellation only works for genuinely-async operations. asyncio.sleep, network I/O via async libraries, and other awaitables all cancel cleanly. Sync work running in a thread does not — Python threads can't be safely killed from outside, so the thread keeps running to completion. Your async code returns control on timeout, but the underlying work continues.

The practical implications: don't rely on timeout to actually stop sync work; only to stop your code waiting on it. If the operation has external side effects (writing files, calling APIs), those still happen even after the timeout. Plan around this — either accept that "timeout" means "I stopped waiting, the work continues" or move the work into a true async implementation.
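If you control the sync function, one option (an assumption here, not something the timeout gives you) is a cooperative stop flag: the function checks a threading.Event between small steps, and the async side sets it when the timeout fires.

```python
import asyncio
import threading
import time


def long_sync_job(stop: threading.Event, progress: list) -> str:
    """Hypothetical sync work that checks a stop flag between small steps."""
    for step in range(1000):
        if stop.is_set():
            return "stopped early"
        time.sleep(0.005)
        progress.append(step)
    return "finished"


async def run_with_stop(timeout_s: float):
    stop = threading.Event()
    progress: list = []
    try:
        await asyncio.wait_for(
            asyncio.to_thread(long_sync_job, stop, progress), timeout=timeout_s
        )
        return "finished", len(progress)
    except asyncio.TimeoutError:
        stop.set()                  # ask the thread to wind down
        await asyncio.sleep(0.05)   # give it a moment to notice
        return "timed out", len(progress)
```

This only works for sync code you can modify to poll the flag; a third-party call that blocks for minutes still runs to completion.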

Pitfall 5: silently swallowed exceptions in fire-and-forget tasks

Tasks created with create_task that raise exceptions: the exception is captured by the task object but never re-raised unless someone awaits the task. If you never await, the exception is silently lost.

# Bug — exceptions in this task disappear silently
asyncio.create_task(log_async(message))

# Better: at minimum, log the exception when it happens
def _log_exception(task):
    if task.cancelled():
        return                      # task.exception() would raise here
    exc = task.exception()
    if exc is not None:
        logger.error("Background task failed", exc_info=exc)

t = asyncio.create_task(log_async(message))
t.add_done_callback(_log_exception)

The add_done_callback pattern at least surfaces the failure to your logs. For critical background work, the better fix is to await the task explicitly before exiting the agent's context (or use TaskGroup, which raises exceptions from completed tasks). The "silent failure of background tasks" pattern is responsible for many "but the dashboard isn't updating" mysteries in production.

STEP 4

Advanced patterns: cancellation, backpressure, async iteration.

Three patterns beyond the basics that show up in mature agent code. Skim these for awareness; reach for them when the basic patterns don't suffice.

Cooperative cancellation: shielding critical work

When your agent run is cancelled (a timeout fires, the user aborts), you usually want everything to stop cleanly. Sometimes, though, there's critical cleanup work that must complete even as cancellation propagates — flushing a final log entry, releasing a lock, committing a partial transaction.

asyncio.shield protects a coroutine from outer cancellation:

async def run_with_cleanup(work_coro):
    try:
        return await work_coro
    finally:
        # This cleanup runs even if work_coro was cancelled,
        # and shield protects the cleanup from also being cancelled
        await asyncio.shield(write_final_audit_log())

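Use shield sparingly: it can extend cancellation latency significantly if abused. It also protects only the inner work, not the await itself; if the outer task is cancelled again while waiting, the await raises CancelledError while the shielded coroutine keeps running in the background. The right use case is brief, bounded cleanup work that must complete; for everything else, normal cancellation is the right behavior.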

Backpressure: controlled flow when downstream is slow

If your agent produces output faster than it can be consumed (a streaming response that the network can't deliver fast enough), naive code accumulates in memory. asyncio.Queue with a bounded size lets the producer wait when the consumer falls behind:

async def produce_with_backpressure(stream, queue: asyncio.Queue):
    async for chunk in stream.text_stream:
        # If the queue is full (consumer slow), put() waits here
        await queue.put(chunk)
    await queue.put(None)   # sentinel for end-of-stream

async def consume_with_pacing(queue: asyncio.Queue):
    while True:
        chunk = await queue.get()
        if chunk is None:
            break
        await send_to_client(chunk)

# Bounded queue applies backpressure when consumer is slow
# (`stream` is an open model stream, as in the earlier streaming example)
q = asyncio.Queue(maxsize=10)
await asyncio.gather(produce_with_backpressure(stream, q), consume_with_pacing(q))

This pattern matters for streaming agents serving slow clients. Without backpressure, a slow client (or a stuck network) causes memory growth as your producer races ahead. With backpressure, the producer naturally paces itself to consumer speed.

Async iteration: streaming agent loops

The streaming-agent pattern from chapter 2.4 uses async generators (async def + yield) to emit events as they happen:

async def stream_agent_events(messages):
    for step in range(MAX_STEPS):
        # Stream the model response, yielding tokens as they arrive
        async with client.messages.stream(...) as stream:
            async for text in stream.text_stream:
                yield {"type": "token", "text": text}
            # Read the final message while the stream is still open,
            # matching the earlier streaming example
            final = await stream.get_final_message()

        if final.stop_reason != "tool_use":
            yield {"type": "done"}
            return

        # Tool dispatch, yielding progress events
        yield {"type": "tool_use", "blocks": get_tool_blocks(final)}
        results = await dispatch_tools_safe(get_tool_blocks(final))
        yield {"type": "tool_results", "results": results}

        messages.append(...)   # update conversation, loop

# Callers iterate
async for event in stream_agent_events(messages):
    handle_event(event)

The async-generator pattern is what makes streaming agent loops clean. Each yield emits one event to the caller; the caller processes it; the loop continues. The producer (agent) and consumer (client renderer) decouple naturally, and the code reads sequentially even though it's running asynchronously underneath.

This is the pattern the Agent SDK's query() function exposes, the pattern Anthropic's streaming endpoints support, and the pattern your own streaming agents should adopt. It's a meaningful upgrade over "return a final result" once you have any user-facing latency to manage.

If you remember one thing from this chapter, remember return_exceptions=True on asyncio.gather. It's the single highest-leverage async pattern for agents: without it, one flaky tool's exception replaces an entire turn's worth of results. With it, you handle each result on its own merits. Apply it everywhere you dispatch independent work.

End of chapter 0.4

Deliverable

A working command of the async Python patterns that appear in every agent codebase. Parallel dispatch with gather (and return_exceptions=True for production), timeouts with wait_for, structured concurrency with TaskGroup on 3.11+, streaming with async generators. Awareness of the five pitfalls (missing await, accidental serialization, blocking the loop, broken cancellation, silent task failures) and the diagnostics for each. Advanced patterns (shielding, backpressure, async iteration) as tools to reach for when basic patterns aren't enough.

  • Tool dispatch uses asyncio.gather(*coros, return_exceptions=True)
  • Every model call, tool call, network request has a timeout via asyncio.wait_for
  • Per-step timeouts ladder up to a total agent-run timeout with margin for retries
  • Python 3.11+: prefer TaskGroup for new code that should fail as a unit; fall back to gather for older versions or when siblings must survive each other's failures
  • Sync I/O wrapped in asyncio.to_thread or replaced with async-native libraries
  • Fire-and-forget tasks hold references in a set to avoid GC; add done callback for error logging
  • Type checking (mypy/pyright) enabled to catch missing awaits and other async mistakes
  • Streaming agent loops use async def + yield; callers iterate with async for
  • Backpressure via bounded queues for streaming pipelines with variable consumer speed
  • Awareness that timeout cancels your wait, not necessarily the underlying sync work