MemoryKit

Streaming

Stream RAG responses token-by-token with Server-Sent Events. Includes a React hook and Next.js integration.

Coming soon — streaming for the query and chat endpoints (`.stream()`, `.streamMessage()`) is not available in the V1 release. Use `/v1/memories/search` and the SDK's `.search()` method for retrieval-based search. Streaming RAG responses will be available in a future release.

Stream query and chat responses in real time using Server-Sent Events (SSE). The API sends tokens as they are generated, so your users see the answer start appearing immediately instead of waiting for the full response.

SSE protocol

The stream endpoint returns a standard SSE stream with typed events:

event: text
data: {"content": "Based on your documents, "}

event: text
data: {"content": "the Q4 revenue target is $2M."}

event: sources
data: [{"id": "mem_abc", "score": 0.95, "title": "Q4 Planning"}]

event: usage
data: {"tokens_used": 342}

event: done
data: {}

Event types

| Event | Payload | Description |
| --- | --- | --- |
| `text` | `{ content: string }` | A chunk of the generated answer |
| `sources` | `Source[]` | Retrieved sources used for the answer |
| `usage` | `{ tokens_used: number }` | Token usage for this request |
| `done` | `{}` | Stream complete |
| `error` | `{ message: string }` | An error occurred |

Stream a query

// Print the answer as it streams in, then list the sources it was based on.
for await (const event of mk.memories.stream({
  query: "What happened in our last meeting?",
  mode: "balanced",
})) {
  switch (event.event) {
    case "text":
      process.stdout.write(event.data.content);
      break;
    case "sources":
      console.log("\nSources:", event.data);
      break;
    case "error":
      // Surface server-side failures instead of letting the stream end silently.
      console.error("\nStream error:", event.data.message);
      break;
    case "done":
      console.log("\n--- Complete ---");
      break;
  }
}

Stream a chat message

Chat streaming works the same way — the response arrives token-by-token with full conversation context:

// Write the assistant's reply to stdout token-by-token.
for await (const event of mk.chats.streamMessage(chat.id, {
  message: "Can you explain in more detail?",
})) {
  if (event.event === "text") {
    process.stdout.write(event.data.content);
  } else if (event.event === "error") {
    // Surface server-side failures instead of letting the stream end silently.
    console.error("\nStream error:", event.data.message);
  }
}

React hook

A reusable useMemoryStream hook for React apps. Handles SSE parsing, state management, and error handling.

import { useState, useCallback } from "react";
 
/**
 * A retrieved source, with the fields shown in the `sources` event example.
 * NOTE(review): the API may return additional fields — confirm against the API reference.
 */
interface Source {
  id: string;
  score: number;
  title: string;
}

interface StreamState {
  answer: string;
  sources: Source[];
  isStreaming: boolean;
  error: string | null;
}

/** One complete Server-Sent Event: its `event:` name and its joined `data:` lines. */
interface SseEvent {
  event: string;
  data: string;
}

/**
 * Incremental SSE parser.
 *
 * Feed decoded text with `push()` in arrival order, and call `end()` once the
 * stream closes. `onEvent` fires once per event, when a blank line terminates
 * it. Because state persists across `push()` calls, `event:` and `data:` lines
 * may be split across network chunks arbitrarily.
 */
function createSseParser(onEvent: (event: SseEvent) => void) {
  let buffer = "";
  let eventName = "";
  let dataLines: string[] = [];

  const dispatch = () => {
    // Following the SSE spec, an event without any data lines is not dispatched.
    if (dataLines.length > 0) {
      onEvent({ event: eventName || "message", data: dataLines.join("\n") });
    }
    eventName = "";
    dataLines = [];
  };

  const processLine = (rawLine: string) => {
    // Tolerate CRLF line endings.
    const line = rawLine.endsWith("\r") ? rawLine.slice(0, -1) : rawLine;
    if (line === "") {
      dispatch();
      return;
    }
    if (line.startsWith(":")) return; // comment / keep-alive line
    const colon = line.indexOf(":");
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? "" : line.slice(colon + 1);
    if (value.startsWith(" ")) value = value.slice(1);
    if (field === "event") eventName = value;
    else if (field === "data") dataLines.push(value);
  };

  return {
    push(chunk: string) {
      buffer += chunk;
      const lines = buffer.split("\n");
      // The last element is an incomplete line (or ""); keep it for the next chunk.
      buffer = lines.pop() ?? "";
      lines.forEach(processLine);
    },
    end() {
      if (buffer !== "") processLine(buffer);
      buffer = "";
      dispatch();
    },
  };
}

/**
 * React hook that POSTs `{ query, mode }` to `endpoint` and streams the SSE reply into state.
 *
 * `text` events are appended to `answer`, `sources` replaces `sources`, and
 * `error` sets `error`. A non-2xx response, a missing body, a network failure,
 * or malformed event JSON also sets `error`. `stream()` never rejects;
 * failures are reported only through `error`.
 *
 * @param endpoint Same-origin proxy route; defaults to `/api/stream`.
 */
export function useMemoryStream(endpoint = "/api/stream") {
  const [state, setState] = useState<StreamState>({
    answer: "",
    sources: [],
    isStreaming: false,
    error: null,
  });

  const stream = useCallback(async (query: string, mode = "balanced") => {
    setState({ answer: "", sources: [], isStreaming: true, error: null });

    // Apply one parsed event to state. JSON.parse errors propagate to the
    // catch below and surface as `error`.
    const handleEvent = ({ event, data }: SseEvent) => {
      switch (event) {
        case "text": {
          const { content }: { content?: string } = JSON.parse(data);
          setState((prev) => ({ ...prev, answer: prev.answer + (content ?? "") }));
          break;
        }
        case "sources": {
          const sources: Source[] = JSON.parse(data);
          setState((prev) => ({ ...prev, sources }));
          break;
        }
        case "error": {
          const { message }: { message?: string } = JSON.parse(data);
          setState((prev) => ({ ...prev, error: message ?? "Stream error" }));
          break;
        }
        // "usage" and "done" carry nothing this hook displays.
      }
    };

    try {
      const response = await fetch(endpoint, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ query, mode }),
      });

      // An error reply (e.g. JSON from the proxy) is not an event stream.
      if (!response.ok) {
        throw new Error(`Stream request failed with status ${response.status}`);
      }
      if (!response.body) {
        throw new Error("Stream response has no body");
      }

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      const parser = createSseParser(handleEvent);

      for (;;) {
        const { done, value } = await reader.read();
        if (done) break;
        parser.push(decoder.decode(value, { stream: true }));
      }
      // Flush any buffered partial character and the final, unterminated event.
      parser.push(decoder.decode());
      parser.end();
    } catch (err) {
      setState((prev) => ({
        ...prev,
        error: err instanceof Error ? err.message : "Stream failed",
      }));
    } finally {
      setState((prev) => ({ ...prev, isStreaming: false }));
    }
  }, [endpoint]);

  return { ...state, stream };
}

Usage in a component

/**
 * Search form that streams an answer and its sources via `useMemoryStream`.
 * Blank queries are ignored rather than sent.
 */
function SearchPage() {
  const { answer, sources, isStreaming, error, stream } = useMemoryStream();
  const [query, setQuery] = useState("");
  const canSubmit = !isStreaming && query.trim() !== "";

  return (
    <div>
      <form
        onSubmit={(e) => {
          e.preventDefault();
          // Skip whitespace-only queries rather than sending an empty request.
          if (!query.trim()) return;
          // `stream` reports failures through `error` state, so nothing to await here.
          void stream(query);
        }}
      >
        <input
          value={query}
          onChange={(e) => setQuery(e.target.value)}
          placeholder="Ask a question..."
        />
        <button type="submit" disabled={!canSubmit}>
          {isStreaming ? "Thinking..." : "Ask"}
        </button>
      </form>

      {answer && <div className="prose">{answer}</div>}

      {sources.length > 0 && (
        <div className="mt-4">
          <h3>Sources</h3>
          {sources.map((s) => (
            <div key={s.id}>[{s.score.toFixed(2)}] {s.title}</div>
          ))}
        </div>
      )}

      {error && <div className="text-red-500">{error}</div>}
    </div>
  );
}

Next.js server proxy

Never expose your API key in client-side code. Use a server proxy to keep it secure.

Create a Route Handler that proxies the SSE stream:

app/api/stream/route.ts

/** Build a JSON error response with the given status. */
function jsonError(message: string, status: number): Response {
  return new Response(JSON.stringify({ error: message }), {
    status,
    headers: { "Content-Type": "application/json" },
  });
}

/**
 * Route Handler that proxies a streaming query to MemoryKit, keeping
 * `MEMORYKIT_API_KEY` on the server.
 *
 * Expects a JSON body `{ query, mode? }`.
 * - Returns 400 if the body is not valid JSON or `query` is not a non-empty string.
 * - If MemoryKit responds with an error, its status and body are forwarded
 *   unchanged, so clients can detect the failure instead of receiving an
 *   empty 200 "event stream".
 * - Otherwise the upstream SSE body is piped straight through.
 */
export async function POST(req: Request) {
  let query: unknown;
  let mode: unknown;
  try {
    ({ query, mode } = await req.json());
  } catch {
    return jsonError("Request body must be valid JSON", 400);
  }
  if (typeof query !== "string" || query.trim() === "") {
    return jsonError("`query` must be a non-empty string", 400);
  }

  const upstream = await fetch("https://api.memorykit.io/v1/memories/query", {
    method: "POST",
    headers: {
      Authorization: `Bearer ${process.env.MEMORYKIT_API_KEY}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify({ query, mode, stream: true }),
    // Lets the upstream request be cancelled when the incoming request is aborted.
    signal: req.signal,
  });

  if (!upstream.ok || !upstream.body) {
    // Don't relabel an error payload as text/event-stream; pass the status through.
    return new Response(await upstream.text(), {
      status: upstream.ok ? 502 : upstream.status,
      headers: {
        "Content-Type": upstream.headers.get("Content-Type") ?? "application/json",
      },
    });
  }

  return new Response(upstream.body, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    },
  });
}

The useMemoryStream hook above uses /api/stream by default, so it works with this proxy out of the box.

Chat component

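A complete chat UI with on-screen message history and streaming. Each message is sent to the proxy as an independent `query`, so earlier turns are displayed but not sent as conversation context: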

import { useState, useRef, useEffect } from "react";
 
interface Message {
  role: "user" | "assistant";
  content: string;
}

/**
 * Chat UI that streams each assistant reply from the `/api/stream` proxy.
 *
 * Each submitted message is sent as an independent `query` (mode "balanced").
 * Earlier turns are rendered but not sent. If the request fails, the response
 * is not 2xx, or the server emits an `error` event, the pending assistant
 * bubble is replaced with a generic error message.
 */
export function ChatUI() {
  const [messages, setMessages] = useState<Message[]>([]);
  const [input, setInput] = useState("");
  const [isStreaming, setIsStreaming] = useState(false);
  const bottomRef = useRef<HTMLDivElement>(null);

  // Keep the newest message in view as tokens arrive.
  useEffect(() => {
    bottomRef.current?.scrollIntoView({ behavior: "smooth" });
  }, [messages]);

  async function send(e: React.FormEvent) {
    e.preventDefault();
    if (!input.trim() || isStreaming) return;

    const userMsg = input;
    setInput("");
    // Add the user's message plus an empty assistant placeholder to stream into.
    setMessages((prev) => [
      ...prev,
      { role: "user", content: userMsg },
      { role: "assistant", content: "" },
    ]);
    setIsStreaming(true);

    // Append a text chunk to the trailing (assistant placeholder) message.
    const appendToAssistant = (chunk: string) =>
      setMessages((prev) => {
        const updated = [...prev];
        const last = updated[updated.length - 1];
        updated[updated.length - 1] = {
          ...last,
          content: last.content + chunk,
        };
        return updated;
      });

    try {
      const res = await fetch("/api/stream", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ query: userMsg, mode: "balanced" }),
      });

      // A non-2xx reply (e.g. a JSON error from the proxy) is not an SSE stream.
      if (!res.ok || !res.body) {
        throw new Error(`Stream request failed with status ${res.status}`);
      }

      const reader = res.body.getReader();
      const decoder = new TextDecoder();
      let buffer = "";
      // Declared outside the read loop so an event name survives chunk boundaries.
      let currentEvent = "";

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split("\n");
        // Keep the trailing partial line for the next chunk.
        buffer = lines.pop() ?? "";

        for (const rawLine of lines) {
          // Tolerate CRLF line endings.
          const line = rawLine.endsWith("\r") ? rawLine.slice(0, -1) : rawLine;
          if (line.startsWith("event: ")) {
            currentEvent = line.slice(7);
          } else if (line.startsWith("data: ")) {
            if (currentEvent === "text") {
              const { content } = JSON.parse(line.slice(6));
              if (content) appendToAssistant(content);
            } else if (currentEvent === "error") {
              // Server-reported failure: show the error bubble below.
              const { message } = JSON.parse(line.slice(6));
              throw new Error(message ?? "Stream error");
            }
          }
        }
      }
    } catch {
      setMessages((prev) => {
        const updated = [...prev];
        updated[updated.length - 1] = {
          role: "assistant",
          content: "Something went wrong. Please try again.",
        };
        return updated;
      });
    } finally {
      setIsStreaming(false);
    }
  }

  return (
    <div className="flex flex-col h-[600px]">
      <div className="flex-1 overflow-y-auto p-4 space-y-4">
        {messages.map((msg, i) => (
          <div key={i} className={msg.role === "user" ? "text-right" : ""}>
            <span className="inline-block p-3 rounded-lg bg-gray-100 dark:bg-gray-800">
              {msg.content || "..."}
            </span>
          </div>
        ))}
        <div ref={bottomRef} />
      </div>
      <form onSubmit={send} className="p-4 border-t flex gap-2">
        <input
          value={input}
          onChange={(e) => setInput(e.target.value)}
          placeholder="Type a message..."
          className="flex-1 p-2 border rounded"
          disabled={isStreaming}
        />
        <button type="submit" disabled={isStreaming}>
          Send
        </button>
      </form>
    </div>
  );
}

Error handling

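Add retry logic for production reliability. Retries only trigger when the stream function throws; the `useMemoryStream` hook's `stream` catches failures and records them in its `error` state instead, so wrap it (or have it rethrow) before retrying: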

/**
 * Calls a stream function, retrying with linear backoff when it throws.
 *
 * `maxRetries` is the total number of attempts. Values below 1 (or NaN) are
 * treated as 1, so the function always tries at least once. After failed
 * attempt k it waits `baseDelayMs * k` ms. The last error is rethrown once all
 * attempts are used.
 *
 * Only *thrown* errors trigger a retry. The `useMemoryStream` hook's `stream`
 * catches failures and stores them in its `error` state instead of rejecting,
 * so it must be wrapped (or made to rethrow) to be retried.
 *
 * @param query Passed to `run` on every attempt.
 * @param maxRetries Total attempts (default 3).
 * @param options.run Stream function to call; defaults to `stream` in scope.
 * @param options.baseDelayMs Backoff unit in milliseconds (default 1000).
 * @throws The error from the final failed attempt.
 */
async function streamWithRetry(
  query: string,
  maxRetries = 3,
  {
    run = stream,
    baseDelayMs = 1000,
  }: { run?: (query: string) => Promise<unknown>; baseDelayMs?: number } = {},
): Promise<void> {
  // `|| 1` maps NaN/0 to a single attempt instead of silently doing nothing.
  const attempts = Math.max(1, Math.floor(maxRetries) || 1);
  for (let attempt = 1; ; attempt++) {
    try {
      await run(query);
      return;
    } catch (err) {
      if (attempt >= attempts) throw err;
      await new Promise((resolve) => setTimeout(resolve, baseDelayMs * attempt));
    }
  }
}
Edit on GitHub

On this page