Once events are flowing in, the next question is: where do they go?
There are two models:
- Pull (ContextStore) — your chatbot calls enrich() at query time to fetch relevant context on demand.
- Push (AsyncAgentContextWriter) — actions are streamed to the agent destination as they arrive, and compressed with an LLM before the context window overflows.
This page covers the push model.
What you own vs what the SDK handles
The SDK is built around plain callables — no base classes, no interfaces to implement. You supply functions; the SDK handles the orchestration.
| You own | SDK owns |
|---|---|
| llm(prompt) → str — any LLM callable | Accumulating actions per session |
| embed(text) → list[float] — any embedding function | Detecting when the threshold is crossed |
| upsert(id, vector, meta) — any vector store write | Calling your LLM at the right moment |
| write_actions(session_id, text) — deliver raw context | Calling your callbacks in the right order |
| overwrite_with_summary(session_id, summary) — replace with compressed context | Per-session task isolation (slow sessions never block others) |
| on_connect / on_disconnect — lifecycle hooks | The “summary first, redact after” ordering guarantee |
Swap any of those callables — OpenAI for Anthropic, Pinecone for pgvector, Intercom for Slack — and everything else stays the same.
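The swappability comes from how narrow each callable contract is. A minimal sketch of the idea, with stub bodies (the provider calls in the comments are illustrative, not part of the SDK):

```python
import asyncio

# Both functions satisfy the llm(prompt) -> str contract, so either can be
# handed to the SDK unchanged. The bodies are stubs; a real implementation
# would make the provider call indicated in the comment.

async def openai_llm(prompt: str) -> str:
    # e.g. (await client.chat.completions.create(...)).choices[0].message.content
    return f"openai: {prompt}"

async def anthropic_llm(prompt: str) -> str:
    # e.g. (await client.messages.create(...)).content[0].text
    return f"anthropic: {prompt}"

# Swapping providers is a one-line change at the call site:
llm = anthropic_llm
print(asyncio.run(llm("summarise the session")))
```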
The context window problem
Every user session produces a stream of actions. Left unmanaged, that stream grows without bound — eventually overwhelming any agent’s context window. The push model solves this with a two-step pattern:
```
action batch 1   → write_actions          → destination (raw, latest context)
action batch 2   → write_actions          → destination (raw, latest context)
...
K actions        → AsyncSessionSummarizer → LLM
                 → overwrite_with_summary → destination (condensed summary replaces raw actions)
action batch K+1 → write_actions          → destination (fresh raw context, cycle restarts)
```
The destination always has context. Before the threshold is hit it has the latest raw actions; after, it has a compact LLM summary that replaced them. The window never grows without bound.
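The cycle can be seen in miniature without the SDK at all. A self-contained simulation (assumed threshold K=3, a stand-in llm, and a plain list as the destination) of the write-then-overwrite pattern described above:

```python
import asyncio

K = 3  # summarisation threshold (assumed value for this demo)
destination: list[str] = []  # stands in for the agent's context window
pending: list[str] = []      # actions accumulated since the last summary

async def llm(prompt: str) -> str:
    # Stand-in for a real LLM call; just counts what it was asked to compress.
    return f"summary of {prompt.count(',') + 1} actions"

async def on_action(action: str) -> None:
    pending.append(action)
    destination.append(action)        # write_actions: raw, latest context
    if len(pending) >= K:
        summary = await llm(", ".join(pending))
        destination.clear()           # overwrite_with_summary: the summary
        destination.append(summary)   # replaces the raw actions
        pending.clear()               # cycle restarts

async def main() -> None:
    for a in ["clicked pricing", "opened docs", "started trial", "invited teammate"]:
        await on_action(a)

asyncio.run(main())
print(destination)  # → ['summary of 3 actions', 'invited teammate']
```

The destination is never empty: the first three actions are replaced by one summary, and the fourth arrives as fresh raw context on top of it.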
The overwrite requires an LLM step
overwrite_with_summary is not a simple in-memory swap. It is only called after AsyncSessionSummarizer has run your configured prompt through your LLM and produced a summary. The AsyncSessionSummarizer is required — without it there is no LLM call, no summary, and no overwrite.
Real-world example: Intercom
This is exactly how the event connector manages context in Intercom conversations.
- write_actions_cb posts each batch as an internal admin note to the conversation and records the returned part_id.
- When K actions accumulate, AsyncSessionSummarizer calls the LLM via circuit_breaker_llm.
- overwrite_cb posts the summary note first, then redacts all old action notes in parallel.
The ordering is intentional — the summary must exist in the conversation before any notes are removed, so the support agent (and Intercom’s AI) always has complete context.
```python
import asyncio
from collections import defaultdict

import httpx
import openai

from autoplay_sdk import AsyncConnectorClient, AsyncSessionSummarizer
from autoplay_sdk.agent_context import AsyncAgentContextWriter
from autoplay_sdk.models import ActionsPayload

async_openai = openai.AsyncOpenAI()

# ---------------------------------------------------------------------------
# Intercom HTTP client (wraps post_note and redact_part)
# ---------------------------------------------------------------------------
INTERCOM_API = "https://api.intercom.io"
ACCESS_TOKEN = "your-intercom-access-token"
ADMIN_ID = "your-admin-id"

http = httpx.AsyncClient(
    base_url=INTERCOM_API,
    headers={"Authorization": f"Bearer {ACCESS_TOKEN}", "Accept": "application/json"},
)


async def post_note(conversation_id: str, body: str) -> str | None:
    """Post an internal admin note and return the part_id on success."""
    r = await http.post(
        f"/conversations/{conversation_id}/reply",
        json={"type": "admin", "admin_id": ADMIN_ID, "message_type": "note", "body": body},
    )
    if r.is_success:
        return r.json().get("id")
    return None


async def redact_part(conversation_id: str, part_id: str) -> None:
    """Blank a previously posted note (best-effort — failures are swallowed)."""
    await http.post(
        "/conversations/redact",
        json={
            "type": "conversation_part",
            "conversation_id": conversation_id,
            "conversation_part_id": part_id,
        },
    )

# ---------------------------------------------------------------------------
# Closure state (one instance per product / worker)
# ---------------------------------------------------------------------------
# session_id → Intercom conversation_id (populated when a conversation opens)
conv_map: dict[str, str] = {}
# session_id → list of part_ids for raw action notes (tracked for later redaction)
part_ids: dict[str, list[str]] = defaultdict(list)

# ---------------------------------------------------------------------------
# Callbacks
# ---------------------------------------------------------------------------
async def write_actions_cb(session_id: str, text: str) -> None:
    conv_id = conv_map.get(session_id)
    if not conv_id:
        return
    part_id = await post_note(conv_id, text)
    if part_id:
        part_ids[session_id].append(part_id)


async def overwrite_cb(session_id: str, summary: str) -> None:
    conv_id = conv_map.get(session_id)
    if not conv_id:
        return
    # Summary posted first — agent never has a blank context window.
    await post_note(conv_id, summary)
    old = part_ids.pop(session_id, [])
    if old:
        # Redact all old action notes in parallel.
        await asyncio.gather(*[redact_part(conv_id, pid) for pid in old])

# ---------------------------------------------------------------------------
# LLM — wrap with a circuit breaker in production to avoid flooding a
# degraded endpoint. Here shown as a plain async call for clarity.
# ---------------------------------------------------------------------------
async def llm(prompt: str) -> str:
    r = await async_openai.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.3,
        max_tokens=256,
    )
    return r.choices[0].message.content

# ---------------------------------------------------------------------------
# Custom action formatter — groups actions by page for a cleaner LLM prompt.
# Remove format_actions to use the default numbered list instead.
# ---------------------------------------------------------------------------
def format_actions(payloads: list[ActionsPayload]) -> str:
    lines = []
    current_url = None
    for p in payloads:
        for a in p.actions:
            if a.canonical_url != current_url:
                current_url = a.canonical_url
                lines.append(f"\n[{current_url}]")
            lines.append(f"  - {a.title}")
    return "\n".join(lines).strip()

# ---------------------------------------------------------------------------
# Wire it together
# ---------------------------------------------------------------------------
summarizer = AsyncSessionSummarizer(
    llm=llm,
    threshold=10,  # summarise every 10 actions
    format_actions=format_actions,
)
writer = AsyncAgentContextWriter(
    summarizer=summarizer,
    write_actions=write_actions_cb,
    overwrite_with_summary=overwrite_cb,
)

# CONNECTOR_URL and API_TOKEN come from your deployment configuration.
async with AsyncConnectorClient(url=CONNECTOR_URL, token=API_TOKEN) as client:
    client.on_actions(writer.add)
    await client.run()
```
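The conv_map above is described as "populated when a conversation opens" but never shown being filled. One plausible place is the on_connect / on_disconnect lifecycle hooks from the table earlier. A sketch of the hook bodies (find_or_create_conversation is a hypothetical helper, and how the hooks register with AsyncConnectorClient depends on the client API, so only the bodies are shown):

```python
import asyncio
from collections import defaultdict

conv_map: dict[str, str] = {}
part_ids: dict[str, list[str]] = defaultdict(list)

async def find_or_create_conversation(session_id: str) -> str:
    # Hypothetical helper: a real version would search Intercom for an open
    # conversation tied to this session's user and create one if none exists.
    return f"conv-{session_id}"

async def on_connect(session_id: str) -> None:
    # Record the mapping that write_actions_cb and overwrite_cb rely on.
    conv_map[session_id] = await find_or_create_conversation(session_id)

async def on_disconnect(session_id: str) -> None:
    # Drop per-session closure state so a long-lived worker doesn't leak it.
    conv_map.pop(session_id, None)
    part_ids.pop(session_id, None)

asyncio.run(on_connect("session-1"))
print(conv_map)  # → {'session-1': 'conv-session-1'}
asyncio.run(on_disconnect("session-1"))
```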
The ordering guarantee
Always post the summary before removing old context. overwrite_with_summary is awaited to completion before anything else happens. Implement it so the summary is confirmed at the destination first; only then should you delete or redact the previous raw actions. If deletion fails, the summary is still visible, so the agent never loses context entirely.
This contract mirrors what session_summary_worker.py enforces internally: write_summary (post the new note) runs before pre_write_summary (redact the old notes). The comment in that file reads: “no context gap is ever possible.”
Generic chatbot example
For a chatbot where you control the context directly, the pattern is simpler — set_context just overwrites the previous value:
```python
from autoplay_sdk import AsyncConnectorClient, AsyncSessionSummarizer
from autoplay_sdk.agent_context import AsyncAgentContextWriter

summarizer = AsyncSessionSummarizer(llm=my_llm, threshold=10)


async def write_actions(session_id: str, text: str) -> None:
    # Keep the chatbot's context window current between summarisations.
    await chatbot.set_context(session_id, text)


async def overwrite_with_summary(session_id: str, summary: str) -> None:
    # Replace accumulated raw actions with the compressed summary.
    await chatbot.set_context(session_id, summary)


writer = AsyncAgentContextWriter(
    summarizer=summarizer,
    overwrite_with_summary=overwrite_with_summary,
    write_actions=write_actions,  # optional — omit if you only want summaries pushed
)

async with AsyncConnectorClient(url=URL, token=TOKEN) as client:
    client.on_actions(writer.add)
    await client.run()
```
Constructor
```python
AsyncAgentContextWriter(summarizer, overwrite_with_summary, write_actions=None, debounce_ms=0)
```
summarizer
AsyncSessionSummarizer
required
The summarizer that accumulates actions and triggers LLM summarisation. Its LLM, threshold, and prompt are configured on this object. The writer hooks into summarizer.on_summary automatically — do not set on_summary separately.
overwrite_with_summary
async (session_id: str, summary: str) -> None
required
Called after each LLM summarisation. Must post the summary to the destination before removing any previous context. If this callback raises, the overwrite is logged and skipped — the raw action context remains in place.
write_actions
async (session_id: str, text: str) -> None | None
default: None
Called on every new actions batch with the batch formatted as plain text. Use this to keep the destination current between summarisations. Optional — omit if you only want summaries pushed.
debounce_ms
int
default: 0
Per-session trailing-edge accumulation window in milliseconds. When > 0, multiple add() calls arriving within the window are merged into one ActionsPayload before write_actions is called — reducing destination API calls when events arrive in bursts.
Set to 0 (the default) if your write_actions callback already coalesces internally, such as a BaseChatbotWriter subclass that applies its own post_link_debounce_s window. Stacking both adds latency with no further reduction in API calls.
Debounce window
When debounce_ms > 0, the writer buffers ActionsPayload objects per session and merges them using ActionsPayload.merge() once the window expires with no new arrivals.
The merged payload carries the full combined action list, so AsyncSessionSummarizer threshold counting is unaffected — it sees every action regardless of how many were coalesced.
```python
# Reduces write_actions calls when PostHog sends event bursts
writer = AsyncAgentContextWriter(
    summarizer=summarizer,
    write_actions=my_destination_cb,
    overwrite_with_summary=overwrite_cb,
    debounce_ms=200,  # wait 200ms of silence before dispatching
)
```
Avoid double-debouncing. If write_actions already coalesces internally — for example a BaseChatbotWriter subclass with post_link_debounce_s — keep debounce_ms=0. Stacking both windows only adds latency.
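Trailing-edge debounce itself is a small, general pattern: each arrival restarts the window, and the merged batch is dispatched only once the window expires with no new arrivals. A self-contained illustration of that behaviour (this is a generic sketch, not the SDK's internal implementation):

```python
import asyncio

class Debouncer:
    """Trailing-edge debounce: a burst of add() calls inside the window
    is merged into one dispatch once the window expires quietly."""

    def __init__(self, window_s: float, dispatch):
        self.window_s = window_s
        self.dispatch = dispatch
        self.buffer: list[str] = []
        self._timer: asyncio.Task | None = None

    def add(self, item: str) -> None:
        self.buffer.append(item)
        if self._timer is not None:
            self._timer.cancel()  # each new arrival restarts the window
        self._timer = asyncio.ensure_future(self._flush())

    async def _flush(self) -> None:
        await asyncio.sleep(self.window_s)
        batch, self.buffer = self.buffer, []
        await self.dispatch(batch)

batches: list[list[str]] = []

async def dispatch(batch: list[str]) -> None:
    batches.append(batch)

async def main() -> None:
    d = Debouncer(0.05, dispatch)
    for item in ("a", "b", "c"):  # a burst: arrivals land inside the window
        d.add(item)
        await asyncio.sleep(0.01)
    await asyncio.sleep(0.1)      # silence — the merged batch flushes once

asyncio.run(main())
print(batches)  # → [['a', 'b', 'c']]: three calls, one dispatch
```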
API reference
| Method | Description |
|---|---|
| .add(payload) | Receive an ActionsPayload — wire to client.on_actions |
- AsyncSessionSummarizer — configure the LLM, threshold, and prompt used for compression
- ContextStore — the pull model; fetch enriched context at query time instead of pushing it