AFK’s memory system persists conversation state across runs. Use it for multi-turn conversations, run resumption after interrupts, long-term knowledge retention, and vector-based semantic search.

Quick start: multi-turn conversation

import asyncio
from afk.agents import Agent
from afk.core import Runner

agent = Agent(name="tutor", model="gpt-5.2-mini", instructions="You are a Python tutor.")

async def main():
    runner = Runner()
    thread = "session-42"

    r1 = await runner.run(agent, user_message="What are generators?", thread_id=thread)
    print(r1.final_text)

    # Turn 2 — the agent remembers Turn 1
    r2 = await runner.run(agent, user_message="Show me an example", thread_id=thread)
    print(r2.final_text)

asyncio.run(main())
The thread_id links runs into a conversation. AFK automatically persists messages between runs.

What gets stored

| Record type | What it contains | When it’s written |
| --- | --- | --- |
| Event | User messages, assistant responses, tool calls and results | After each run step |
| Checkpoint | Full run state at a point in time | At step boundaries (pre-LLM, post-tool) |
| State (KV) | Checkpoint pointers, effect journal, background tool state | During and after runs |
| Long-term memory | Persistent knowledge with optional embeddings | Via upsert_long_term_memory |
What’s NOT stored automatically: Raw LLM provider responses or internal framework temporaries. Only conversation-visible records and explicit state writes are persisted.

State lifecycle

Resume interrupted runs

If a run is interrupted (crash, timeout, pause for approval), resume from the last checkpoint:
# Start a run that might be long
result = await runner.run(agent, user_message="Analyze this dataset...")

# If interrupted, resume later
if result.state == "interrupted":
    resumed = await runner.resume(
        agent,
        run_id=result.run_id,
        thread_id=result.thread_id,
    )
    print(resumed.final_text)
Checkpoints are written at key boundaries: before each LLM call, after each tool batch, and after each step completes. On resume, completed tool calls are replayed from the effect journal — no duplicate side effects.
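AFK doesn’t document the journal’s internals, but the replay idea can be sketched in plain Python (the names and shapes here are hypothetical, not AFK’s actual types): each tool call is keyed by a stable call id, and a journaled call returns its recorded result instead of re-executing.

```python
# Hypothetical sketch of effect-journal replay (not AFK's real internals).
journal: dict[str, str] = {}

def run_tool(call_id: str, tool, *args):
    if call_id in journal:          # already ran before the interrupt
        return journal[call_id]     # replay recorded result: no duplicate side effect
    result = tool(*args)            # first execution: real side effect happens
    journal[call_id] = result       # record it for any future resume
    return result

calls = []
def send_email(to: str) -> str:
    calls.append(to)                # the observable side effect
    return f"sent to {to}"

first = run_tool("call-1", send_email, "a@example.com")
again = run_tool("call-1", send_email, "a@example.com")  # replayed, email not re-sent
```

The key property is that `calls` grows only once even though `run_tool` was invoked twice for the same call id.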

Compact long threads

Over time, conversation threads grow and consume tokens. Use compaction to trim old events:
from afk.memory import RetentionPolicy

result = await runner.compact_thread(
    thread_id="session-42",
    event_policy=RetentionPolicy(
        max_events_per_thread=500,
        keep_event_types=["trace"],
        scan_limit=20_000,
    ),
)
print(f"Removed {result.events_removed} events")
Compaction applies retention rules: protected event types are preserved first, then the most recent remaining events fill the budget.
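The retention rule described above can be sketched in plain Python (a toy model, not AFK’s implementation): protected types are exempt from the budget, and the newest remaining events fill whatever is left.

```python
# Toy sketch of the retention rule: protected event types survive first,
# then the most recent remaining events fill the budget.
def compact(events, max_events, keep_types):
    protected = [e for e in events if e["type"] in keep_types]
    rest = [e for e in events if e["type"] not in keep_types]
    budget = max(0, max_events - len(protected))
    kept = (protected + rest[-budget:]) if budget else protected
    return sorted(kept, key=lambda e: e["ts"])   # restore chronological order

# 100 events, every tenth one a protected "trace" event
events = [{"ts": i, "type": "trace" if i % 10 == 0 else "message"} for i in range(100)]
compacted = compact(events, max_events=20, keep_types={"trace"})
# all 10 traces kept, plus the 10 most recent messages
```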

Memory backends

AFK ships with four backends (in-memory, SQLite, Postgres, and Redis), all implementing the MemoryStore protocol.

In-memory (default)

State lives in process memory. Fast, no setup, but lost on restart.
from afk.memory import InMemoryMemoryStore

runner = Runner(memory_store=InMemoryMemoryStore())
# Or just: Runner()  — in-memory is the default
Use for: Development, testing, short-lived scripts.

Environment-based selection

Set environment variables to auto-select a backend without code changes:
export AFK_MEMORY_BACKEND=sqlite
export AFK_MEMORY_SQLITE_PATH=./agent_memory.sqlite3
The runner falls back to in-memory if the configured backend fails to initialize.
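A minimal sketch of this selection-with-fallback pattern, using stand-in store classes rather than AFK’s real ones (the runner does the equivalent internally):

```python
import os

class InMemoryStore:            # stand-in for AFK's in-memory backend
    pass

class SqliteStore:              # stand-in for AFK's SQLite backend
    def __init__(self, path: str):
        self.path = path

def make_store():
    backend = os.environ.get("AFK_MEMORY_BACKEND", "memory")
    try:
        if backend == "sqlite":
            # KeyError here (missing path) triggers the fallback below
            return SqliteStore(os.environ["AFK_MEMORY_SQLITE_PATH"])
        return InMemoryStore()
    except Exception:
        return InMemoryStore()  # fall back rather than fail the run

os.environ["AFK_MEMORY_BACKEND"] = "sqlite"
os.environ.pop("AFK_MEMORY_SQLITE_PATH", None)  # misconfigured: path missing
store = make_store()                            # falls back to in-memory
```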

Custom backends

Implement the MemoryStore abstract class to add support for any database:
from afk.memory.store import MemoryStore, MemoryCapabilities
from afk.memory.types import MemoryEvent, LongTermMemory, JsonValue

class MyMemoryStore(MemoryStore):
    capabilities = MemoryCapabilities(
        text_search=True,
        vector_search=False,
        atomic_upsert=True,
        ttl=False,
    )

    async def setup(self) -> None:
        # Initialize connections, create tables
        await super().setup()

    async def close(self) -> None:
        # Clean up connections
        await super().close()

    async def append_event(self, event: MemoryEvent) -> None: ...
    async def get_recent_events(self, thread_id: str, limit: int = 50) -> list[MemoryEvent]: ...
    async def get_events_since(self, thread_id: str, since_ms: int, limit: int = 500) -> list[MemoryEvent]: ...
    async def put_state(self, thread_id: str, key: str, value: JsonValue) -> None: ...
    async def get_state(self, thread_id: str, key: str) -> JsonValue | None: ...
    async def list_state(self, thread_id: str, prefix: str | None = None) -> dict[str, JsonValue]: ...
    async def delete_state(self, thread_id: str, key: str) -> None: ...
    async def replace_thread_events(self, thread_id: str, events: list[MemoryEvent]) -> None: ...
    async def upsert_long_term_memory(self, memory: LongTermMemory, *, embedding=None) -> None: ...
    async def delete_long_term_memory(self, user_id: str | None, memory_id: str) -> None: ...
    async def list_long_term_memories(self, user_id: str | None, *, scope=None, limit=100) -> list[LongTermMemory]: ...
Declare capabilities to tell the framework which features your backend supports. Features like vector search are only used when the backend declares support.
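To make the event methods concrete, here is a toy in-memory version of append_event and get_recent_events, using a stand-in event type in place of afk.memory.types.MemoryEvent (so this is a sketch of the shape, not a drop-in backend):

```python
import asyncio
from dataclasses import dataclass

@dataclass
class FakeEvent:                # stand-in for AFK's MemoryEvent
    thread_id: str
    ts: int
    payload: str

class ToyStore:
    def __init__(self):
        self._events: dict[str, list[FakeEvent]] = {}

    async def append_event(self, event: FakeEvent) -> None:
        # group events by thread, appended in arrival order
        self._events.setdefault(event.thread_id, []).append(event)

    async def get_recent_events(self, thread_id: str, limit: int = 50) -> list[FakeEvent]:
        # newest events last, capped at `limit`
        return self._events.get(thread_id, [])[-limit:]

async def demo():
    store = ToyStore()
    for i in range(5):
        await store.append_event(FakeEvent("t1", ts=i, payload=f"msg {i}"))
    return await store.get_recent_events("t1", limit=3)

recent = asyncio.run(demo())    # the three most recent events
```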

Long-term memory

Beyond conversation events, AFK supports persistent long-term memories scoped per user and purpose:
from afk.memory.types import LongTermMemory
from afk.memory import now_ms, new_id

memory_store = await runner._ensure_memory_store()

# Store a long-term memory
await memory_store.upsert_long_term_memory(
    LongTermMemory(
        id=new_id("ltm"),
        user_id="user-123",
        scope="preferences",
        data={"theme": "dark", "language": "python"},
        text="User prefers dark theme and Python examples",
        tags=["preference", "ui"],
        metadata={},
        created_at=now_ms(),
        updated_at=now_ms(),
    )
)

# Retrieve memories
memories = await memory_store.list_long_term_memories(
    user_id="user-123",
    scope="preferences",
    limit=10,
)
Backends that support vector search (SQLite, Postgres) can find semantically similar memories:
# Search by embedding similarity
results = await memory_store.search_long_term_memory_vector(
    user_id="user-123",
    query_embedding=embedding_vector,  # list[float] from your embedding model
    scope="knowledge",
    limit=5,
    min_score=0.7,
)

for memory, score in results:
    print(f"{score:.2f}: {memory.text}")
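Conceptually, vector search ranks memories by embedding similarity and drops anything below min_score. A self-contained sketch using cosine similarity (an assumption for illustration; the actual metric and storage live inside each backend):

```python
import math

def cosine(a: list[float], b: list[float]) -> float:
    # cosine similarity: dot product over the product of magnitudes
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return dot / (na * nb)

# toy memory-id -> embedding table
memories = {
    "dark-theme": [1.0, 0.0, 0.1],
    "generators": [0.0, 1.0, 0.2],
}
query = [0.1, 0.9, 0.3]

# score every memory, sort best-first, then apply the min_score cutoff
results = sorted(
    ((mid, cosine(vec, query)) for mid, vec in memories.items()),
    key=lambda pair: pair[1],
    reverse=True,
)
top = [(mid, score) for mid, score in results if score >= 0.7]
```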
All backends support basic text search across memory content:
results = await memory_store.search_long_term_memory_text(
    user_id="user-123",
    query="python generators",
    scope="knowledge",
    limit=10,
)

Design guidelines

  • Always use thread_id for conversations. Without it, each run starts fresh.
  • Compact threads proactively. Don’t wait until you hit token limits. A good rule: compact when the thread exceeds ~500 events.
  • Use checkpoints for long-running agents. If a run might take minutes, checkpoints let you resume on failure.
  • Don’t store secrets in memory. Thread events are persisted and may be readable.
  • Choose the right backend. In-memory for dev, SQLite for local persistence, Postgres/Redis for production.
  • Use scopes for long-term memory. Organize memories by purpose (preferences, knowledge, history) to keep queries efficient.

Next steps