AFK’s memory system persists conversation state across runs. Use it for multi-turn conversations, run resumption after interrupts, long-term knowledge retention, and vector-based semantic search.
Quick start: multi-turn conversation
```python
import asyncio

from afk.agents import Agent
from afk.core import Runner

agent = Agent(name="tutor", model="gpt-5.2-mini", instructions="You are a Python tutor.")

async def main():
    runner = Runner()
    thread = "session-42"

    # Turn 1
    r1 = await runner.run(agent, user_message="What are generators?", thread_id=thread)
    print(r1.final_text)

    # Turn 2 — the agent remembers Turn 1
    r2 = await runner.run(agent, user_message="Show me an example", thread_id=thread)
    print(r2.final_text)

asyncio.run(main())
```
The thread_id links runs into a conversation. AFK automatically persists messages between runs.
What gets stored
| Record type | What it contains | When it’s written |
|---|---|---|
| Event | User messages, assistant responses, tool calls and results | After each run step |
| Checkpoint | Full run state at a point in time | At step boundaries (pre-LLM, post-tool) |
| State (KV) | Checkpoint pointers, effect journal, background tool state | During and after runs |
| Long-term memory | Persistent knowledge with optional embeddings | Via upsert_long_term_memory |
What’s NOT stored automatically: Raw LLM provider responses or internal
framework temporaries. Only conversation-visible records and explicit state
writes are persisted.
State lifecycle
Resume interrupted runs
If a run is interrupted (crash, timeout, pause for approval), resume from the last checkpoint:
```python
# Start a run that might be long
result = await runner.run(agent, user_message="Analyze this dataset...")

# If interrupted, resume later
if result.state == "interrupted":
    resumed = await runner.resume(
        agent,
        run_id=result.run_id,
        thread_id=result.thread_id,
    )
    print(resumed.final_text)
```
Checkpoints are written at key boundaries: before each LLM call, after
each tool batch, and after each step completes. On resume, completed tool
calls are replayed from the effect journal — no duplicate side effects.
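The replay guarantee can be pictured with a minimal effect journal. This is an illustrative sketch only, not AFK's internals: completed calls are recorded under a stable call id, and replaying a step returns the journaled result instead of re-executing the side effect.

```python
# Minimal effect-journal sketch (illustrative; not AFK's actual implementation).
# A completed call is recorded under a deterministic id; on resume, the journal
# returns the recorded result instead of running the side effect again.

class EffectJournal:
    def __init__(self):
        self._results = {}  # call_id -> recorded result

    def run_effect(self, call_id, effect):
        if call_id in self._results:      # already executed: replay from journal
            return self._results[call_id]
        result = effect()                 # first execution: run and record
        self._results[call_id] = result
        return result

sent = []
def send_email():
    sent.append("welcome email")
    return "ok"

journal = EffectJournal()
journal.run_effect("step-1:send_email", send_email)
# Simulate resuming the run and replaying the same step:
journal.run_effect("step-1:send_email", send_email)
print(len(sent))  # the side effect ran only once
```

The deterministic call id is what makes replay safe: the same step always maps to the same journal entry.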
Compact long threads
Over time, conversation threads grow and consume tokens. Use compaction to trim old events:
```python
from afk.memory import RetentionPolicy

result = await runner.compact_thread(
    thread_id="session-42",
    event_policy=RetentionPolicy(
        max_events_per_thread=500,
        keep_event_types=["trace"],
        scan_limit=20_000,
    ),
)
print(f"Removed {result.events_removed} events")
```
Compaction applies retention rules: protected event types are preserved first, then the most recent remaining events fill the budget.
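The retention pass can be sketched as follows. This is a simplified model of the rule described above; the event fields are assumptions for illustration, not AFK's exact types.

```python
# Simplified retention sketch: protected event types survive unconditionally,
# then the newest remaining events fill whatever budget is left.

def apply_retention(events, max_events, keep_types):
    protected = [e for e in events if e["type"] in keep_types]
    others = [e for e in events if e["type"] not in keep_types]
    budget = max(0, max_events - len(protected))
    kept_others = others[-budget:] if budget else []
    # Re-emit survivors in their original chronological order.
    kept_ids = set(map(id, protected + kept_others))
    return [e for e in events if id(e) in kept_ids]

events = [{"type": "trace", "n": 0}] + [{"type": "message", "n": i} for i in range(1, 10)]
trimmed = apply_retention(events, max_events=4, keep_types={"trace"})
print([e["n"] for e in trimmed])  # the trace event, plus the 3 newest messages
```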
Memory backends
AFK ships with four backends. All implement the MemoryStore protocol.
In-memory (default)

State lives in process memory. Fast, no setup, but lost on restart.

```python
from afk.memory import InMemoryMemoryStore

runner = Runner(memory_store=InMemoryMemoryStore())
# Or just: Runner() — in-memory is the default
```

Use for: Development, testing, short-lived scripts.

SQLite

Persistent local storage with JSON serialization and local vector search.

```python
from afk.memory.adapters.sqlite import SQLiteMemoryStore

runner = Runner(memory_store=SQLiteMemoryStore(path="agent_memory.sqlite3"))
```

Features: WAL mode, text search, vector similarity search (cosine), atomic upsert.

Use for: Local development with persistence, single-process deployments.

PostgreSQL

Production-grade backend with pgvector support for vector search.

```python
from afk.memory.adapters.postgres import PostgresMemoryStore

runner = Runner(
    memory_store=PostgresMemoryStore(dsn="postgresql://user:pass@host/db")
)
```

Use for: Production multi-process deployments.

Redis

In-memory store backed by Redis for shared state across processes.

```python
from afk.memory.adapters.redis import RedisMemoryStore

runner = Runner(
    memory_store=RedisMemoryStore(url="redis://localhost:6379")
)
```

Use for: Shared state across workers, ephemeral but durable-enough sessions.
Environment-based selection
Set environment variables to auto-select a backend without code changes:
```shell
export AFK_MEMORY_BACKEND=sqlite
export AFK_MEMORY_SQLITE_PATH=./agent_memory.sqlite3
```
The runner falls back to in-memory if the configured backend fails to initialize.
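The selection logic behaves roughly like the sketch below. It is illustrative only: the real factory lives inside AFK, and the Postgres DSN variable name here is an assumption, not a documented setting.

```python
import os

# Illustrative backend selection with graceful fallback (not AFK's source code).
def select_backend():
    backend = os.environ.get("AFK_MEMORY_BACKEND", "inmemory")
    try:
        if backend == "sqlite":
            path = os.environ.get("AFK_MEMORY_SQLITE_PATH", "agent_memory.sqlite3")
            return ("sqlite", path)  # would construct SQLiteMemoryStore(path=...)
        if backend == "postgres":
            # AFK_MEMORY_POSTGRES_DSN is a hypothetical variable name for this sketch.
            return ("postgres", os.environ["AFK_MEMORY_POSTGRES_DSN"])
        return ("inmemory", None)
    except Exception:
        # Fall back to in-memory if the configured backend fails to initialize.
        return ("inmemory", None)

os.environ["AFK_MEMORY_BACKEND"] = "sqlite"
os.environ["AFK_MEMORY_SQLITE_PATH"] = "./agent_memory.sqlite3"
print(select_backend())  # ('sqlite', './agent_memory.sqlite3')
```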
Custom backends
Implement the MemoryStore abstract class to add support for any database:
```python
from afk.memory.store import MemoryStore, MemoryCapabilities
from afk.memory.types import MemoryEvent, LongTermMemory, JsonValue

class MyMemoryStore(MemoryStore):
    capabilities = MemoryCapabilities(
        text_search=True,
        vector_search=False,
        atomic_upsert=True,
        ttl=False,
    )

    async def setup(self) -> None:
        # Initialize connections, create tables
        await super().setup()

    async def close(self) -> None:
        # Clean up connections
        await super().close()

    async def append_event(self, event: MemoryEvent) -> None: ...
    async def get_recent_events(self, thread_id: str, limit: int = 50) -> list[MemoryEvent]: ...
    async def get_events_since(self, thread_id: str, since_ms: int, limit: int = 500) -> list[MemoryEvent]: ...
    async def put_state(self, thread_id: str, key: str, value: JsonValue) -> None: ...
    async def get_state(self, thread_id: str, key: str) -> JsonValue | None: ...
    async def list_state(self, thread_id: str, prefix: str | None = None) -> dict[str, JsonValue]: ...
    async def delete_state(self, thread_id: str, key: str) -> None: ...
    async def replace_thread_events(self, thread_id: str, events: list[MemoryEvent]) -> None: ...
    async def upsert_long_term_memory(self, memory: LongTermMemory, *, embedding=None) -> None: ...
    async def delete_long_term_memory(self, user_id: str | None, memory_id: str) -> None: ...
    async def list_long_term_memories(self, user_id: str | None, *, scope=None, limit=100) -> list[LongTermMemory]: ...
```
Declare capabilities to tell the framework which features your backend
supports. Features like vector search are only used when the backend
declares support.
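The gating pattern can be illustrated with a small guard. This is a sketch of the idea, not the framework's actual dispatch code: the caller consults the declared capabilities and degrades gracefully instead of calling an unsupported method.

```python
from dataclasses import dataclass

# Sketch of capability-gated dispatch (illustrative, not AFK source):
# the caller checks the backend's declared capabilities before choosing
# a search strategy, falling back when a feature is unsupported.

@dataclass
class MemoryCapabilities:
    text_search: bool = False
    vector_search: bool = False

def choose_search(caps, query, embedding=None):
    if embedding is not None and caps.vector_search:
        return "vector"  # would call search_long_term_memory_vector(...)
    if caps.text_search:
        return "text"    # would call search_long_term_memory_text(...)
    return "scan"        # last resort: fetch and filter client-side

caps = MemoryCapabilities(text_search=True, vector_search=False)
print(choose_search(caps, "python generators", embedding=[0.1, 0.2]))  # falls back to text
```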
Long-term memory
Beyond conversation events, AFK supports persistent long-term memories scoped per user and purpose:
```python
from afk.memory.types import LongTermMemory
from afk.memory import now_ms, new_id

memory_store = await runner._ensure_memory_store()

# Store a long-term memory
await memory_store.upsert_long_term_memory(
    LongTermMemory(
        id=new_id("ltm"),
        user_id="user-123",
        scope="preferences",
        data={"theme": "dark", "language": "python"},
        text="User prefers dark theme and Python examples",
        tags=["preference", "ui"],
        metadata={},
        created_at=now_ms(),
        updated_at=now_ms(),
    )
)

# Retrieve memories
memories = await memory_store.list_long_term_memories(
    user_id="user-123",
    scope="preferences",
    limit=10,
)
```
Vector search
Backends that support vector search (SQLite, Postgres) can find semantically similar memories:
```python
# Search by embedding similarity
results = await memory_store.search_long_term_memory_vector(
    user_id="user-123",
    query_embedding=embedding_vector,  # list[float] from your embedding model
    scope="knowledge",
    limit=5,
    min_score=0.7,
)
for memory, score in results:
    print(f"{score:.2f}: {memory.text}")
```
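Under the hood, similarity search reduces to cosine scoring against stored embeddings, with a `min_score` cutoff and a top-`k` limit. A minimal version of that computation (illustrative; the backends do this in SQL or pgvector, not in Python):

```python
import math

# Cosine similarity scoring with a min_score cutoff: the core computation
# behind vector search, shown in plain Python for clarity.

def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0

def top_k(query, items, k=5, min_score=0.7):
    scored = [(text, cosine(query, emb)) for text, emb in items]
    scored = [(text, s) for text, s in scored if s >= min_score]
    return sorted(scored, key=lambda p: p[1], reverse=True)[:k]

items = [("generators", [1.0, 0.0]), ("decorators", [0.6, 0.8]), ("unrelated", [0.0, 1.0])]
for text, score in top_k([1.0, 0.0], items, min_score=0.5):
    print(f"{score:.2f}: {text}")
```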
Text search
All backends support basic text search across memory content:
```python
results = await memory_store.search_long_term_memory_text(
    user_id="user-123",
    query="python generators",
    scope="knowledge",
    limit=10,
)
```
Design guidelines
- Always use `thread_id` for conversations. Without it, each run starts fresh.
- Compact threads proactively. Don’t wait until you hit token limits. A good rule: compact when the thread exceeds ~500 events.
- Use checkpoints for long-running agents. If a run might take minutes, checkpoints let you resume on failure.
- Don’t store secrets in memory. Thread events are persisted and may be readable.
- Choose the right backend. In-memory for dev, SQLite for local persistence, Postgres/Redis for production.
- Use scopes for long-term memory. Organize memories by purpose (`preferences`, `knowledge`, `history`) to keep queries efficient.
Next steps