Task queues decouple agent work producers from consumers. Push a task, a worker picks it up, and the result is stored — independently of the caller’s lifecycle. Use queues for long-running jobs, batch processing, and reliable retries.

Quick start

import asyncio

from afk.queues import TaskQueue, TaskItem
from afk.agents import Agent
from afk.core import Runner

agent = Agent(name="analyzer", model="gpt-5.2-mini", instructions="Analyze data.")
queue = TaskQueue()

async def main() -> None:
    # Push a task
    await queue.push(TaskItem(
        contract="runner.chat.v1",
        agent=agent,
        user_message="Analyze Q4 revenue trends",
    ))
    # A worker picks it up automatically

# `await` needs a running event loop, so drive the coroutine with asyncio.run
asyncio.run(main())

Task lifecycle

| State | Meaning |
| --- | --- |
| queued | Waiting to be picked up by a worker |
| running | A worker is executing the task |
| completed | Task finished successfully |
| failed | Task hit an error (may be retried) |
| dead_letter | All retries exhausted; needs manual review |
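
The states listed above can be read as a small state machine. The sketch below is a standard-library illustration only: `TaskState`, `ALLOWED_TRANSITIONS`, and `can_transition` are hypothetical names, and the transition set is an assumption inferred from the state descriptions, not the library's actual implementation.

```python
from enum import Enum


class TaskState(str, Enum):
    QUEUED = "queued"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    DEAD_LETTER = "dead_letter"


# Assumed transitions, inferred from the state descriptions; the library may differ.
ALLOWED_TRANSITIONS: dict[TaskState, set[TaskState]] = {
    TaskState.QUEUED: {TaskState.RUNNING},
    TaskState.RUNNING: {TaskState.COMPLETED, TaskState.FAILED},
    # A failed task is re-queued for a retry or, once retries run out, dead-lettered.
    TaskState.FAILED: {TaskState.QUEUED, TaskState.DEAD_LETTER},
    TaskState.COMPLETED: set(),
    # A manual retry from the DLQ puts the task back in the queue.
    TaskState.DEAD_LETTER: {TaskState.QUEUED},
}


def can_transition(current: TaskState, target: TaskState) -> bool:
    """Return True if moving from `current` to `target` is allowed in this sketch."""
    return target in ALLOWED_TRANSITIONS[current]
```

Modelling the lifecycle this way makes it easy to reject impossible moves, such as a completed task going back to queued.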

Execution contracts

Every task has a contract that defines what kind of work it represents:
runner.chat.v1: Standard agent chat. Runs an agent with a user message.
task = TaskItem(
    contract="runner.chat.v1",
    agent=agent,
    user_message="Analyze this data",
    thread_id="t-42",              # Optional: for multi-turn
    timeout_s=120.0,               # Max execution time
)
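
How `timeout_s` is enforced is not described here. One common way a worker could cap execution time is `asyncio.wait_for`, sketched below with the standard library only. `run_with_timeout`, `slow_job`, and `fast_job` are hypothetical names used for illustration, and the returned dictionaries are an assumed shape, not the library's result format.

```python
import asyncio


async def run_with_timeout(coro, timeout_s: float) -> dict:
    """Await `coro`, reporting a failure if it exceeds `timeout_s` seconds."""
    try:
        result = await asyncio.wait_for(coro, timeout=timeout_s)
        return {"state": "completed", "result": result}
    except asyncio.TimeoutError:
        # wait_for cancels the underlying work before raising.
        return {"state": "failed", "error": f"timed out after {timeout_s}s"}


async def slow_job() -> str:
    await asyncio.sleep(0.2)  # stand-in for a long agent run
    return "done"


async def fast_job() -> str:
    return "done"
```

Calling `asyncio.run(run_with_timeout(slow_job(), 0.01))` yields a failed outcome, while the same call with `fast_job()` completes normally.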

Worker setup

1. Create a worker

from afk.queues import Worker

worker = Worker(
    queue=queue,
    runner_factory=lambda: Runner(),
    max_concurrency=5,            # Process up to 5 tasks in parallel
)

2. Register custom contract handlers

@worker.handler("myapp.batch.v1")
async def handle_batch(task: TaskItem) -> dict:
    # Custom processing logic
    results = await process_batch(task.payload["items"])
    return {"processed": len(results)}

3. Start the worker

await worker.start()  # Runs until stopped
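
To make the two ideas in these steps concrete, here is a toy, standard-library model of a worker that dispatches tasks by contract name and limits parallelism with a semaphore. It is a sketch under stated assumptions: `MiniWorker`, `run_all`, and `peak_running` are hypothetical, tasks are plain dictionaries, and nothing here reflects how the real `Worker` is implemented.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[dict], Awaitable[Any]]


class MiniWorker:
    """Toy worker: dispatches tasks by contract with bounded concurrency."""

    def __init__(self, max_concurrency: int = 5) -> None:
        self._handlers: dict[str, Handler] = {}
        self._max_concurrency = max_concurrency
        self._running = 0
        self.peak_running = 0  # highest number of tasks running at the same time

    def handler(self, contract: str) -> Callable[[Handler], Handler]:
        """Decorator that registers a coroutine function for a contract name."""
        def register(fn: Handler) -> Handler:
            self._handlers[contract] = fn
            return fn
        return register

    async def _run_one(self, sem: asyncio.Semaphore, task: dict) -> Any:
        async with sem:  # at most max_concurrency tasks hold the semaphore
            self._running += 1
            self.peak_running = max(self.peak_running, self._running)
            try:
                return await self._handlers[task["contract"]](task)
            finally:
                self._running -= 1

    async def run_all(self, tasks: list[dict]) -> list[Any]:
        """Process every task and return results in input order."""
        sem = asyncio.Semaphore(self._max_concurrency)
        return await asyncio.gather(*(self._run_one(sem, t) for t in tasks))


mini = MiniWorker(max_concurrency=2)


@mini.handler("myapp.batch.v1")
async def handle_batch(task: dict) -> dict:
    await asyncio.sleep(0.01)  # stand-in for real work
    return {"processed": len(task["payload"]["items"])}
```

Running five tasks through `mini.run_all(...)` processes all of them, but never more than two at once, which is the behaviour a `max_concurrency` setting is meant to guarantee.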

Dead-letter handling

When a task exhausts all retries, it moves to the dead-letter queue (DLQ):
# Check for dead letters
dead_letters = await queue.get_dead_letters()

for task in dead_letters:
    print(f"Failed: {task.contract}: {task.error}")

    # Retry manually after fixing the issue...
    await queue.retry_dead_letter(task.task_id)

    # ...or discard it instead (do one or the other, not both)
    # await queue.discard_dead_letter(task.task_id)
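
The bookkeeping behind a dead-letter queue can be sketched in a few lines. The toy model below uses only the standard library. `ToyTask`, `ToyQueue`, `record_failure`, and the `max_retries` setting are hypothetical, and the two dead-letter methods merely borrow the names used above; the real queue's retry budget and storage are not documented here.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Deque, Dict, Optional


@dataclass
class ToyTask:
    task_id: str
    attempts: int = 0
    error: Optional[str] = None


@dataclass
class ToyQueue:
    max_retries: int = 3  # hypothetical setting, not taken from the library
    pending: Deque[ToyTask] = field(default_factory=deque)
    dead_letters: Dict[str, ToyTask] = field(default_factory=dict)

    def record_failure(self, task: ToyTask, error: str) -> None:
        """Re-queue the task, or dead-letter it once retries are exhausted."""
        task.attempts += 1
        task.error = error
        if task.attempts > self.max_retries:
            self.dead_letters[task.task_id] = task
        else:
            self.pending.append(task)

    def retry_dead_letter(self, task_id: str) -> None:
        """Move a dead-lettered task back to pending with a fresh retry budget."""
        task = self.dead_letters.pop(task_id)
        task.attempts = 0
        task.error = None
        self.pending.append(task)

    def discard_dead_letter(self, task_id: str) -> None:
        """Drop a dead-lettered task permanently."""
        del self.dead_letters[task_id]
```

With `max_retries=2`, a task that fails three times in a row ends up in `dead_letters`, and `retry_dead_letter` returns it to `pending` with its attempt counter reset.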

Error classification

The queue uses error classification to decide whether to retry:
| Error type | Retried? | Example |
| --- | --- | --- |
| Retryable | Yes (with backoff) | Network timeout, rate limit, transient LLM error |
| Terminal | No (sent to DLQ) | Invalid arguments, auth failure, missing model |
| Non-fatal | No (task completes with warning) | Telemetry export failure |
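
The classification step can be pictured as a function from an exception to an action, plus a delay schedule for the retryable case. This is a minimal sketch with assumptions called out: the exception classes, the `classify` mapping, and the exponential schedule in `backoff_delay` are all hypothetical. The text above only says retries use backoff; it does not specify the schedule or which exception types fall into each class.

```python
class RetryableError(Exception):
    """Hypothetical marker for transient failures such as rate limits."""


class NonFatalError(Exception):
    """Hypothetical marker for failures that should not fail the task."""


def classify(exc: Exception) -> str:
    """Map an exception to 'retry', 'warn', or 'dead_letter' (assumed mapping)."""
    if isinstance(exc, (TimeoutError, ConnectionError, RetryableError)):
        return "retry"
    if isinstance(exc, NonFatalError):
        return "warn"
    # Everything else is treated as terminal in this sketch.
    return "dead_letter"


def backoff_delay(attempt: int, base_s: float = 1.0, cap_s: float = 60.0) -> float:
    """Exponential backoff: base_s * 2**attempt, capped at cap_s seconds."""
    return min(cap_s, base_s * 2 ** attempt)
```

Treating unknown exceptions as terminal is a conservative default for a sketch like this: it avoids retrying a failure that will never succeed, at the cost of sending some transient errors to manual review.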

Queue backends

In-memory (default): State lives in process memory. No setup required.
queue = TaskQueue()  # In-memory by default
Use for: Development, testing, prototyping.
