2.1
Part II / Ship · The chapter that makes the next three possible

Observability: every run leaves a trace you can read.

An agent that fails in production for an unknown reason is an agent you can't ship. This chapter wires every model call, tool call, and retrieval into a span tree following the OpenTelemetry GenAI conventions; sets up structured logging that survives asyncio.gather; and gives you a replay CLI that takes a trace ID and re-runs the agent locally with the exact same tool results. The first time you debug a production failure in five minutes instead of five hours, you'll know the chapter earned its keep.

STEP 1

Why print() stops working.

You've been debugging your agent with print() statements scattered through the loop. That worked while you were the only user, running one request at a time, watching the terminal scroll. It stops working the instant any of those conditions changes.

Let me walk you through a specific failure pattern, because the motivation is concrete. Your agent is deployed. A user files a ticket: "I asked it about my Q3 invoices and it told me about Q1." You go look at your logs. What you find:

[2026-03-14 10:42:13] INFO  request received: "show me Q3 invoices"
[2026-03-14 10:42:13] INFO  request received: "where do I update billing"
[2026-03-14 10:42:13] INFO  calling search_docs
[2026-03-14 10:42:14] INFO  search_docs returned 5 results
[2026-03-14 10:42:14] INFO  calling fetch_doc
[2026-03-14 10:42:14] INFO  calling search_docs
[2026-03-14 10:42:14] INFO  fetch_doc returned
[2026-03-14 10:42:15] INFO  search_docs returned 3 results
[2026-03-14 10:42:15] INFO  calling answer
[2026-03-14 10:42:16] INFO  calling answer
[2026-03-14 10:42:16] INFO  request complete (3.1s)
[2026-03-14 10:42:18] INFO  request complete (5.2s)

Read those lines carefully. There are at least two concurrent requests interleaved. You cannot tell which search_docs call belonged to which user. You cannot see what query was actually passed to the tool, what results came back, what the model decided to do next, or what final answer the user got. You have timestamps and tool names. That's it.

This is the production-debugging cliff. Single-user, sequential, REPL-driven development gives you everything you need by reading scrollback. The moment you have concurrent requests, an async tool layer, and an unhappy user pointing at a specific failure, you need three things print() doesn't give you:

  • Correlation. Every log line must say which request it belongs to.
  • Structure. "Calling search_docs" isn't useful; "search_docs(query='Q3 invoices', filter='date>=2025-07-01')" is.
  • Hierarchy. Calls happen inside calls. The agent loop contains the model call, which decided to make a tool call, which made a database query. That nesting is essential context.

The span tree as the right data model

The data structure that gives you all three is a span tree. A span represents one operation: a model call, a tool call, a retrieval, an agent run. It has a start time, an end time, a unique ID, its parent span's ID (or none, if it's the root), a name, and a bag of attributes describing what happened. Spans nest: a child span starts after its parent starts and normally ends before its parent ends.

For a single agent run, the tree looks like this:

invoke_agent  "user asks about Q3 invoices"          (parent span)
├─ chat claude-sonnet-4-5  (3 turns)
│    input: 2,341 tok   out: 412 tok
│  ├─ execute_tool search_docs
│  │    args: {query: "Q3 invoices"}
│  │    result: 5 chunks
│  ├─ execute_tool search_docs
│  │    args: {query: "Q3 2025"}
│  │    result: 3 chunks
│  └─ chat claude-sonnet-4-5  (synthesis)
│       output: "Your Q3 invoices..."
└─ duration: 5.2s   tokens: 2,753   cost: $0.034

From a span tree like this you can answer: which tools did the agent use? With what arguments? In what order? Did any fail? How long did each take? How many tokens did each model call consume? What was the final answer? You can answer all of that in seconds, for any past request, by looking up its trace ID.
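To make the data model concrete, here is a minimal sketch of a span record and a tree renderer, using only the standard library. The Span class and render_tree function are hypothetical names for illustration; real OpenTelemetry spans carry more fields, and the timings below are made up.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Span:
    """Hypothetical minimal span record; real OTel spans carry more fields."""
    span_id: str
    parent_id: Optional[str]   # None marks the root span
    name: str
    start: float               # seconds since the run began
    end: float
    attributes: dict = field(default_factory=dict)


def render_tree(spans: list[Span]) -> list[str]:
    """Return one indented line per span, children listed under parents."""
    children: dict[Optional[str], list[Span]] = {}
    for s in spans:
        children.setdefault(s.parent_id, []).append(s)

    lines: list[str] = []

    def walk(parent_id: Optional[str], depth: int) -> None:
        # Siblings are shown in start-time order.
        for s in sorted(children.get(parent_id, []), key=lambda s: s.start):
            ms = round((s.end - s.start) * 1000)
            lines.append(f"{'  ' * depth}{s.name} ({ms} ms)")
            walk(s.span_id, depth + 1)

    walk(None, 0)
    return lines


# Spans arrive as a flat list in arbitrary order; the parent pointer
# is all that is needed to rebuild the hierarchy.
spans = [
    Span("c1", "root", "chat", 0.0, 0.7, {"gen_ai.usage.input_tokens": 2341}),
    Span("root", None, "invoke_agent", 0.0, 5.2),
    Span("t1", "root", "execute_tool", 0.7, 1.5, {"gen_ai.tool.name": "search_docs"}),
    Span("c2", "root", "chat", 1.5, 5.2, {"gen_ai.usage.input_tokens": 3104}),
]
tree = render_tree(spans)
print("\n".join(tree))
```

The point of the sketch is that nothing about the tree is stored explicitly. A flat stream of records plus a parent ID per record is enough to reconstruct it, which is why spans can be exported one at a time.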

The OpenTelemetry GenAI conventions

This isn't a new invention. OpenTelemetry — the observability standard that already underlies most production tracing — has a working group dedicated to LLM and agent semantics, and they've published a vocabulary. As of 2026 these conventions are still marked experimental, but the field names are stabilizing and major vendors (Datadog, Grafana, Langfuse, Braintrust) all support them. Using these names instead of inventing your own means your traces work with off-the-shelf tooling.

The core span names and attributes you'll use in the rest of this chapter:

Span name      When to emit                     Key attributes
-------------  -------------------------------  --------------------------------------------------
invoke_agent   Once per agent run (top-level)   gen_ai.agent.name, gen_ai.conversation.id, user.id
chat           Each model call                  gen_ai.request.model, gen_ai.usage.input_tokens, gen_ai.usage.output_tokens, gen_ai.response.finish_reasons
execute_tool   Each tool dispatch               gen_ai.tool.name, gen_ai.tool.call.id, plus your own args/result

Spans also carry conversation content when you opt in: gen_ai.system_instructions, gen_ai.input.messages, and gen_ai.output.messages. These are big and contain user data, so they're recorded conditionally — more on that below.

You don't need an OpenTelemetry collector or a vendor account to start. The data model is the unlock — once your agent produces spans with these names and attributes, you can ship them anywhere later: a JSONL file, Langfuse, Datadog, your own Postgres. Pick the data model first, the destination second.

Question
My agent runs synchronously. Single user, local CLI. Do I really need spans?

If you'll never run it concurrently, never deploy it, and never want to debug a failure that happened ten minutes ago — no, prints are fine. But that's a short list. The moment you put a FastAPI handler in front of the agent (Chapter 2.4) or run an eval suite that fires off 50 trajectories in parallel (Chapter 1.4), interleaved logs become unreadable. Spans cost nothing to add now; they cost a lot to add later when you've got code paths that didn't think about correlation.

Question
Why "spans" and not just structured JSON logs?

Spans are structured JSON logs, with two pieces of structure that ordinary logs lack: a duration (start/end time pair) and a parent pointer. Those two pieces are what let you render the tree view above. Without them you have a flat stream of events; with them you have a hierarchy you can navigate. Modern observability tools (Langfuse, Braintrust, Datadog, etc.) require this shape because their UIs are tree views.

STEP 2

Instrument the loop.

Now the engineering. We'll add tracing to the agent loop from Chapter 1.1, using the OpenTelemetry SDK directly. The pattern is the same regardless of where the spans end up. In this step we'll dump them to stdout as JSON for local development; later you can swap the exporter for Langfuse, Braintrust, or any OTLP-compatible backend without touching the agent code.

The minimal setup

Three pieces: a tracer (gives you spans), an exporter (sends them somewhere), and a processor (batches them efficiently). One-time configuration at app startup:

# obs/tracing.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.export import ConsoleSpanExporter
from opentelemetry.sdk.resources import Resource

def init_tracing(service_name: str = "my-agent"):
    resource = Resource.create({"service.name": service_name})
    provider = TracerProvider(resource=resource)

    # For now: dump spans to stdout as JSON. Swap for OTLPSpanExporter
    # pointed at Langfuse / Braintrust / Datadog later — zero other changes.
    provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
    trace.set_tracer_provider(provider)

tracer = trace.get_tracer("agent")

Wrap the agent loop

Now we wrap the three things that matter: the top-level run, each model call, and each tool dispatch. Compare this to the bare loop from Chapter 1.1 — the only difference is the with tracer.start_as_current_span(...) wrappers and the attribute setting.

# agent/loop.py — instrumented version
import json

from opentelemetry import trace
from anthropic import AsyncAnthropic
from uuid import uuid4

# TOOLS (tool schemas) and HANDLERS (tool name -> async handler) are
# assumed to come from the Chapter 1.1 loop.

tracer = trace.get_tracer("agent")
# Async client: the sync client would block the event loop inside _chat.
client = AsyncAnthropic()

async def run_agent(user_msg: str, user_id: str):
    with tracer.start_as_current_span("invoke_agent") as root:
        root.set_attribute("gen_ai.agent.name", "research-assistant")
        root.set_attribute("gen_ai.conversation.id", str(uuid4()))
        root.set_attribute("user.id", user_id)

        messages = [{"role": "user", "content": user_msg}]
        for step in range(20):  # budget
            response = await _chat(messages)
            messages.append({"role": "assistant",
                             "content": response.content})

            if response.stop_reason == "end_turn":
                root.set_attribute("gen_ai.response.finish_reasons",
                                   ["end_turn"])
                return response.content[0].text

            tool_results = []
            for block in response.content:
                if block.type == "tool_use":
                    result = await _dispatch_tool(block)
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": result,
                    })
            messages.append({"role": "user", "content": tool_results})

async def _chat(messages):
    with tracer.start_as_current_span("chat") as span:
        span.set_attribute("gen_ai.request.model", "claude-sonnet-4-5")
        span.set_attribute("gen_ai.provider.name", "anthropic")
        response = await client.messages.create(
            model="claude-sonnet-4-5",
            max_tokens=4096,
            tools=TOOLS,
            messages=messages,
        )
        span.set_attribute("gen_ai.usage.input_tokens",
                           response.usage.input_tokens)
        span.set_attribute("gen_ai.usage.output_tokens",
                           response.usage.output_tokens)
        span.set_attribute("gen_ai.response.finish_reasons",
                           [response.stop_reason])
        return response

async def _dispatch_tool(block):
    with tracer.start_as_current_span("execute_tool") as span:
        span.set_attribute("gen_ai.tool.name", block.name)
        span.set_attribute("gen_ai.tool.call.id", block.id)
        span.set_attribute("tool.args", json.dumps(block.input))
        try:
            result = await HANDLERS[block.name](**block.input)
            span.set_attribute("tool.result_size", len(str(result)))
            return result
        except Exception as e:
            span.set_attribute("error.type", type(e).__name__)
            span.record_exception(e)
            raise

# agent/loop.py: instrumented version (OpenAI Responses API variant)
import json

from opentelemetry import trace
from openai import AsyncOpenAI
from uuid import uuid4

# TOOLS and HANDLERS are assumed to come from the Chapter 1.1 loop.

tracer = trace.get_tracer("agent")
# Async client: the sync client would block the event loop inside _chat.
client = AsyncOpenAI()

async def run_agent(user_msg: str, user_id: str):
    with tracer.start_as_current_span("invoke_agent") as root:
        root.set_attribute("gen_ai.agent.name", "research-assistant")
        root.set_attribute("gen_ai.conversation.id", str(uuid4()))
        root.set_attribute("user.id", user_id)

        # Responses API maintains state via previous_response_id
        prev_id = None
        current_input = user_msg
        for step in range(20):
            response = await _chat(current_input, prev_id)
            prev_id = response.id

            tool_calls = [item for item in response.output
                          if item.type == "function_call"]
            if not tool_calls:
                root.set_attribute("gen_ai.response.finish_reasons",
                                   ["stop"])
                return response.output_text

            results = []
            for call in tool_calls:
                result = await _dispatch_tool(call)
                results.append({
                    "type": "function_call_output",
                    "call_id": call.call_id,
                    "output": str(result),
                })
            current_input = results

async def _chat(input_data, previous_response_id):
    with tracer.start_as_current_span("chat") as span:
        span.set_attribute("gen_ai.request.model", "gpt-5.5")
        span.set_attribute("gen_ai.provider.name", "openai")
        response = await client.responses.create(
            model="gpt-5.5",
            tools=TOOLS,
            input=input_data,
            previous_response_id=previous_response_id,
        )
        span.set_attribute("gen_ai.usage.input_tokens",
                           response.usage.input_tokens)
        span.set_attribute("gen_ai.usage.output_tokens",
                           response.usage.output_tokens)
        return response

async def _dispatch_tool(call):
    with tracer.start_as_current_span("execute_tool") as span:
        span.set_attribute("gen_ai.tool.name", call.name)
        span.set_attribute("gen_ai.tool.call.id", call.call_id)
        args = json.loads(call.arguments)
        span.set_attribute("tool.args", call.arguments)
        try:
            result = await HANDLERS[call.name](**args)
            span.set_attribute("tool.result_size", len(str(result)))
            return result
        except Exception as e:
            span.set_attribute("error.type", type(e).__name__)
            span.record_exception(e)
            raise

What you get

Run a query through it and you get a stream of JSON spans on stdout. Pretty-printed and arranged in tree order:

{
  "name": "invoke_agent",
  "trace_id": "8f3c2e1a9b4d5067...",
  "span_id": "a0b1c2d3...",
  "parent_span_id": null,
  "start_time": "2026-03-14T10:42:13.412Z",
  "end_time":   "2026-03-14T10:42:18.673Z",
  "attributes": {
    "gen_ai.agent.name": "research-assistant",
    "gen_ai.conversation.id": "4e7a-9c2f-...",
    "user.id": "u_8821",
    "gen_ai.response.finish_reasons": ["end_turn"]
  }
}
  {
    "name": "chat",
    "parent_span_id": "a0b1c2d3...",
    "start_time": "10:42:13.420Z",
    "end_time":   "10:42:14.108Z",
    "attributes": {
      "gen_ai.request.model": "claude-sonnet-4-5",
      "gen_ai.provider.name": "anthropic",
      "gen_ai.usage.input_tokens": 2341,
      "gen_ai.usage.output_tokens": 89,
      "gen_ai.response.finish_reasons": ["tool_use"]
    }
  }
    {
      "name": "execute_tool",
      "parent_span_id": "a0b1c2d3..." (sibling of chat),
      "attributes": {
        "gen_ai.tool.name": "search_docs",
        "gen_ai.tool.call.id": "toolu_01ABC...",
        "tool.args": "{\"query\": \"Q3 invoices\"}",
        "tool.result_size": 1842
      }
    }
    ...

What changed from the print-based world

Every line that used to be a print is now an attribute on a span. The span's parent pointer encodes the call hierarchy. The trace ID links every span back to its originating request — concurrent requests can't be confused because each gets its own trace ID at the top.

The tree is the unlock. Once you have it, every observability tool on the market — Langfuse, Braintrust, Datadog, Honeycomb, Grafana — can ingest it and give you a UI. You can also dump it to JSONL and query it with jq. The data is portable.

Pointing it at a vendor

Once your code emits spans, sending them to a vendor is a one-line change: swap the exporter. To use Langfuse (free tier, designed for LLM apps):

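# obs/tracing.py: Langfuse variant
import base64
import os

from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
    OTLPSpanExporter
)

def init_tracing(service_name: str = "my-agent"):
    resource = Resource.create({"service.name": service_name})
    provider = TracerProvider(resource=resource)

    # Langfuse's OTLP endpoint authenticates with HTTP Basic auth built
    # from the project's public and secret keys. The env var names here
    # are assumptions; check your Langfuse project settings.
    keys = f"{os.environ['LANGFUSE_PUBLIC_KEY']}:{os.environ['LANGFUSE_SECRET_KEY']}"
    auth = base64.b64encode(keys.encode()).decode()

    # Replace ConsoleSpanExporter with OTLP pointed at Langfuse
    exporter = OTLPSpanExporter(
        endpoint="https://us.cloud.langfuse.com/api/public/otel/v1/traces",
        headers={"Authorization": f"Basic {auth}"},
    )
    provider.add_span_processor(BatchSpanProcessor(exporter))
    trace.set_tracer_provider(provider)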

Braintrust, Datadog, Honeycomb, and any other OTLP-compatible backend differ only in URL and auth header. The agent code is unchanged.

What about content?

You'll notice I haven't put the prompts or completions on the spans. That's deliberate — they're large, they often contain user data, and you usually don't want every retrieved document text echoed into your observability storage. The convention provides gen_ai.input.messages and gen_ai.output.messages for this, but the recommendation is to opt in conditionally:

# Only record full content for sampled traces (1%) or for traces
# flagged for inspection. Cheap to add per-call.
import json, os

CAPTURE_CONTENT = os.environ.get("CAPTURE_CONTENT") == "1"

def should_capture(trace_id: int) -> bool:
    if CAPTURE_CONTENT:
        return True
    # Sample ~1% of traces. OpenTelemetry trace IDs are random 128-bit
    # integers, so a modulus gives an even sample, and every span in a
    # trace reaches the same decision.
    return trace_id % 100 == 0

# Inside _chat, once the response is back:
if should_capture(span.get_span_context().trace_id):
    # default=str covers SDK content blocks that json can't serialize.
    span.set_attribute("gen_ai.input.messages",
                       json.dumps(messages, default=str)[:10000])
    span.set_attribute("gen_ai.output.messages",
                       json.dumps([b.model_dump() for b in response.content]))

If you're handling regulated data — health, finance, EU personal data — talk to your privacy team before turning on full-content capture. Traces are logs; logs are storage; storage has regulatory implications. The 1% sampling pattern above is the production default for a reason.

Question
Won't all this tracing slow my agent down?

In practice, no. The OpenTelemetry SDK uses an asynchronous batch processor — spans get queued in memory and flushed to the exporter in the background. The hot path overhead is on the order of microseconds per span; even an agent that emits 20 spans per run pays single-digit milliseconds. Compared to a single LLM call at 800ms+, the tracing cost is invisible.

Where you can hurt yourself: serializing huge prompt/completion content as attributes on every span. That's why content capture is conditional. The structural attributes (model name, token counts, tool name) are tiny and always-on.

Question
Which vendor should I pick — Langfuse, Braintrust, Datadog, something else?

Honest answer: it doesn't matter as much as it feels like it does, because the OpenTelemetry data model makes them interchangeable. My rough heuristic:

  • Langfuse — best free tier, LLM-native UI, easiest to start. Pick this if you're a small team and your stack is "just LLMs."
  • Braintrust — best evals integration, opinionated about prompt-engineering workflow. Pick this if Chapter 1.4 evals are a daily activity for your team.
  • Datadog / Honeycomb / Grafana — pick whichever your company already runs. The LLM-specific affordances are catching up to the LLM-native tools quickly; the value of one unified pane is high.
  • Roll your own (just JSONL → SQLite) — viable for personal projects, and the right answer if you want to understand what these tools do. Stops being viable once a teammate needs to see traces too.

The lock-in risk is low because the data model is shared. Migrating from one to another is a config change, not a rewrite.
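If you want to try the roll-your-own route, the following is a rough sketch of loading span records into SQLite and asking one question of them. The record shape (trace_id, span_id, parent_span_id, name, duration_ms, attributes) and the table layout are assumptions made for this example, not a format any exporter guarantees; adapt the field names to whatever your exporter actually writes.

```python
import json
import sqlite3

# Hypothetical span records, one JSON object per line (JSONL).
SPANS_JSONL = """\
{"trace_id": "t1", "span_id": "a", "parent_span_id": null, "name": "invoke_agent", "duration_ms": 5200, "attributes": {}}
{"trace_id": "t1", "span_id": "b", "parent_span_id": "a", "name": "execute_tool", "duration_ms": 800, "attributes": {"gen_ai.tool.name": "search_docs"}}
{"trace_id": "t2", "span_id": "c", "parent_span_id": null, "name": "invoke_agent", "duration_ms": 3100, "attributes": {}}
{"trace_id": "t2", "span_id": "d", "parent_span_id": "c", "name": "execute_tool", "duration_ms": 150, "attributes": {"gen_ai.tool.name": "fetch_doc"}}
"""


def load_spans(conn: sqlite3.Connection, jsonl_text: str) -> int:
    """Insert each JSONL span record as one row; return the row count."""
    conn.execute(
        """CREATE TABLE IF NOT EXISTS spans (
               span_id TEXT PRIMARY KEY,
               trace_id TEXT,
               parent_span_id TEXT,
               name TEXT,
               tool TEXT,
               duration_ms REAL,
               attributes TEXT)"""
    )
    rows = []
    for line in jsonl_text.splitlines():
        if not line.strip():
            continue
        s = json.loads(line)
        attrs = s.get("attributes", {})
        # Promote the tool name to its own column so it can be queried
        # without relying on SQLite's optional JSON functions.
        rows.append((s["span_id"], s["trace_id"], s["parent_span_id"],
                     s["name"], attrs.get("gen_ai.tool.name"),
                     s["duration_ms"], json.dumps(attrs)))
    conn.executemany("INSERT INTO spans VALUES (?, ?, ?, ?, ?, ?, ?)", rows)
    conn.commit()
    return len(rows)


conn = sqlite3.connect(":memory:")
loaded = load_spans(conn, SPANS_JSONL)

# Which tool calls were slowest, and in which trace?
slowest_tools = conn.execute(
    """SELECT trace_id, tool, duration_ms FROM spans
       WHERE name = 'execute_tool' ORDER BY duration_ms DESC"""
).fetchall()
print(slowest_tools)
```

Swap ":memory:" for a file path to keep the data between runs. Once a second person needs access, this is the point where a hosted backend starts to pay for itself.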

STEP 3

Logs that survive asyncio.gather.

Spans cover the structured story: which operations happened, in what order, with what attributes. You still need ordinary log lines for the unstructured story — error messages, debug context, intermediate values you didn't decide deserved an attribute. The problem is that ordinary Python logging loses the connection to the trace tree as soon as you cross an await boundary inside asyncio.gather.

Concretely, this fails in a way that's hard to spot:

# Naive logging — what NOT to do
import asyncio
import logging
log = logging.getLogger("agent")

async def fan_out_retrieval(queries):
    # Logs from these concurrent calls interleave with no way
    # to tell which log belongs to which query.
    return await asyncio.gather(*[search(q) for q in queries])

async def search(query):
    log.info(f"searching for {query}")
    results = await bm25.search(query)
    log.info(f"found {len(results)} results")
    return results

# Log output during a concurrent run:
INFO:agent:searching for Q3 invoices
INFO:agent:searching for billing settings
INFO:agent:found 12 results
INFO:agent:searching for refund policy
INFO:agent:found 3 results
INFO:agent:found 8 results

Which result count belongs to which query? You cannot tell from the log. This is the same correlation problem from Step 1, but now within a single trace. The fix is to propagate context through contextvars — Python's mechanism for variables that follow the async task instead of the thread — and to bind that context onto every log record.

The pattern

structlog + contextvars gives you the cleanest version. About fifteen lines of setup and every subsequent log line carries the trace context for free:

# obs/logging.py
import structlog, logging, sys
from opentelemetry import trace

def _add_trace_context(logger, method_name, event_dict):
    """Pull current trace IDs from OpenTelemetry into log record."""
    span = trace.get_current_span()
    ctx = span.get_span_context()
    if ctx.is_valid:
        event_dict["trace_id"] = format(ctx.trace_id, "032x")
        event_dict["span_id"] = format(ctx.span_id, "016x")
    return event_dict

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,  # pull bound vars
        _add_trace_context,                       # pull OTel IDs
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
    wrapper_class=structlog.stdlib.BoundLogger,
    logger_factory=structlog.stdlib.LoggerFactory(),
)
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

log = structlog.get_logger("agent")

Two things make this work:

  • merge_contextvars — pulls in any variable bound via structlog.contextvars.bind_contextvars(...). Bindings are scoped to the current async task: asyncio.gather gives each coroutine its own context, so bindings don't leak between concurrent calls.
  • _add_trace_context — reads the current OpenTelemetry span ID at log-emit time. Every log line gets the trace and span ID of whatever span is active when the log fires, automatically.
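If you want to see the mechanism without structlog in the way, here is a standard-library sketch of the same idea: a context variable set per task, and a logging filter that copies it onto every record at emit time. The names current_query, ContextFilter, and ListHandler are hypothetical, and the handler collects records in a list only so the result is easy to inspect.

```python
import asyncio
import contextvars
import logging

# Hypothetical context variable; structlog's bind_contextvars plays this role.
current_query = contextvars.ContextVar("current_query", default=None)


class ContextFilter(logging.Filter):
    """Copy the task-local query onto each record as it is emitted."""

    def filter(self, record):
        record.query = current_query.get()
        return True


class ListHandler(logging.Handler):
    """Collect (query, message) pairs instead of writing them anywhere."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append((record.query, record.getMessage()))


log = logging.getLogger("ctx_demo")
log.setLevel(logging.INFO)
log.propagate = False
handler = ListHandler()
handler.addFilter(ContextFilter())
log.addHandler(handler)


async def search(query, delay, n_results):
    current_query.set(query)          # visible only inside this task
    log.info("searching")
    await asyncio.sleep(delay)        # other tasks run while this one waits
    log.info("found %d results", n_results)


async def main():
    # gather wraps each coroutine in its own task, each with its own
    # copy of the context, so the two set() calls don't collide.
    await asyncio.gather(
        search("Q3 invoices", 0.05, 12),
        search("billing settings", 0.01, 8),
    )


asyncio.run(main())
for query, message in handler.records:
    print(query, "|", message)
```

Note that neither log call mentions the query. The correlation comes entirely from the context, which is what lets code deep inside a tool handler log usefully without being passed an ID.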

Use it

Where before you wrote:

log.info(f"searching for {query}")  # f-string interpolation: fields are baked into the message

Now you write:

log.info("searching", query=query, top_k=5)

And the output, for a concurrent run, becomes:

{"timestamp":"2026-03-14T10:42:13.421Z","level":"info","event":"searching",
 "query":"Q3 invoices","top_k":5,
 "trace_id":"8f3c2e1a9b4d50670c8...","span_id":"a0b1c2d3e4f50678"}
{"timestamp":"2026-03-14T10:42:13.422Z","level":"info","event":"searching",
 "query":"billing settings","top_k":5,
 "trace_id":"8f3c2e1a9b4d50670c8...","span_id":"b1c2d3e4f5067890"}
{"timestamp":"2026-03-14T10:42:13.890Z","level":"info","event":"found",
 "query":"Q3 invoices","count":12,
 "trace_id":"8f3c2e1a9b4d50670c8...","span_id":"a0b1c2d3e4f50678"}
{"timestamp":"2026-03-14T10:42:13.894Z","level":"info","event":"found",
 "query":"billing settings","count":8,
 "trace_id":"8f3c2e1a9b4d50670c8...","span_id":"b1c2d3e4f5067890"}

Now you can answer "which result count belonged to which query" in two ways: by the explicit query field, or by matching span_id. Both work. The latter generalizes to logs that don't happen to repeat the query string.

Binding higher-level context

At the top of your agent run, bind user_id and any other identifiers you'll want on every subsequent log line:

import time
from structlog.contextvars import bind_contextvars, clear_contextvars

async def run_agent(user_msg: str, user_id: str):
    clear_contextvars()
    bind_contextvars(user_id=user_id, agent="research-assistant")

    started = time.perf_counter()
    with tracer.start_as_current_span("invoke_agent"):
        log.info("agent_start", user_msg_len=len(user_msg))
        # ... agent loop ...
        log.info("agent_done",
                 duration_s=round(time.perf_counter() - started, 3))

Every log line emitted anywhere inside run_agent — including from tool handlers, retrieval code, anything you call — gets the bound user_id automatically, courtesy of contextvars. And because contextvars are async-task-scoped, two concurrent run_agent calls don't trample each other's bindings.

Search and aggregate

Once logs are structured JSON with trace IDs, two things become trivial:

  • Drill into one request. jq 'select(.trace_id=="8f3c...")' app.log dumps every log line from that one request. Combine with the span tree from the previous step and you have a full reconstruction.
  • Aggregate across requests. jq -s 'group_by(.event) | map({event:.[0].event, count:length})' app.log tells you which events fire most. Useful for spotting high-frequency error events.
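If jq isn't on the machine, or you want these queries inside a script, the same two operations are a few lines of Python. This is a sketch over made-up log lines; the helper names parse, for_trace, and event_counts are hypothetical.

```python
import json
from collections import Counter

# Made-up structured log lines, one JSON object per line.
LOG_LINES = """\
{"event": "searching", "query": "Q3 invoices", "trace_id": "8f3c"}
{"event": "searching", "query": "refund policy", "trace_id": "77aa"}
{"event": "found", "count": 12, "trace_id": "8f3c"}
{"event": "tool_error", "tool": "fetch_doc", "trace_id": "77aa"}
"""


def parse(text):
    """Parse JSONL text into a list of dicts, skipping blank lines."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


def for_trace(records, trace_id):
    """Drill into one request: every record that carries this trace ID."""
    return [r for r in records if r.get("trace_id") == trace_id]


def event_counts(records):
    """Aggregate across requests: how often each event name fires."""
    return Counter(r["event"] for r in records)


records = parse(LOG_LINES)
one_request = for_trace(records, "8f3c")
counts = event_counts(records)
print([r["event"] for r in one_request])
print(counts.most_common())
```

Both functions only work because every record is a dict with named fields. Against free-text log lines you would be writing regular expressions instead.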

Most observability vendors also ingest structured logs and correlate them with traces automatically — Langfuse, Datadog, Honeycomb all do this when both signals share a trace ID. You don't have to choose between traces and logs; you use both, and they cross-reference.

If you only do one thing from this step: replace every f"..."-style log message with log.info("event_name", field=value, ...). The discipline of "every log is a typed event with named fields" pays back for years. The async-correlation piece is a bonus.

Question
I'm already using the standard logging module everywhere. Can I keep it?

Yes. structlog wraps logging rather than replacing it; it sits in front of the standard handlers and reformats. Existing logging.getLogger("foo").info("bar") calls keep working — they just produce plain strings instead of structured records. You can migrate file by file.

The minimum useful step: don't migrate the whole codebase. Migrate your agent loop and tool handlers — the places where correlation matters most — and leave the rest alone.

Question
Why contextvars and not just thread-local storage?

Thread-local storage is per OS thread; asyncio runs many tasks on one thread. A binding set in task A would be visible to task B if they share a thread — exactly the correlation bug we're trying to avoid. contextvars are per async task, so each asyncio.gather child gets its own independent context. This was the whole point of PEP 567.
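The difference is easy to demonstrate. In this sketch two tasks share one thread, and each writes its request ID to both a thread-local and a context variable before awaiting. The names tls, cv, and handle are hypothetical.

```python
import asyncio
import contextvars
import threading

tls = threading.local()
cv = contextvars.ContextVar("request_id", default=None)


async def handle(request_id, delay):
    tls.request_id = request_id      # shared by every task on this thread
    cv.set(request_id)               # private to this task's context
    await asyncio.sleep(delay)       # the other task runs here
    # Report what each mechanism says the request ID is after the await.
    return request_id, tls.request_id, cv.get()


async def main():
    return await asyncio.gather(handle("A", 0.02), handle("B", 0.01))


results = asyncio.run(main())
for actual, from_tls, from_cv in results:
    print(f"request {actual}: thread-local says {from_tls}, contextvar says {from_cv}")
```

Task A starts first and writes "A" to the thread-local, then task B overwrites it with "B" before either resumes. When A wakes up, the thread-local reports B's ID while the context variable still reports A's. A log line stamped from the thread-local would be attributed to the wrong request.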

STEP 4

Replay: run the failure on your laptop.

You now have a trace tree for every production run, and structured logs that correlate to it. That gets you 80% of the way through most debugging: read the trace, see the bad tool call, identify the bad result. But for the other 20% — the bugs where you need to step through the agent's behavior, change one prompt, see what would have happened differently — you need replay.

Replay means: take a trace ID from production, re-run the agent on the same input, but instead of calling tools for real, return the exact tool results that were recorded during the original run. The model sees the same context it saw before, runs through its loop the same way, and you can pause, mutate, or re-prompt at any step. It is the agentic-AI equivalent of a debugger.

The two-piece architecture

To make replay work you need to persist tool I/O during the original run, then teach your tool dispatcher to look up recorded results when in replay mode. About forty lines of code.

# obs/replay.py
import json, contextvars
from pathlib import Path

REPLAY_DIR = Path("runs/replay")
REPLAY_DIR.mkdir(parents=True, exist_ok=True)

# When non-None, _dispatch_tool returns recorded results instead of
# calling handlers. Set by the replay CLI.
replay_mode: contextvars.ContextVar = contextvars.ContextVar(
    "replay_mode", default=None
)

def recorder_path(trace_id: str) -> Path:
    return REPLAY_DIR / f"{trace_id}.jsonl"

def record(trace_id: str, event: dict):
    """Append a tool I/O event to this trace's recording."""
    with recorder_path(trace_id).open("a") as f:
        f.write(json.dumps(event) + "\n")

def load_recording(trace_id: str) -> list[dict]:
    path = recorder_path(trace_id)
    if not path.exists():
        raise FileNotFoundError(f"no recording for trace {trace_id}")
    # Close the file promptly and skip any blank lines.
    with path.open() as f:
        return [json.loads(line) for line in f if line.strip()]

Record during normal runs

Modify _dispatch_tool to record every input/output pair. The recording is keyed by trace ID, so each request's tools are in their own file. Within a recording, look results up by tool name and arguments rather than by tool call ID: the model runs for real during replay and the provider issues fresh call IDs each time, so an ID recorded in production will not reappear.

# Updated _dispatch_tool, replaces the version from Step 2
import json
from obs.replay import record, replay_mode

def _call_key(name, args):
    # Stable across runs, unlike the provider-generated call ID.
    return name, json.dumps(args, sort_keys=True)

async def _dispatch_tool(block):
    with tracer.start_as_current_span("execute_tool") as span:
        span.set_attribute("gen_ai.tool.name", block.name)
        span.set_attribute("gen_ai.tool.call.id", block.id)

        trace_id = format(span.get_span_context().trace_id, "032x")

        # REPLAY MODE: return recorded result, skip the real call
        recording = replay_mode.get()
        if recording is not None:
            key = _call_key(block.name, block.input)
            recorded = next(
                (e for e in recording
                 if _call_key(e["tool"], e["args"]) == key),
                None,
            )
            if recorded is not None:
                span.set_attribute("replay.matched", True)
                if "error" in recorded:
                    # The original call failed; reproduce the failure.
                    raise RuntimeError(recorded["error"])
                return recorded["result"]
            # Tool call wasn't in the recording: the model deviated from
            # the original trajectory. Flag it but continue with a real call.
            span.set_attribute("replay.matched", False)

        # NORMAL MODE: call handler and record
        event = {"call_id": block.id, "tool": block.name, "args": block.input}
        try:
            result = await HANDLERS[block.name](**block.input)
        except Exception as e:
            span.set_attribute("error.type", type(e).__name__)
            span.record_exception(e)
            record(trace_id, {**event, "error": str(e)})
            raise
        record(trace_id, {**event, "result": result})
        return result

The replay CLI

One command. Takes a trace ID, loads the recording, runs the agent in replay mode against the original user message.

# scripts/replay.py
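import argparse, asyncio
from agent.loop import run_agent
from obs.replay import load_recording, replay_mode
from obs.tracing import init_tracing

# Without a configured provider, spans are no-ops and nothing is exported.
init_tracing()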

parser = argparse.ArgumentParser()
parser.add_argument("trace_id")
parser.add_argument("--user-msg", required=True,
                    help="the original user message")
args = parser.parse_args()

recording = load_recording(args.trace_id)
print(f"loaded {len(recording)} tool events from trace {args.trace_id}")

async def main():
    token = replay_mode.set(recording)
    try:
        result = await run_agent(args.user_msg, user_id="replay")
        print("---")
        print(result)
    finally:
        replay_mode.reset(token)

asyncio.run(main())

What this looks like

$ python scripts/replay.py 8f3c2e1a9b4d... \
    --user-msg "show me my Q3 invoices"

loaded 3 tool events from trace 8f3c2e1a9b4d...

[span: invoke_agent  agent=research-assistant  user=replay]
  [span: chat  model=claude-sonnet-4-5  in=2341 out=89  finish=tool_use]
  [span: execute_tool  tool=search_docs  replay.matched=True]
  [span: execute_tool  tool=search_docs  replay.matched=True]
  [span: chat  model=claude-sonnet-4-5  in=3104 out=271  finish=tool_use]
  [span: execute_tool  tool=fetch_doc  replay.matched=True]
  [span: chat  model=claude-sonnet-4-5  in=4892 out=412  finish=end_turn]
---
"Your Q3 2025 invoices are: INV-4421 ($2,300, paid),
 INV-4438 ($1,150, outstanding), INV-4502 ($875, paid)..."

You've just reproduced a production run on your laptop. From here you can:

  • Change the system prompt and re-run to see if the new prompt would have produced a better answer (the recorded tool results still match because the model's tool calls don't change shape).
  • Add print statements or breakpoints inside _dispatch_tool to inspect every recorded interaction.
  • Replay with a different model (Sonnet → Opus) to A/B test on a real production case.
  • Save the trace to your eval set — it just became a regression test.

When replay diverges

If you change something that causes the model to make a tool call it didn't make in the original run, your recording won't have a match. The code above sets replay.matched=False on the span and falls through to calling the real handler. That's usually the right behavior, because you want to see what would have happened, but a real call can have side effects (charging users, sending emails). For replay safety, gate state-changing tools behind a flag and stub them out when replay is active:

if replay_mode.get() is not None and POLICIES[block.name].scope != Scope.READ_ONLY:
    return {"error": "state-changing tool stubbed during replay"}

(Scope and POLICIES come from Chapter 2.3 Safety.) Read-only tools always replay safely; state-changing ones either replay from recording or return a stub error.
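
To see the gate in isolation, here is a minimal, self-contained sketch. The Scope enum, Policy dataclass, POLICIES table, and call_real_handler below are hypothetical stand-ins for the real definitions from Chapter 2.3 and the agent loop; only the condition itself is taken from the snippet above.

```python
import contextvars
from dataclasses import dataclass
from enum import Enum


class Scope(Enum):  # hypothetical stand-in for Chapter 2.3's Scope
    READ_ONLY = "read_only"
    STATE_CHANGING = "state_changing"


@dataclass(frozen=True)
class Policy:  # hypothetical: only the field the gate needs
    scope: Scope


# Hypothetical policy table; tool names are illustrative.
POLICIES = {
    "search_docs": Policy(Scope.READ_ONLY),
    "send_email": Policy(Scope.STATE_CHANGING),
}

# None means "not replaying"; any other value is the loaded recording.
replay_mode = contextvars.ContextVar("replay_mode", default=None)


def call_real_handler(name, args):
    """Placeholder for the real tool handler."""
    return {"ok": True, "tool": name}


def dispatch_unrecorded(name, args):
    """Handle a tool call that has no match in the recording."""
    if replay_mode.get() is not None and POLICIES[name].scope != Scope.READ_ONLY:
        # Never let a replay charge a card or send an email.
        return {"error": "state-changing tool stubbed during replay"}
    return call_real_handler(name, args)
```

Outside replay the contextvar is None, so the same function passes every call through to the real handler.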

Why this earns its keep

The first time you ship an agent and a user files a bug like "I asked it X and it said Y," your old workflow is: try to reproduce locally, fail because the corpus shifted or the retrieval randomized differently, end up guessing. Your new workflow is: copy the trace ID from the audit log, run replay.py, watch the exact failure happen on your laptop, fix it, replay to confirm. The difference is hours per bug, sometimes days.

This is also what makes regression evals possible. Every reproducible production failure becomes a permanent eval case. Each one you fix stays fixed, because the next time anything changes you replay all of them and catch regressions immediately. Spans + recording + replay is the engineering substrate for Chapter 1.4's evals, and for what Part III extends.
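
As a rough sketch of "every failure becomes an eval case", the loop below replays a list of saved cases and checks each answer. Everything here is an assumption made for illustration: the case format, the must_contain check, the trace ID, and fake_run_agent, which stands in for the real run_agent so the example runs on its own.

```python
import asyncio
import contextvars

replay_mode = contextvars.ContextVar("replay_mode", default=None)

# Hypothetical eval cases: a recording, the original message, and a check.
EVAL_CASES = [
    {
        "trace_id": "trace-q3-invoices",  # illustrative ID, not a real trace
        "user_msg": "show me Q3 invoices",
        "recording": {"call_1": ["INV-4421", "INV-4438"]},
        "must_contain": "INV-4421",
    },
]


async def fake_run_agent(user_msg, user_id):
    """Stand-in for run_agent: answers from whatever recording is active."""
    recording = replay_mode.get()
    return "Your invoices: " + ", ".join(recording["call_1"])


async def run_case(case, run_agent):
    """Replay one case and report whether its check passes."""
    token = replay_mode.set(case["recording"])
    try:
        answer = await run_agent(case["user_msg"], user_id="replay")
    finally:
        replay_mode.reset(token)
    return case["must_contain"] in answer


async def run_all(cases, run_agent):
    """Replay every case in turn; map trace ID to pass/fail."""
    return {c["trace_id"]: await run_case(c, run_agent) for c in cases}


results = asyncio.run(run_all(EVAL_CASES, fake_run_agent))
```

A substring check is the crudest possible grader; it is only here to keep the sketch short.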

Question
Recording every tool result is going to be a lot of data. What about retention?

You don't have to keep all of it. Two pragmatic patterns:

  • Sample by default. Record 100% of failed runs (cheap, valuable) and 1% of successful runs (cheap, useful for general debugging). Make the keep-or-drop decision when the run ends, keyed off the finish reason or error state on the root span, because you can't know a run failed until it has finished.
  • Tier by age. Keep full recordings hot for 7 days; archive to cold storage for 90; delete after that. For high-traffic agents this maps to a few hundred MB hot and a few GB cold.

The exception: runs flagged for inspection by a human (support ticket attached, eval failure linked, customer complaint) get full retention forever. They're future evals.
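
The two patterns plus the exception fit in a few lines. This is a sketch under stated assumptions: the function names and tier labels are made up for illustration, and "archive for 90" is read as "cold until the recording is 90 days old", which is one possible interpretation of the tiering above.

```python
import random

HOT_DAYS = 7                 # full recordings stay hot this long
COLD_DAYS = 90               # assumption: total age at which cold copies are deleted
SUCCESS_SAMPLE_RATE = 0.01   # keep 1% of successful runs


def should_keep(failed: bool, flagged: bool, rng: random.Random) -> bool:
    """Decide at the end of a run whether to persist its recording."""
    if failed or flagged:
        return True  # failures and human-flagged runs are always kept
    return rng.random() < SUCCESS_SAMPLE_RATE


def storage_tier(age_days: int, flagged: bool) -> str:
    """Return where a recording of the given age belongs."""
    if flagged:
        return "retain"  # flagged runs are future evals; never expire them
    if age_days <= HOT_DAYS:
        return "hot"
    if age_days <= COLD_DAYS:
        return "cold"
    return "delete"
```

Passing the random source in as a parameter keeps the sampling decision testable; in production you would hand it a shared random.Random() instance.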

Question
Can I replay the model call too — not just the tool calls?

You can, and some setups do: record each model response alongside the tool results and return the recorded response during replay. Don't count on the provider to do this for you. Prompt caching on Anthropic and OpenAI cuts the cost and latency of re-processing a repeated prompt prefix; it does not hand back the earlier response. But the more useful pattern is to not replay model calls, so that you can change the prompt or the model and see how the new version would behave on the same tool results.
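
If you do want model calls replayed as well, one way to sketch it is to store each response under a hash of the request and look it up on replay. ModelRecorder, request_key, and the call_model callable are hypothetical names for this illustration, not part of the chapter's code or of any provider SDK.

```python
import hashlib
import json


def request_key(model: str, messages: list) -> str:
    """Stable hash of everything that determines the request."""
    payload = json.dumps({"model": model, "messages": messages}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


class ModelRecorder:
    """Wraps a model-calling function; records live, replays on a key match."""

    def __init__(self, call_model, recorded=None):
        self.call_model = call_model          # hypothetical: (model, messages) -> response
        self.replay = recorded is not None    # a supplied recording means replay mode
        self.recorded = {} if recorded is None else recorded

    def chat(self, model, messages):
        key = request_key(model, messages)
        if self.replay and key in self.recorded:
            return self.recorded[key]         # exact request seen before
        response = self.call_model(model, messages)
        self.recorded[key] = response         # record for later replays
        return response
```

The trade-off is visible in the key: change the prompt or the model and the hash no longer matches, so the call goes live again.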

"Replay everything" is closer to a video recording; "replay only tool I/O" is closer to a debugger. The debugger version is more useful day-to-day.

Question
My agent uses parallel tool calls. Does replay still work?

Yes, because we match by call_id, not by call order. If the model fires three concurrent search_docs calls in turn 2, all three get recorded with their distinct IDs, and the replay dispatcher returns the right result for each. The only caveat is that timing-sensitive behavior won't reproduce exactly — but tool I/O is reproduced exactly, which is what matters for almost every bug.
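
A small sketch of why ID matching makes ordering irrelevant. The recording shape (a dict from call ID to result) and the call IDs are assumptions for illustration; the random sleep simulates concurrent calls finishing in an unpredictable order.

```python
import asyncio
import random

# Hypothetical recording: call_id -> recorded tool result.
RECORDING = {
    "call_a": ["INV-4421"],
    "call_b": ["INV-4438"],
    "call_c": ["INV-4502"],
}


async def replay_tool(call_id: str):
    """Return the recorded result for one call after a random delay."""
    await asyncio.sleep(random.uniform(0, 0.01))  # completion order varies
    return call_id, RECORDING[call_id]


async def replay_turn(call_ids):
    """Replay all calls of one turn concurrently, keyed by call ID."""
    pairs = await asyncio.gather(*(replay_tool(c) for c in call_ids))
    return dict(pairs)


# The calls are issued in a different order than they were recorded.
results = asyncio.run(replay_turn(["call_c", "call_a", "call_b"]))
```

Each call gets its own recorded result regardless of the order in which the calls were issued or finished.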

End of chapter 2.1

Deliverable

An agent where every run produces a span tree following the OpenTelemetry GenAI conventions, structured logs that survive concurrent execution, and a replay <trace_id> CLI that reproduces production failures on your laptop in seconds. The substrate that makes the rest of Part II — Cost (2.2), Safety (2.3), Deployment (2.4) — actually debuggable, and the substrate that makes Part III's eval-driven development tractable at scale.

  • OpenTelemetry SDK wired up with a ConsoleSpanExporter for local dev
  • invoke_agent, chat, and execute_tool spans with gen_ai.* attributes
  • One-line vendor swap: OTLP exporter → Langfuse / Braintrust / Datadog
  • structlog with contextvars + OTel trace-ID enrichment
  • Conditional content capture (1% sample or flag-gated)
  • Tool I/O recording per trace, written to runs/replay/<trace_id>.jsonl
  • replay.py CLI: load recording, set replay_mode contextvar, run agent
  • State-changing tools stubbed during replay (integrates with 2.3 Safety scopes)