Once events are flowing in, the next question is: where do they go? There are two models:
  • Pull (ContextStore) — your chatbot calls enrich() at query time to fetch relevant context on demand.
  • Push (AsyncAgentContextWriter) — actions are streamed to the agent destination as they arrive, and compressed with an LLM before the context window overflows.
This page covers the push model.

What you own vs what the SDK handles

The SDK is built around plain callables — no base classes, no interfaces to implement. You supply functions; the SDK handles the orchestration.
You own:
  • llm(prompt) → str — any LLM callable
  • embed(text) → list[float] — any embedding function
  • upsert(id, vector, meta) — any vector store write
  • write_actions(session_id, text) — deliver raw context
  • overwrite_with_summary(session_id, summary) — replace with compressed context
  • on_connect / on_disconnect — lifecycle hooks

SDK owns:
  • Accumulating actions per session
  • Detecting when the threshold is crossed
  • Calling your LLM at the right moment
  • Calling your callbacks in the right order
  • Per-session task isolation (slow sessions never block others)
  • The “summary first, redact after” ordering guarantee
Swap any of those callables — OpenAI for Anthropic, Pinecone for pgvector, Intercom for Slack — and everything else stays the same.
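Because the SDK only sees plain async callables, swapping providers is a matter of changing a function body. A minimal sketch with hypothetical stand-in implementations (no real provider calls — the names and bodies here are illustrative, not SDK API):

```python
import asyncio

# Any async callable with this signature plugs in as the llm; swap the body
# (OpenAI, Anthropic, a local model) and nothing downstream changes.
async def llm(prompt: str) -> str:
    return f"summary of: {prompt}"  # a real implementation would call a provider here

# Likewise for the destination write: Intercom, Slack, or a plain print.
async def write_actions(session_id: str, text: str) -> None:
    print(f"[{session_id}] {text}")

summary = asyncio.run(llm("user clicked checkout"))
asyncio.run(write_actions("sess-1", summary))
```

The orchestration code never needs to know which provider sits behind either callable.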

The context window problem

Every user session produces a stream of actions. Left unmanaged, that stream grows unboundedly — eventually overwhelming any agent’s context window. The push model solves this with a two-step pattern:
action batch 1  →  write_actions  →  destination (raw, latest context)
action batch 2  →  write_actions  →  destination (raw, latest context)
...K actions    →  AsyncSessionSummarizer  →  LLM
                →  overwrite_with_summary  →  destination (condensed summary replaces raw actions)
action batch K+1  →  write_actions  →  destination (fresh raw context, cycle restarts)
The destination always has context. Before the threshold is hit it has the latest raw actions; after, it has a compact LLM summary that replaced them. The window never grows unboundedly.
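The cycle can be sketched in a few lines of plain Python (the names mirror the callbacks but this is an illustration, not the SDK's implementation): raw actions accumulate at the destination until K arrive, then a single summary replaces them and the count resets.

```python
K = 3  # summarisation threshold (illustrative)

destination: list[str] = []  # what the agent sees
pending = 0                  # raw actions since the last summary

def write_actions(text: str) -> None:
    global pending
    destination.append(text)  # raw, latest context
    pending += 1

def overwrite_with_summary(summary: str) -> None:
    global pending
    destination.clear()          # drop the raw actions...
    destination.append(summary)  # ...replaced by one compact summary
    pending = 0

for i in range(1, 8):
    write_actions(f"action {i}")
    if pending >= K:
        overwrite_with_summary(f"summary of actions up to {i}")

# After 7 actions the destination holds one summary plus one raw action —
# never the full unbounded stream.
```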

The overwrite requires an LLM step

overwrite_with_summary is not a simple in-memory swap. It is only called after AsyncSessionSummarizer has run your configured prompt through your LLM and produced a summary. The AsyncSessionSummarizer is required — without it there is no LLM call, no summary, and no overwrite.

Real-world example: Intercom

This is exactly how the event connector manages context in Intercom conversations.
  • write_actions_cb posts each batch as an internal admin note to the conversation and records the returned part_id.
  • When K actions accumulate, AsyncSessionSummarizer calls the LLM via circuit_breaker_llm.
  • overwrite_cb posts the summary note first, then redacts all old action notes in parallel.
The ordering is intentional — the summary must exist in the conversation before any notes are removed, so the support agent (and Intercom’s AI) always has complete context.
import asyncio
import httpx
import openai
from collections import defaultdict
from autoplay_sdk import AsyncConnectorClient, AsyncSessionSummarizer
from autoplay_sdk.agent_context import AsyncAgentContextWriter
from autoplay_sdk.models import ActionsPayload

async_openai = openai.AsyncOpenAI()

# ---------------------------------------------------------------------------
# Intercom HTTP client (wraps _post_note and _redact_part)
# ---------------------------------------------------------------------------

INTERCOM_API = "https://api.intercom.io"
ACCESS_TOKEN = "your-intercom-access-token"
ADMIN_ID = "your-admin-id"

http = httpx.AsyncClient(
    base_url=INTERCOM_API,
    headers={"Authorization": f"Bearer {ACCESS_TOKEN}", "Accept": "application/json"},
)

async def post_note(conversation_id: str, body: str) -> str | None:
    """Post an internal admin note and return the part_id on success."""
    r = await http.post(
        f"/conversations/{conversation_id}/reply",
        json={"type": "admin", "admin_id": ADMIN_ID, "message_type": "note", "body": body},
    )
    if r.is_success:
        return r.json().get("id")
    return None

async def redact_part(conversation_id: str, part_id: str) -> None:
    """Blank a previously posted note (best-effort — failures are swallowed)."""
    try:
        await http.post(
            "/conversations/redact",
            json={"type": "conversation_part", "conversation_id": conversation_id, "conversation_part_id": part_id},
        )
    except httpx.HTTPError:
        pass  # best-effort: a stale note is acceptable; never let redaction errors propagate

# ---------------------------------------------------------------------------
# Closure state (one instance per product / worker)
# ---------------------------------------------------------------------------

# session_id → Intercom conversation_id (populated when a conversation opens)
conv_map: dict[str, str] = {}
# session_id → list of part_ids for raw action notes (tracked for later redaction)
part_ids: dict[str, list[str]] = defaultdict(list)

# ---------------------------------------------------------------------------
# Callbacks
# ---------------------------------------------------------------------------

async def write_actions_cb(session_id: str, text: str) -> None:
    conv_id = conv_map.get(session_id)
    if not conv_id:
        return
    part_id = await post_note(conv_id, text)
    if part_id:
        part_ids[session_id].append(part_id)

async def overwrite_cb(session_id: str, summary: str) -> None:
    conv_id = conv_map.get(session_id)
    if not conv_id:
        return
    # Summary posted first — agent never has a blank context window.
    await post_note(conv_id, summary)
    old = part_ids.pop(session_id, [])
    if old:
        # Redact all old action notes in parallel.
        await asyncio.gather(*[redact_part(conv_id, pid) for pid in old])

# ---------------------------------------------------------------------------
# LLM — wrap with a circuit breaker in production to avoid flooding a
# degraded endpoint. Here shown as a plain async call for clarity.
# ---------------------------------------------------------------------------

async def llm(prompt: str) -> str:
    r = await async_openai.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.3,
        max_tokens=256,
    )
    return r.choices[0].message.content or ""

# ---------------------------------------------------------------------------
# Custom action formatter — groups actions by page for a cleaner LLM prompt.
# Remove format_actions to use the default numbered list instead.
# ---------------------------------------------------------------------------

def format_actions(payloads: list[ActionsPayload]) -> str:
    lines = []
    current_url = None
    for p in payloads:
        for a in p.actions:
            if a.canonical_url != current_url:
                current_url = a.canonical_url
                lines.append(f"\n[{current_url}]")
            lines.append(f"  - {a.title}")
    return "\n".join(lines).strip()

# ---------------------------------------------------------------------------
# Wire it together
# ---------------------------------------------------------------------------

summarizer = AsyncSessionSummarizer(
    llm=llm,
    threshold=10,          # summarise every 10 actions
    format_actions=format_actions,
)

writer = AsyncAgentContextWriter(
    summarizer=summarizer,
    write_actions=write_actions_cb,
    overwrite_with_summary=overwrite_cb,
)

async def main() -> None:
    async with AsyncConnectorClient(url=CONNECTOR_URL, token=API_TOKEN) as client:
        client.on_actions(writer.add)
        await client.run()

asyncio.run(main())
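The comment in the listing above recommends wrapping the LLM call in a circuit breaker. A minimal sketch of that pattern (the connector's circuit_breaker_llm is assumed to behave similarly; this class is illustrative, not the SDK's implementation):

```python
import time

class CircuitBreaker:
    """Open after `max_failures` consecutive errors; allow a retry after `reset_s`."""

    def __init__(self, max_failures: int = 3, reset_s: float = 30.0):
        self.max_failures = max_failures
        self.reset_s = reset_s
        self.failures = 0
        self.opened_at: float | None = None

    def allow(self) -> bool:
        if self.opened_at is None:
            return True
        if time.monotonic() - self.opened_at >= self.reset_s:
            # Half-open: let one probe call through.
            self.opened_at = None
            self.failures = 0
            return True
        return False

    def record(self, ok: bool) -> None:
        if ok:
            self.failures = 0
        else:
            self.failures += 1
            if self.failures >= self.max_failures:
                self.opened_at = time.monotonic()

breaker = CircuitBreaker(max_failures=2, reset_s=60)
breaker.record(ok=False)
breaker.record(ok=False)   # second consecutive failure trips the breaker
print(breaker.allow())     # False — calls are skipped while the endpoint is degraded
```

In the llm callable you would check allow() before the API call, record() the outcome, and return a fallback string while the circuit is open, so a degraded endpoint is never flooded with summarisation requests.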

The ordering guarantee

Always post the summary before removing old context. overwrite_with_summary is awaited to completion before anything else happens. Implement it so the summary is confirmed at the destination first; only then should you delete or redact the previous raw actions. If deletion fails, the summary is still visible, and the agent never loses context entirely.
This contract mirrors what session_summary_worker.py enforces internally: write_summary (post the new note) runs before pre_write_summary (redact the old notes). The comment in that file reads: “no context gap is ever possible.”
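The value of the ordering is easiest to see when deletion fails. A minimal sketch with in-memory stand-ins (not the SDK's implementation): every redaction errors out, yet the summary is already in place.

```python
import asyncio

# In-memory stand-in for the destination: part_id -> note body.
notes: dict[str, str] = {"p1": "action 1", "p2": "action 2"}

async def post_summary(summary: str) -> str:
    notes["p3"] = summary
    return "p3"

async def redact(part_id: str) -> None:
    raise RuntimeError("redaction endpoint down")  # simulate total failure

async def overwrite_with_summary(summary: str) -> None:
    await post_summary(summary)      # 1. summary confirmed at the destination first
    for pid in ["p1", "p2"]:         # 2. only then remove the old raw context
        try:
            await redact(pid)
        except RuntimeError:
            pass  # best-effort: a stale note is acceptable, a context gap is not

asyncio.run(overwrite_with_summary("user browsed checkout, then billing"))
# The summary survives even though every redaction failed.
```

Reversing the order would leave the agent with nothing at all whenever the summary post fails after the deletions succeed.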

Generic chatbot example

For a chatbot where you control the context directly, the pattern is simpler — set_context just overwrites the previous value:
import asyncio

from autoplay_sdk import AsyncConnectorClient, AsyncSessionSummarizer
from autoplay_sdk.agent_context import AsyncAgentContextWriter

summarizer = AsyncSessionSummarizer(llm=my_llm, threshold=10)

async def write_actions(session_id: str, text: str) -> None:
    # Keep the chatbot's context window current between summarisations.
    await chatbot.set_context(session_id, text)

async def overwrite_with_summary(session_id: str, summary: str) -> None:
    # Replace accumulated raw actions with the compressed summary.
    await chatbot.set_context(session_id, summary)

writer = AsyncAgentContextWriter(
    summarizer=summarizer,
    overwrite_with_summary=overwrite_with_summary,
    write_actions=write_actions,  # optional — omit if you only want summaries pushed
)

async def main() -> None:
    async with AsyncConnectorClient(url=URL, token=TOKEN) as client:
        client.on_actions(writer.add)
        await client.run()

asyncio.run(main())

Constructor

AsyncAgentContextWriter(summarizer, overwrite_with_summary, write_actions=None, debounce_ms=0)

summarizer
AsyncSessionSummarizer
required
The summarizer that accumulates actions and triggers LLM summarisation. Its LLM, threshold, and prompt are configured on this object. The writer hooks into summarizer.on_summary automatically — do not set on_summary separately.
overwrite_with_summary
async (session_id: str, summary: str) -> None
required
Called after each LLM summarisation. Must post the summary to the destination before removing any previous context. If this callback raises, the overwrite is logged and skipped — the raw action context remains in place.
write_actions
async (session_id: str, text: str) -> None
default:"None"
Called on every new actions batch with the batch formatted as plain text. Use this to keep the destination current between summarisations. Optional — omit if you only want summaries pushed.
debounce_ms
int
default:"0"
Per-session trailing-edge accumulation window in milliseconds. When > 0, multiple add() calls arriving within the window are merged into one ActionsPayload before write_actions is called, reducing destination API calls when events arrive in bursts.

Debounce window

When debounce_ms > 0, the writer buffers ActionsPayload objects per session and merges them using ActionsPayload.merge() once the window expires with no new arrivals. The merged payload carries the full combined action list, so AsyncSessionSummarizer threshold counting is unaffected — it sees every action regardless of how many were coalesced.
# Reduces write_actions calls when PostHog sends event bursts
writer = AsyncAgentContextWriter(
    summarizer=summarizer,
    write_actions=my_destination_cb,
    overwrite_with_summary=overwrite_cb,
    debounce_ms=200,  # wait 200ms of silence before dispatching
)
Avoid double-debouncing. If write_actions already coalesces internally — for example a BaseChatbotWriter subclass with post_link_debounce_s — keep debounce_ms=0. Stacking both windows only adds latency.
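The claim that threshold counting is unaffected can be illustrated with plain lists (ActionsPayload.merge() is modelled here as list concatenation; this is an illustration, not the SDK's implementation):

```python
# Two bursts arrive within the debounce window...
batch_a = ["clicked pricing", "opened docs"]
batch_b = ["started checkout"]

# ...and are coalesced into one payload before write_actions fires.
merged = batch_a + batch_b  # stand-in for ActionsPayload.merge()

# The summariser counts individual actions, not payloads, so the threshold
# arithmetic is identical whether the batches arrive as one payload or three.
assert len(merged) == len(batch_a) + len(batch_b)
```

Debouncing therefore only changes how often write_actions is called, never when summarisation triggers.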

API reference

.add(payload) — Receive an ActionsPayload; wire this to client.on_actions.

  • SessionSummarizer — configure the LLM, threshold, and prompt used for compression
  • ContextStore — the pull model; fetch enriched context at query time instead of pushing it