Skip to main content
AFK supports real-time streaming via Runner.run_stream(). Instead of waiting for the full response, you receive events as they happen: incremental text, tool lifecycle updates, and terminal status.

Quick example

import asyncio
from afk.agents import Agent
from afk.core import Runner

agent = Agent(name="explainer", model="gpt-5.2-mini", instructions="Explain clearly.")

async def main():
    runner = Runner()
    handle = await runner.run_stream(agent, user_message="Explain DNS resolution")

    async for event in handle:
        match event.type:
            case "text_delta":
                print(event.text_delta, end="", flush=True)
            case "tool_completed":
                if not event.tool_success:
                    print(f"\n[TOOL ERROR] {event.tool_name}: {event.tool_error}")
            case "completed":
                print(f"\n[DONE] {event.result.state}")

asyncio.run(main())

Event reference

run_stream() emits AgentStreamEvent values with these types:
Event typeWhen it firesKey fields
text_deltaIncremental text chunks from streaming or fallback pathevent.text_delta, event.step
step_startedNew step in the agent loopevent.step, event.state
tool_startedA tool call is about to executeevent.tool_name, event.tool_call_id, event.step
tool_completedA tool call finishedevent.tool_name, event.tool_call_id, event.tool_success, event.tool_output, event.tool_error
tool_deferredA tool call was accepted but deferredevent.tool_name, event.tool_call_id, event.tool_ticket_id, event.data.resume_hint
tool_background_resolvedA deferred tool finished successfullyevent.tool_name, event.tool_ticket_id, event.tool_output
tool_background_failedA deferred tool failed/expiredevent.tool_name, event.tool_ticket_id, event.tool_error
completedRun finishedevent.result (full AgentResult)
errorStream/runtime bridge errorevent.error

Streaming modes

  • If the provider supports streaming, AFK forwards model deltas as text_delta.
  • If the provider is non-streaming, AFK emits chunked fallback text_delta from final text so UI behavior stays consistent.

Stream control

AgentStreamHandle is read-only. For lifecycle controls (pause(), resume(), cancel(), interrupt()), use run_handle().
stream = await runner.run_stream(agent, user_message="Long analysis...")
async for event in stream:
    ...
result = stream.result
# Lifecycle control variant
handle = await runner.run_handle(agent, user_message="Long analysis...")
await handle.pause()
await handle.resume()
await handle.interrupt()
result = await handle.await_result()

Background tools example

stream = await runner.run_stream(agent, user_message="Implement feature, run build, and write docs")
async for event in stream:
    if event.type == "tool_deferred" and event.tool_name == "build_project":
        print(f"[build queued] ticket={event.tool_ticket_id}")
    elif event.type == "tool_background_resolved" and event.tool_name == "build_project":
        print(f"[build done] output={event.tool_output}")
    elif event.type == "text_delta":
        print(event.text_delta, end="")

Error handling

error events indicate stream/runtime bridge failures. Tool failures are reported through tool_completed (tool_success=False) and do not necessarily fail the run.

Next steps