Skip to main content

Documentation Index

Fetch the complete documentation index at: https://afk.arpan.sh/llms.txt

Use this file to discover all available pages before exploring further.

What this snippet demonstrates

This snippet shows how to configure:
  1. Timeout middleware to bound slow provider calls
  2. Redis connection pooling for shared cache or memory connections
  3. Shutdown handling so runners and Redis pools close cleanly

Timeout middleware

Apply per-request timeouts to prevent runaway LLM calls:
import asyncio
from afk.llms import LLMBuilder, LLMRequest
from afk.llms.middleware import MiddlewareStack
from afk.llms.middleware.timeout import (
    TimeoutMiddleware,
    EmbedTimeoutMiddleware,
    StreamTimeoutMiddleware,
    TimeoutConfig,
)

config = TimeoutConfig(
    default_timeout_s=30.0,
    chat_timeout_s=60.0,
    embed_timeout_s=15.0,
    stream_timeout_s=45.0,
)

stack = MiddlewareStack(
    chat=[TimeoutMiddleware(config)],
    embed=[EmbedTimeoutMiddleware(config)],
    stream=[StreamTimeoutMiddleware(config)],
)

production_client = (
    LLMBuilder()
    .provider("openai")
    .model("gpt-4.1-mini")
    .profile("production")
    .with_middlewares(stack)
    .build()
)

Per-request timeout override

from afk.llms import TimeoutPolicy

req = LLMRequest(
    model="gpt-4.1-mini",
    messages=[...],
    timeout_policy=TimeoutPolicy(request_timeout_s=120.0),  # Override default
)

response = await production_client.chat(req)

Redis connection pooling

For Redis deployments, use connection pooling instead of creating a new client per request:
from afk.llms.cache.redis_pool import (
    get_redis_pool,
    PoolConfig,
    close_all_pools,
)

async def setup_redis_pool():
    pool = await get_redis_pool(
        "redis://localhost:6379/0",
        config=PoolConfig(
            max_connections=50,
            max_idle_connections=10,
            socket_timeout=5.0,
            socket_connect_timeout=5.0,
            socket_keepalive=True,
            health_check_interval_s=30.0,
        ),
    )
    
    if await pool.health_check():
        print("Redis connection pool healthy")
    
    return pool

Using with memory store

import asyncio
from afk.memory.adapters.redis import RedisMemoryStore
from afk.core import Runner

async def main():
    pool = await get_redis_pool(
        "redis://localhost:6379/0",
        config=PoolConfig(max_connections=50),
    )
    
    runner = Runner(
        memory_store=RedisMemoryStore(url="redis://localhost:6379/0"),
    )
    
    result = await runner.run(agent, user_message="Hello")
    print(result.final_text)
    
    await runner.close()
    await close_all_pools()

asyncio.run(main())

Full example

import asyncio
from afk.llms import LLMBuilder
from afk.llms.middleware import MiddlewareStack
from afk.llms.middleware.timeout import (
    TimeoutMiddleware,
    TimeoutConfig,
)
from afk.llms.cache.redis_pool import (
    get_redis_pool,
    PoolConfig,
    close_all_pools,
)
from afk.memory.adapters.redis import RedisMemoryStore
from afk.core import Runner
from afk.agents import Agent

class ProductionSetup:
    def __init__(self):
        self.llm_client = None
        self.runner = None
        self.pool = None
    
    async def __aenter__(self):
        pool_config = PoolConfig(
            max_connections=50,
            max_idle_connections=10,
            socket_timeout=5.0,
            socket_connect_timeout=5.0,
        )
        self.pool = await get_redis_pool(
            "redis://localhost:6379/0",
            config=pool_config,
        )
        
        timeout_config = TimeoutConfig(
            default_timeout_s=30.0,
            chat_timeout_s=60.0,
        )
        stack = MiddlewareStack(
            chat=[TimeoutMiddleware(timeout_config)],
        )
        
        self.llm_client = (
            LLMBuilder()
            .provider("openai")
            .model("gpt-4.1-mini")
            .profile("production")
            .with_middlewares(stack)
            .build()
        )
        
        self.runner = Runner(
            memory_store=RedisMemoryStore(url="redis://localhost:6379/0"),
        )
        
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.runner:
            await self.runner.close()
        await close_all_pools()
        return False


async def main():
    agent = Agent(
        name="assistant",
        model="gpt-4.1-mini",
        instructions="You are a helpful assistant.",
    )
    
    async with ProductionSetup() as setup:
        result = await setup.runner.run(
            agent,
            user_message="Hello, world!",
        )
        print(result.final_text)

asyncio.run(main())

Configuration reference

TimeoutConfig

ParameterDefaultDescription
default_timeout_s30.0Default timeout for all operations
chat_timeout_sNoneSpecific timeout for chat requests
embed_timeout_sNoneSpecific timeout for embeddings
stream_timeout_sNoneSpecific timeout for streaming

PoolConfig

ParameterDefaultDescription
max_connections50Maximum total connections
max_idle_connections10Maximum idle connections
socket_timeout5.0Socket read/write timeout
socket_connect_timeout5.0Connection establishment timeout
socket_keepaliveFalseEnable TCP keepalive
health_check_interval_s30.0Interval for health checks