Skip to main content
Task queues decouple agent work producers from consumers. Push a task, a worker picks it up, and the result is stored — independently of the caller’s lifecycle. Use queues for long-running jobs, batch processing, and reliable retries.

Quick start

from afk.queues import InMemoryTaskQueue, RUNNER_CHAT_CONTRACT, TaskWorker
from afk.agents import Agent
from afk.core import Runner

agent = Agent(name="analyzer", model="gpt-4.1-mini", instructions="Analyze data.")

# Push a task
queue = InMemoryTaskQueue()
await queue.enqueue_contract(
    RUNNER_CHAT_CONTRACT,
    payload={"user_message": "Analyze Q4 revenue trends"},
    agent_name="analyzer",
)

# A worker consumes queued tasks
worker = TaskWorker(queue, agents={"analyzer": agent}, runner_factory=lambda: Runner())

Task lifecycle

StateMeaning
queuedWaiting to be picked up by a worker
runningA worker is executing the task
completedTask finished successfully
failedTask hit an error (may be retried)
dead_letterAll retries exhausted — needs manual review

Execution contracts

Every task has a contract that defines what kind of work it represents:
Standard agent chat. Runs an agent with a user message.
task = await queue.enqueue_contract(
    RUNNER_CHAT_CONTRACT,
    payload={
        "user_message": "Analyze this data",
        "context": {},
    },
    agent_name="analyzer",
)

Worker setup

1

Create a worker

from afk.queues import InMemoryTaskQueue, TaskWorker

queue = InMemoryTaskQueue()
worker = TaskWorker(
    queue=queue,
    agents={"analyzer": agent},
)
2

Register custom contract handlers

from afk.queues import ExecutionContract, ExecutionContractContext

class BatchContract(ExecutionContract):
    contract_id = "myapp.batch.v1"
    requires_agent = False

    async def execute(self, task_item, *, agent, worker_context: ExecutionContractContext) -> dict:
        results = await process_batch(task_item.payload["items"])
        return {"processed": len(results)}

worker = TaskWorker(
    queue=queue,
    agents={"analyzer": agent},
    execution_contracts={"myapp.batch.v1": BatchContract()},
)
3

Start the worker

await worker.start()  # Runs until stopped

Dead-letter handling

When a task exhausts all retries, it moves to the dead-letter queue (DLQ):
# Check for dead letters
dead_letters = await queue.list_dead_letters()

for task in dead_letters:
    print(f"Failed: {task.id}{task.error}")

# Retry manually after fixing the issue
moved = await queue.redrive_dead_letters()

# Or discard them
removed = await queue.purge_dead_letters()

Error classification

The queue uses error classification to decide whether to retry:
Error typeRetried?Example
Retryable (with backoff)Network timeout, rate limit, transient LLM error
Terminal (sent to DLQ)Invalid arguments, auth failure, missing model
Non-fatal (task completes with warning)Telemetry export failure

Queue backends

State lives in process memory. No setup required.
queue = InMemoryTaskQueue()
Use for: Development, testing, prototyping.

Connection pooling

For high-throughput production workloads, use RedisConnectionPool to manage connections efficiently:
from afk.llms.cache.redis_pool import get_redis_pool, PoolConfig

# Create a pooled connection
pool = await get_redis_pool(
    "redis://localhost:6379/0",
    config=PoolConfig(
        max_connections=50,
        max_idle_connections=10,
        socket_timeout=5.0,
    ),
)

# Use in queue
queue = TaskQueue(
    backend="redis",
    redis_url="redis://localhost:6379/0",
)

# Health check
if await pool.health_check():
    print("Redis connection healthy")
The pool provides:
  • Configurable max connections (default: 50)
  • Idle connection management
  • Automatic health checks
  • Singleton access via get_redis_pool()

Next steps

MCP Server

Expose tools via the Model Context Protocol.

Observability

Monitor queue performance and worker health.