Skip to main content

Final result

When a user asks your Dify chatbot a help question, the pipeline silently pulls that user’s recent activity from the knowledge base and uses it to give a response that’s specific to where they are — not a generic answer that could apply to anyone.
User: How do I create a project?

Bot:  To create a project, go to
      My Projects in the sidebar
      and click Add Project.

      (Could be on any page.
       No idea where they are.)

Prerequisites

Complete the Quickstart. You should have:
  • PostHog in the browser — snippet, API key, product_id on identify
  • Connector credentials — stream URL and API token from your Autoplay dashboard
  • autoplay-sdk installed — and a successful test stream from the Quickstart
  • A Dify account — cloud at cloud.dify.ai or self-hosted, with an existing chatbot application
  • A public HTTPS URL — where you can host the event-stream server

How session scoping works

This is the most important concept in this guide. Read it before writing any code.
The real-time knowledge base only works correctly if the events Dify retrieves belong to this user’s current session — not a pool of all users’ events mixed together. Here’s what has to happen:
  • Buffer events per session. Every chunk written to the “live activity” knowledge base must be keyed by a stable identifier. The Autoplay payload makes this easy: every actions event — and every individual action object inside it — carries session_id and email at the top level (see the Payload schema). Your server reads whichever field you want to key on directly from the incoming event; no extra client-side instrumentation needed.
  • Dify must identify the same session on every retrieval. Whatever identifier you pass in the retrieval request — session_id or email — must match the key your server used when storing events. If the keys don’t line up, retrieval returns the wrong bucket or nothing at all.
  • Without this link, context is empty or generic. Dify can’t pick the right bucket if it doesn’t know which session to ask for.
The practical rule: pick one key (session_id or email), store with it, retrieve with it. You don’t need extra categories or taxonomies beyond session boundaries — the session boundary is the distinction between users.

Step 1 — Set up the event-stream server

The server consumes Autoplay’s SSE stream using AsyncConnectorClient, buffers actions with AsyncAgentContextWriter, and exposes an HTTP endpoint that Dify’s retriever polls. Dify expects an endpoint returning a JSON array of text chunks — the server below handles exactly that, with each user’s events stored in a separate per-session buffer.

Install the SDK

Install dependencies
pip install autoplay-sdk fastapi uvicorn httpx openai

Server code

Create server.py. The server maintains a session-keyed in-memory store and serves the right session’s records at GET /events. Because every actions payload from Autoplay already carries session_id and email on the batch envelope (and on each action object), the server can key its store directly from the incoming event — no client-side plumbing needed. The /events endpoint accepts either session_id or email as a query param, so Dify can use whichever identifier is easiest to pass from your frontend.
server.py
# server.py — Autoplay → Dify real-time knowledge server (session-scoped)
import asyncio, os
from collections import defaultdict
from datetime import datetime, timezone

import openai
from fastapi import FastAPI, Query, HTTPException
from fastapi.responses import JSONResponse

from autoplay_sdk import AsyncConnectorClient, AsyncSessionSummarizer
from autoplay_sdk.agent_context import AsyncAgentContextWriter

# ── Config ───────────────────────────────────────────────────────────
CONNECTOR_URL = os.environ["AUTOPLAY_STREAM_URL"]   # e.g. https://…/stream/<product_id>
API_TOKEN     = os.environ["AUTOPLAY_API_TOKEN"]
MAX_CHUNKS    = 50   # per session — keeps memory bounded

app = FastAPI()
async_openai = openai.AsyncOpenAI()

# ── In-memory stores (session-scoped) ────────────────────────────────
# Primary store keyed by session_id (from payload.session_id)
# { session_id: [{"text": "…", "ts": "…"}, …] }
chunks_by_session: dict[str, list[dict]] = defaultdict(list)

# Secondary index: email → session_id (from payload.email + payload.session_id)
# Lets Dify look up by email when session_id isn't available on the frontend.
email_to_session: dict[str, str] = {}


def _store(session_id: str, email: str | None, text: str) -> None:
    entry = {"text": text, "ts": datetime.now(timezone.utc).isoformat()}
    buf = chunks_by_session[session_id]
    buf.append(entry)
    if len(buf) > MAX_CHUNKS:
        buf.pop(0)
    # Keep the email → session_id index fresh (last-writer-wins per email)
    if email:
        email_to_session[email] = session_id


# ── Callbacks ────────────────────────────────────────────────────────
# AsyncAgentContextWriter passes (session_id, text) — both come from the payload.
async def write_actions(session_id: str, text: str) -> None:
    # session_id is already extracted from payload.session_id by the SDK.
    # We resolve email from the index if needed; for storage we key by session_id.
    _store(session_id, email=None, text=text)


async def on_raw_payload(payload: dict) -> None:
    """
    Hook into the raw payload to capture email → session_id mapping.
    payload.session_id and payload.email come straight from the actions event.
    """
    sid   = payload.get("session_id")
    email = payload.get("email")
    if sid and email:
        email_to_session[email] = sid


async def overwrite_with_summary(session_id: str, summary: str) -> None:
    # Replace all chunks for this session with a single compact summary.
    chunks_by_session[session_id] = [{
        "text": summary,
        "ts": datetime.now(timezone.utc).isoformat(),
    }]


# ── LLM summarizer ───────────────────────────────────────────────────
async def llm(prompt: str) -> str:
    r = await async_openai.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.3, max_tokens=256,
    )
    return r.choices[0].message.content


summarizer   = AsyncSessionSummarizer(llm=llm, threshold=20)
agent_writer = AsyncAgentContextWriter(
    summarizer=summarizer,
    write_actions=write_actions,
    overwrite_with_summary=overwrite_with_summary,
    debounce_ms=0,
)


# ── HTTP endpoint Dify will call ──────────────────────────────────────
@app.get("/events")
async def get_events(
    session_id: str | None = Query(None, description="PostHog session_id (preferred)"),
    email:      str | None = Query(None, description="User email — resolved to session_id server-side"),
):
    """
    Returns the requesting session's event chunks.

    Pass exactly one of:
      ?session_id=ps_abc123          — direct lookup, fastest
      ?email=user@example.com        — resolved via email_to_session index

    Dify's External Knowledge API connector expects:
      { "records": [{"content": "…", "score": 1.0, "title": "…"}, …] }
    """
    if not session_id and not email:
        raise HTTPException(status_code=400, detail="Provide session_id or email")

    sid = session_id
    if not sid and email:
        sid = email_to_session.get(email)

    if not sid:
        # No events received yet for this identifier — return empty gracefully
        return JSONResponse({"records": []})

    source = chunks_by_session.get(sid, [])
    records = [
        {"content": c["text"], "score": 1.0, "title": f"events @ {c['ts']}"}
        for c in reversed(source)   # newest first
    ]
    return JSONResponse({"records": records})


# ── Background stream task ────────────────────────────────────────────
@app.on_event("startup")
async def start_stream():
    asyncio.create_task(_run_stream())


async def _run_stream():
    async with AsyncConnectorClient(url=CONNECTOR_URL, token=API_TOKEN) as client:
        client.on_actions(agent_writer.add)
        # Also hook raw payloads to keep the email → session_id index up to date
        client.on_raw(on_raw_payload)
        await client.run()
session_id and email are delivered to your server automatically — they’re top-level fields on every actions payload (see Payload schema). You don’t need to instrument your frontend to forward them; the Autoplay connector already includes them.

Which identifier to use

IdentifierWhen to use
session_idPreferred. Passes directly from payload.session_id. Maps 1-to-1 to a browser session — the right granularity for “where is this user right now.”
emailUse when your Dify frontend knows the logged-in user’s email but not their PostHog session. The server resolves email → session_id from the index built as events arrive. Make sure events have arrived before the first retrieval, or the index won’t have the mapping yet.

How identifiers flow end-to-end

Autoplay stream                  Event server                        Dify
───────────────                  ────────────                        ────
actions payload          ──→    chunks_by_session[session_id]  ←──  GET /events?session_id=ps_abc123
  .session_id = ps_abc123        email_to_session[email]        or   GET /events?email=user@example.com
  .email = user@example.com      (index built automatically)         (retrieval step, per chat turn)
  .actions[*].session_id
  .actions[*].email

Deploy & verify

Run locally with uvicorn server:app --port 8000 and expose it with ngrok http 8000 for testing. For production, deploy anywhere that can serve a public HTTPS endpoint — Render, Railway, Fly.io, a VPS, or your own infrastructure.
Verify /events locally or against production
# Verify by session_id (preferred)
curl "https://your-server.example.com/events?session_id=ps_abc123"
# → {"records": [{"content": "…", "score": 1.0, "title": "…"}]}

# Verify by email
curl "https://your-server.example.com/events?email=user@example.com"
# → {"records": [...]}

# Unknown identifier returns empty records (correct — no events yet)
curl "https://your-server.example.com/events?session_id=unknown"
# → {"records": []}
Don’t want to self-host? Join the Autoplay Slack workspace and post in #just-integrated — we can host the event server for you and hand you a ready-to-use URL to drop straight into Dify.

Step 2 — Create a Knowledge Base in Dify

Dify’s Knowledge feature supports an External Knowledge API mode, which lets you point it at any HTTPS endpoint returning the records format above. Dify re-queries your live server — rather than a static document index — on every retrieval step.

Create the dataset

Navigate to Knowledge → Create Knowledge → External Knowledge API in the Dify sidebar.
FieldValue
NameGive it a clear name — e.g. Real-Time Events
Data sourceSelect External Knowledge API (not “Upload file” or “Sync from Notion”)
API EndpointYour deployed server URL — e.g. https://your-server.example.com/events
API KeyOptional. If you add auth to /events, set the Bearer token here. Otherwise leave blank.
Retrieval settingLeave as Semantic search (Dify handles embedding; your server just returns chunks)

Pass the session identifier to Dify

Dify forwards query parameters you configure in the retrieval settings to your /events endpoint. You need to pass one of session_id or email so the server can open the right bucket. Option A — pass session_id (preferred) session_id is the PostHog session ID. It’s already in every payload the Autoplay stream delivers, so your server has it from the moment the first event arrives. Pass it from your frontend when you open a Dify conversation:
Dify chat-messages (session_id)
// When opening or continuing a chat, pass the PostHog session_id as an input
const response = await fetch("https://api.dify.ai/v1/chat-messages", {
  method: "POST",
  headers: {
    "Authorization": `Bearer ${DIFY_APP_KEY}`,
    "Content-Type": "application/json",
  },
  body: JSON.stringify({
    inputs: {
      session_id: posthog.get_session_id(),   // stable for this browser session
    },
    query: userMessage,
    conversation_id: existingConversationId,  // null to start new
    user: currentUserId,
  }),
});
In the External Knowledge API settings in Dify, map the query parameter session_id to (Dify substitutes this from your chat inputs). Option B — pass email If your frontend knows the logged-in user’s email but not their PostHog session ID, pass email instead. The server builds an email → session_id index automatically from incoming payloads (both fields are present on every actions event), so no extra backend work is needed.
Dify chat-messages (email)
// When opening or continuing a chat, pass email as an input (same shape as Option A)
const response = await fetch("https://api.dify.ai/v1/chat-messages", {
  method: "POST",
  headers: {
    "Authorization": `Bearer ${DIFY_APP_KEY}`,
    "Content-Type": "application/json",
  },
  body: JSON.stringify({
    inputs: {
      email: currentUser.email,   // e.g. "user@example.com"
    },
    query: userMessage,
    conversation_id: existingConversationId,  // null to start new
    user: currentUserId,
  }),
});
In the External Knowledge API settings, map email to .
If you use email, make sure at least one event has arrived from that user before the first retrieval — otherwise the email → session_id index is empty and the endpoint returns no records. session_id doesn’t have this race condition because the store key is written on the first event.

Connect your URL

  1. Click New External Knowledge API.
  2. Enter a descriptive name (e.g. autoplay-realtime) and paste in your endpoint URL.
  3. Click Save. Dify will make a test request — confirm the response shows "records".
  4. Back in the dataset creation flow, select your new API connection and click Create.
Dify re-queries the endpoint on each retrieval. There is no sync schedule to configure — events are always fresh. The score field in your response lets you rank chunks; returning 1.0 for all ensures the most recent entries surface first.

Step 3 — Update the system prompt

Open your Dify chatbot application (Studio → your app → Orchestrate), attach the Real-Time Events knowledge base to the app, then update the system prompt so the model knows how to use the retrieved context.

Attach the knowledge base to your app

  1. In the Orchestrate view, click the Context panel on the right.
  2. Click Add and select Real-Time Events.
  3. Set Top K to 5 and Score threshold to 0.5 (adjust to taste).

System prompt

Replace (or append to) the existing system prompt with something like the example below. The key section is How to use the “Current User Activity” record — it tells the model exactly when and how to surface real-time context. The placeholder in the example below is automatically injected by Dify with the retrieved chunks for this session.
Example system prompt
You are a friendly and helpful assistant for users of this product.

Focus on helping people find their way in the UI, complete workflows, and
understand features. Assume some users are seeing the product for the first time.

## How to use the "Current User Activity" record

You may receive a special record titled "Current User Activity" in the
retrieved context. This shows what THIS user has been doing on the
platform in the last 2 minutes — which page they are on and what they
clicked. The activity is scoped to their session, so it reflects only
their actions, not anyone else's.

{{#context#}}

When this record is present:

1. **Acknowledge their activity naturally** — for example:
   "I can see you're currently on the Projects page" or
   "It looks like you've been exploring the Dashboard."

2. **Use it to give specific directions** — instead of generic
   instructions, reference where they are:
   "From the page you're on, click the blue 'Add Project' button
   at the top right."

3. **Detect if they might be lost** — if their actions show them
   clicking around without a clear pattern, gently offer help:
   "It looks like you might be looking for something specific.
   Can I help you find it?"

4. **Don't force it** — if the user's question has nothing to do
   with their current activity, just answer the question normally.
   Don't mention their activity unless it's helpful.

## How to answer questions

- **Be specific**: reference actual button names, tab labels, and
  menu items from the knowledge base.
- **Use numbered steps**: when explaining how to do something,
  always use a numbered list.
- **Keep it simple**: avoid technical jargon. Explain as if the
  user has never used the platform before.
- **Be encouraging**: use phrases like "Great question!" or
  "That's easy to do" to make users feel comfortable.
- **Offer next steps**: after answering, suggest what they might
  want to do next.
- **Admit when you don't know**: if the knowledge base doesn't
  have the answer, say so honestly.

## Language

Respond in the same language the user writes in.

## Examples of good responses

User is on the Dashboard, asks "How do I create a project?":
  "I can see you're currently on the Dashboard. To create a new
   project:
   1. Click on 'My Projects' in the left sidebar
   2. Click the 'Add Project' button at the top right
   3. Choose the type, template, or options that match what you're creating
   4. Fill in the required details and click 'Create'
   Would you like me to explain what each field means?"

User is on the Invoice page, asks "Where are settings?":
  "The settings aren't on this page — you can find them by clicking
   on your profile icon in the top right corner, then selecting
   'Settings' from the dropdown menu."

User has no activity context, asks "What can I do here?":
  "Welcome! Here's what you can do:
   1. Dashboard — see an overview of your work
   2. My Projects — create and manage projects
   3. Reports — view analytics or exports
   4. Billing — manage invoices or account settings
   What would you like to explore first?"
Testing it: Open the chatbot’s Preview panel and trigger some PostHog events in your app. Ask a normal help question like “How do I create a project?” — the bot should respond with directions specific to the page you’re currently on, not a generic walkthrough.

Step 4 — Keep context compact with background summarization

Every time a user asks a question, Dify pulls their session’s recent activity from your /events endpoint and injects it into the prompt. That context is what makes the answer specific and useful. But if a user has been active for a while, raw event chunks accumulate fast — a wall of unprocessed clicks will bloat the context window, drive up costs, and drown the useful signal in noise. The solution is to continuously compress each session’s history in the background, before a user ever asks anything, so Dify always retrieves a tight, meaningful summary rather than a raw event log.

Why this matters

[1] Clicked Dashboard link
[2] Clicked Projects tab
[3] Clicked Add Project button
[4] Clicked product type option
[5] Clicked back button
[6] Clicked Projects tab
[7] Clicked Add Project button
… 40 more lines …

→ ~800 tokens of context noise

How it works

Step 1’s server already wires this up — no extra module needed.
  • After threshold actions per session (default 20), the summarizer runs in the background.
  • It calls your llm to collapse that session’s chunks into one summary, then overwrite_with_summary replaces only that session’s stored chunks with the summary.
  • The next GET /events?session_id=<id> returns the compact summary instead of raw clicks — for that session only. Other sessions are unaffected.
Knobs
ParameterEffect
thresholdLower = summarize sooner (more churn); higher = keep raw detail longer
debounce_msDelay after the last action before writing a batch — 3000 merges rapid-fire clicks
llmAny async prompt → string callable; gpt-4o-mini is a typical low-cost choice
Adjust AsyncSessionSummarizer / AsyncAgentContextWriter in server.py:
server.py (summarizer knobs)
# Summarize after every 10 actions instead of 20,
# and wait 3 s after the last action before writing a chunk.
summarizer = AsyncSessionSummarizer(llm=llm, threshold=10)

agent_writer = AsyncAgentContextWriter(
    summarizer=summarizer,
    write_actions=write_actions,
    overwrite_with_summary=overwrite_with_summary,
    debounce_ms=3000,   # bin rapid clicks into ~3 s windows
)
The summarizer runs on your server, not inside Dify. It fires between events arriving and Dify querying — so by the time the user asks a question, the context is already compact and already session-scoped. Dify never sees the raw flood of clicks; it only ever retrieves the most recent meaningful summary for the right user.

Troubleshooting session scoping

SymptomLikely causeFix
Bot gives generic answers despite user being activeIdentifier not reaching /events — Dify is calling without session_id or emailCheck that your chosen identifier is in Dify’s inputs and wired as a query param in the External KB settings
Bot sees another user’s activityEvents stored under wrong key, or session_id not refreshed between usersEnsure you call posthog.get_session_id() fresh per conversation start, not cached from a previous session
Records always empty (email mode)email → session_id index not populated yet — first retrieval raced ahead of first eventSwitch to session_id mode, or add a short delay between page load and chat open so at least one event arrives first
Records always empty (session_id mode)session_id in retrieval doesn’t match the key written by write_actionsLog payload.session_id in on_raw_payload and compare to what Dify sends — they must be identical strings
Context grows stale mid-sessionPostHog rotates session_id (e.g. after 30 min idle)Listen for PostHog’s session change event and re-send the new session_id to Dify’s conversation inputs; or switch to email mode which is session-rotation-safe