Chapter 10: The Event Stream — Watching Your Agent Think and Act Live
By the end of this chapter, you will understand every event type in the Managed Agents event stream, know how to process them in your code, and be able to build a basic streaming event handler.
The Big Idea
When Claude is running a session, it isn't operating in a black box. Every thought, every action, every tool call is broadcast as a structured event. You can watch your agent reason in real time — see it decide to search the web, observe it reading a file, catch it the moment it finishes work.
This visibility is what makes agents debuggable, trustworthy, and steerable. Without the event stream, you'd hand Claude a task and wait for a result, with no way to understand what happened in between. With the event stream, you see the work as it unfolds.
The event stream documentation describes the model this way: "Communication with Claude Managed Agents is event-based. You send user events to the agent, and receive agent and session events back to track status."
Events follow a {domain}.{action} naming convention. agent.message is an event from the agent domain with the message action. session.status_idle is a session-domain event indicating idle status. Once you see the pattern, the entire event vocabulary becomes predictable.
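Because the convention is regular, an event type can be taken apart mechanically. The following is a minimal sketch, assuming the domain is everything before the first dot; the helper name `split_event_type` is hypothetical and not part of any SDK.

```python
def split_event_type(event_type: str) -> tuple[str, str]:
    """Split an event type such as 'session.status_idle' into (domain, action)."""
    domain, sep, action = event_type.partition(".")
    if not sep or not domain or not action:
        raise ValueError(f"not a domain.action event type: {event_type!r}")
    return domain, action


print(split_event_type("agent.message"))        # ('agent', 'message')
print(split_event_type("session.status_idle"))  # ('session', 'status_idle')
```

Grouping by domain this way lets a handler treat, say, all session events as status signals and all agent events as work output.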
The Analogy
Think of the event stream like a flight's live tracking map.
Before live tracking, you booked a flight and just waited — you had no idea where the plane was or whether it was on time. When it landed, you found out.
With live tracking, you can see: departed on time, cruising altitude 35,000 feet, 40 minutes from destination, now in final descent. You know what's happening at every moment. If there's a delay, you see it before the gate agent announces it.
The event stream is live tracking for your agent. You can see: started the task, searched the web, found three relevant sources, is now reading the first one, is composing the summary. You know where the work is, what's going well, and — if something goes wrong — exactly where it went wrong.
How It Actually Works
The Two Categories of Events
Events are bidirectional. You send events to the session; the session sends events back to you.
Events you send (user events):
| Event type | When you send it |
|---|---|
| user.message | Start or continue a task |
| user.tool_confirmation | Approve or deny a pending tool call |
| user.custom_tool_result | Return the result of a custom tool your code executed |
| user.interrupt | Stop the agent mid-task |
| user.define_outcome | Define an outcome for the session (Research Preview) |
Events the session sends back:
| Event type | What it means |
|---|---|
| agent.message | Claude's text response |
| agent.tool_use | Claude is invoking a built-in or MCP tool |
| agent.mcp_tool_use | Claude is invoking an MCP tool specifically |
| agent.custom_tool_use | Claude is invoking a custom tool (your code must execute it) |
| session.status_idle | Session has paused; includes stop_reason |
| session.error | An error occurred (e.g., MCP auth failure) |
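One way to act on received events is a dispatch table keyed by event type. This is a sketch rather than SDK code: `dispatch`, the handler functions, and the use of `SimpleNamespace` as a stand-in for real event objects are all assumptions made for illustration.

```python
from types import SimpleNamespace
from typing import Any, Callable

Handler = Callable[[Any], str]


def on_tool_use(event: Any) -> str:
    return f"tool: {event.name}"


def on_error(event: Any) -> str:
    return "error reported by session"


# Map event types to the function that handles them.
HANDLERS: dict[str, Handler] = {
    "agent.tool_use": on_tool_use,
    "agent.custom_tool_use": on_tool_use,
    "session.error": on_error,
}


def dispatch(event: Any) -> str:
    """Route an event to its handler; unknown types are reported, not dropped."""
    handler = HANDLERS.get(event.type)
    if handler is None:
        return f"unhandled: {event.type}"
    return handler(event)


# Stand-in events; real ones would arrive from the stream.
print(dispatch(SimpleNamespace(type="agent.tool_use", name="web_search")))
print(dispatch(SimpleNamespace(type="session.error")))
print(dispatch(SimpleNamespace(type="agent.some_future_event")))
```

Reporting unknown types instead of silently skipping them makes it obvious when the stream carries an event your code has not accounted for.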
The session.status_idle Event
This is the most important event to handle correctly. It signals that the session has stopped running. The stop_reason field tells you why:
- end_turn: The agent finished its work and is waiting for your next message.
- requires_action: The session is paused waiting for a tool confirmation or custom tool result. The blocking event IDs are in stop_reason.requires_action.event_ids.
When you see end_turn, the work is done — read the output, evaluate it, decide whether to send another task or close the session.
When you see requires_action, your code needs to respond before the agent can continue.
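The two stop reasons lead to two different next steps, which can be made explicit in a small helper. This is only a sketch: `next_step` and its return labels are hypothetical, and `SimpleNamespace` stands in for the real stop_reason object.

```python
from types import SimpleNamespace
from typing import Any, Optional


def next_step(stop_reason: Optional[Any]) -> str:
    """Translate a session.status_idle stop reason into what the caller should do."""
    if stop_reason is None:
        return "unknown"
    if stop_reason.type == "end_turn":
        return "read_output"  # work is done; evaluate the result
    if stop_reason.type == "requires_action":
        return "respond_to_blocking_events"  # agent is waiting on your code
    return "unknown"


print(next_step(SimpleNamespace(type="end_turn")))
print(next_step(SimpleNamespace(type="requires_action", event_ids=["evt_1"])))
```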
The Core Streaming Pattern
The fundamental pattern for interacting with a session:

    # Assumes `client` and `session` already exist.
    with client.beta.sessions.events.stream(session.id) as stream:
        # Send the user message after the stream opens
        client.beta.sessions.events.send(
            session.id,
            events=[
                {
                    "type": "user.message",
                    "content": [
                        {
                            "type": "text",
                            "text": "Create a Python script that generates the first 20 Fibonacci numbers and saves them to fibonacci.txt",
                        },
                    ],
                },
            ],
        )
        # Process streaming events
        for event in stream:
            match event.type:
                case "agent.message":
                    for block in event.content:
                        # Content can mix block types; only text blocks carry .text
                        if block.type == "text":
                            print(block.text, end="")
                case "agent.tool_use":
                    print(f"\n[Using tool: {event.name}]")
                case "session.status_idle":
                    print("\n\nAgent finished.")
                    break

Critical order: Open the stream first (with client.beta.sessions.events.stream(session.id) as stream:), then send. "The with block opens the SSE connection; anything you send inside the block is guaranteed to be observable. Sending before opening risks losing events that fire in the race window." (Cookbook)
Handling Custom Tool Calls
When a custom tool is invoked, the session pauses with requires_action. Your code must execute the tool and send back the result:
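
    # Assumes `client`, `session`, and a `call_tool(name, input)` helper already exist.
    events_by_id = {}  # custom tool calls seen so far, keyed by event ID

    with client.beta.sessions.events.stream(session.id) as stream:
        for event in stream:
            # Remember each custom tool call so it can be looked up when the session pauses
            if event.type == "agent.custom_tool_use":
                events_by_id[event.id] = event

            if event.type == "session.status_idle" and (stop := event.stop_reason):
                match stop.type:
                    case "requires_action":
                        for event_id in stop.event_ids:
                            # Look up the custom tool use event and execute it
                            tool_event = events_by_id[event_id]
                            result = call_tool(tool_event.name, tool_event.input)
                            # Send the result back
                            client.beta.sessions.events.send(
                                session.id,
                                events=[
                                    {
                                        "type": "user.custom_tool_result",
                                        "custom_tool_use_id": event_id,
                                        "content": [{"type": "text", "text": result}],
                                    },
                                ],
                            )
                    case "end_turn":
                        break
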
The pattern here: when you get requires_action, loop through the event IDs in stop.event_ids. For each, look up the corresponding event, execute the tool in your code, and send back a user.custom_tool_result.
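The handler above calls a `call_tool` helper that the chapter does not define. A minimal sketch of one possible shape follows, assuming tools are plain Python functions registered by name and that results go back to the agent as text. The registry, the `tool` decorator, and the `add_numbers` tool are all hypothetical.

```python
import json
from typing import Any, Callable

TOOLS: dict[str, Callable[..., Any]] = {}


def tool(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Register a function as a custom tool under its own name."""
    TOOLS[fn.__name__] = fn
    return fn


@tool
def add_numbers(a: float, b: float) -> float:  # hypothetical example tool
    return a + b


def call_tool(name: str, tool_input: dict[str, Any]) -> str:
    """Run a registered tool and return its result as text.

    Failures are returned as text too, so the agent learns what went wrong
    instead of the session staying blocked on a missing result.
    """
    fn = TOOLS.get(name)
    if fn is None:
        return f"error: unknown tool {name!r}"
    try:
        return json.dumps(fn(**tool_input))
    except Exception as exc:  # surface any tool failure to the agent
        return f"error: {type(exc).__name__}: {exc}"


print(call_tool("add_numbers", {"a": 2, "b": 3}))  # 5
print(call_tool("missing", {}))                     # error: unknown tool 'missing'
```

Returning an error string rather than raising is a design choice for this sketch: every blocking event still gets a result, so the session can resume.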
Handling Tool Confirmation Requests
When a tool with an always_ask policy is invoked, the session pauses with the same requires_action stop reason:

    with client.beta.sessions.events.stream(session.id) as stream:
        for event in stream:
            if event.type == "session.status_idle" and (stop := event.stop_reason):
                match stop.type:
                    case "requires_action":
                        for event_id in stop.event_ids:
                            # Approve the pending tool call
                            client.beta.sessions.events.send(
                                session.id,
                                events=[
                                    {
                                        "type": "user.tool_confirmation",
                                        "tool_use_id": event_id,
                                        "result": "allow",
                                    },
                                ],
                            )
                    case "end_turn":
                        break

The difference between custom tool handling and permission confirmation:
- Custom tool: you receive agent.custom_tool_use, execute the tool yourself, send user.custom_tool_result
- Permission confirmation: you receive agent.tool_use, decide allow/deny, send user.tool_confirmation
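Since both cases arrive under the same requires_action stop reason, a single function can choose the right reply from the type of the blocking event. The sketch below builds the payload only and sends nothing. `build_response` is a hypothetical helper, the `id` attribute on events and the `"deny"` result string are assumptions (the chapter shows only `"allow"`), and `SimpleNamespace` stands in for real events.

```python
from types import SimpleNamespace
from typing import Any, Callable


def build_response(
    blocking_event: Any,
    run_tool: Callable[[str, dict], str],
    approve: Callable[[Any], bool],
) -> dict:
    """Build the user event that unblocks one pending event."""
    if blocking_event.type == "agent.custom_tool_use":
        # Custom tool: our code executes it and returns the result.
        result = run_tool(blocking_event.name, blocking_event.input)
        return {
            "type": "user.custom_tool_result",
            "custom_tool_use_id": blocking_event.id,
            "content": [{"type": "text", "text": result}],
        }
    if blocking_event.type == "agent.tool_use":
        # Permission confirmation: we only decide allow or deny.
        return {
            "type": "user.tool_confirmation",
            "tool_use_id": blocking_event.id,
            "result": "allow" if approve(blocking_event) else "deny",  # "deny" is assumed
        }
    raise ValueError(f"cannot respond to {blocking_event.type}")


pending = SimpleNamespace(type="agent.tool_use", id="evt_2", name="bash", input={})
print(build_response(pending, lambda name, args: "", lambda event: True))
```

The returned dictionary would then go into the `events=[...]` list of an `events.send` call, as in the handlers shown earlier in this section.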
The Timestamp: processed_at
Every event includes a processed_at timestamp indicating when the event was recorded server-side. If processed_at is null, the event has been queued by the harness and will be handled after preceding events finish processing. (Session event stream)
This matters for debugging: events with null processed_at are queued but not yet executed. Events with timestamps are part of the committed log.
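When debugging, it can help to separate the two groups explicitly. A minimal sketch, assuming a null processed_at surfaces in Python as `None`; the helper name and the timestamp strings in the sample data are illustrative only.

```python
from types import SimpleNamespace
from typing import Any, Iterable


def split_by_commit(events: Iterable[Any]) -> tuple[list[Any], list[Any]]:
    """Return (committed, queued) based on whether processed_at is set."""
    committed: list[Any] = []
    queued: list[Any] = []
    for event in events:
        if getattr(event, "processed_at", None) is None:
            queued.append(event)  # accepted but not yet handled
        else:
            committed.append(event)  # part of the committed log
    return committed, queued


sample = [
    SimpleNamespace(type="user.message", processed_at="2025-01-01T00:00:00Z"),
    SimpleNamespace(type="user.interrupt", processed_at=None),
]
committed, queued = split_by_commit(sample)
print(len(committed), len(queued))  # 1 1
```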
Polling as an Alternative to Streaming
Streaming (SSE) is the primary pattern for watching events in real time. But for long-running background tasks where you don't want to hold a connection open, polling is a valid alternative:
"Polling wins in the opposite situation. It's stateless, survives process restarts, and composes cleanly with webhook handlers and queue workers that don't want to hold connections open." (Cookbook)
With polling, you periodically call events.list to fetch all events since the last check, rather than streaming them as they arrive. This works well for:
- Webhook-based architectures where you're processing events in a queue worker
- Batch jobs that run overnight and check status in the morning
- Mobile apps that can't hold long-lived connections
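A polling loop along these lines might look like the sketch below. It takes the fetch call as a parameter so the loop itself stays independent of the SDK; `poll_until_idle`, its parameters, and the assumption that every event exposes an `id` attribute are all hypothetical.

```python
import time
from types import SimpleNamespace
from typing import Any, Callable, Iterable


def poll_until_idle(
    fetch_events: Callable[[], Iterable[Any]],
    on_event: Callable[[Any], None],
    interval_seconds: float = 5.0,
    max_polls: int = 100,
) -> bool:
    """Poll until a session.status_idle event shows up; return True if it did."""
    seen_ids: set[str] = set()
    for attempt in range(max_polls):
        idle = False
        for event in fetch_events():
            if event.id in seen_ids:
                continue  # already handled on an earlier poll
            seen_ids.add(event.id)
            on_event(event)
            if event.type == "session.status_idle":
                idle = True
        if idle:
            return True
        if attempt < max_polls - 1:
            time.sleep(interval_seconds)
    return False


# Stand-in for two successive fetches of a session's event history.
batches = iter([
    [SimpleNamespace(id="e1", type="agent.message")],
    [SimpleNamespace(id="e1", type="agent.message"),
     SimpleNamespace(id="e2", type="session.status_idle")],
])
print(poll_until_idle(lambda: next(batches), lambda e: print(e.type), interval_seconds=0))
```

In real use, `fetch_events` could wrap `client.beta.sessions.events.list(session.id)`. To actually survive a process restart, the set of seen IDs would need to be persisted somewhere outside the process, which this sketch does not do.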
Memory Tool Events
When memory stores are attached to a session, the agent's interactions with memory (reading and writing to memory stores) appear as agent.tool_use events in the stream. (Memory) You'll see events like agent.tool_use with name: "memory_read" or name: "memory_write" flowing through the same stream as other tool calls.
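Because memory operations share the stream with every other tool call, counting by tool name is a quick way to see how much of the activity was memory access. A small sketch, using `SimpleNamespace` stand-ins for stream events; `tally_tool_use` is a hypothetical helper.

```python
from collections import Counter
from types import SimpleNamespace
from typing import Any, Iterable


def tally_tool_use(events: Iterable[Any]) -> Counter:
    """Count agent.tool_use events by tool name, ignoring all other events."""
    return Counter(e.name for e in events if e.type == "agent.tool_use")


sample = [
    SimpleNamespace(type="agent.tool_use", name="memory_read"),
    SimpleNamespace(type="agent.message"),
    SimpleNamespace(type="agent.tool_use", name="web_search"),
    SimpleNamespace(type="agent.tool_use", name="memory_read"),
    SimpleNamespace(type="agent.tool_use", name="memory_write"),
]
counts = tally_tool_use(sample)
print(counts["memory_read"], counts["memory_write"], counts["web_search"])  # 2 1 1
```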
Try It Yourself
Build a logging event handler. Modify the core streaming pattern to log all event types and timestamps:

    with client.beta.sessions.events.stream(session.id) as stream:
        client.beta.sessions.events.send(
            session.id,
            events=[{"type": "user.message", "content": [{"type": "text", "text": "Search the web for recent AI news and summarize three headlines."}]}],
        )
        for event in stream:
            print(f"[{event.type}] processed_at={getattr(event, 'processed_at', 'null')}")
            if event.type == "agent.message":
                for block in event.content:
                    print(f"  TEXT: {block.text[:100]}...")
            elif event.type == "agent.tool_use":
                print(f"  TOOL: {event.name}")
            elif event.type == "session.status_idle":
                print(f"  STOP: {event.stop_reason}")
                break

Count the events. How many agent.tool_use events fired? How many were web_search vs. web_fetch? Did an agent.message appear before the tools, after, or both?

Add a tool name filter. Modify the handler so it only prints agent.tool_use events where event.name == "web_search". This simulates building a monitoring system that only cares about specific tool usage.

Build a simple requires_action handler. Create a session with bash on always_ask, give it a task that needs bash, and implement the confirmation flow:

    for event in stream:
        if event.type == "session.status_idle":
            stop = event.stop_reason
            if stop and stop.type == "requires_action":
                for event_id in stop.event_ids:
                    print(f"Approving event: {event_id}")
                    client.beta.sessions.events.send(
                        session.id,
                        events=[{"type": "user.tool_confirmation", "tool_use_id": event_id, "result": "allow"}],
                    )
            elif stop and stop.type == "end_turn":
                break

Test polling vs. streaming. After a session finishes, fetch its event history without the stream:

    for event in client.beta.sessions.events.list(session.id):
        print(f"{event.type}: {event.processed_at}")

Compare the complete history to what you observed in real time.
Common Pitfalls
Sending before streaming. The most common mistake: calling events.send() before events.stream() is open. If the agent starts responding in the gap between send and stream, you miss those events. Always open the stream with the with block first, then send from inside it.

Not handling requires_action in your event loop. If session.status_idle fires with stop_reason: requires_action and your event loop just breaks, the session is stuck waiting for your confirmation. You need to handle both the end_turn and requires_action stop reasons.

Assuming all content blocks are text. The agent.message event content array can contain multiple blocks of different types. Always check block.type before accessing block.text.

Ignoring session.error events. If an MCP server fails authentication or a network error occurs, a session.error event fires. If your event loop only handles agent.* and session.status_idle events, you'll miss errors and the session's behavior will be confusing.

Using streaming for long-running background jobs. Streaming requires holding a connection open. For tasks that run for hours, consider polling instead: it's stateless and survives process restarts.
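The content-block pitfall in particular is easy to guard against with a small helper that reads text only from text blocks. This is a sketch under assumptions: `message_text` is a hypothetical name, and the `"image"` block in the sample is just a placeholder for any non-text block type.

```python
from types import SimpleNamespace
from typing import Any, Iterable


def message_text(content: Iterable[Any]) -> str:
    """Join the text of all text blocks, skipping every other block type."""
    return "".join(
        block.text for block in content if getattr(block, "type", None) == "text"
    )


content = [
    SimpleNamespace(type="text", text="Here is the summary. "),
    SimpleNamespace(type="image"),  # placeholder non-text block with no .text
    SimpleNamespace(type="text", text="Done."),
]
print(message_text(content))  # Here is the summary. Done.
```

Without the type check, the second block would raise an AttributeError on `.text` and end the event loop partway through a message.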
Toolkit
Event Handler Boilerplate — Python — A complete, production-ready event handler with support for: text streaming, tool use logging, requires_action routing (both custom tools and permission confirmations), error handling, and session completion detection.
Event Type Quick Reference Card — All event types with their direction (sent vs. received), key fields, and when they fire. Single-page format for desk reference.
Chapter Recap
- Events are bidirectional structured messages following a {domain}.{action} naming convention. You send user events; the session sends agent and session events back.
- The session.status_idle event is the critical signal: check stop_reason to determine whether the task is done (end_turn) or waiting for your input (requires_action).
- Always open the event stream before sending, so you don't miss events that fire immediately after your message is delivered. For long-running background jobs, polling events.list is a stateless alternative to streaming.