MemoryKit

Python SDK

Official Python SDK for MemoryKit. Sync and async clients, fully typed.

Sync + Async

MemoryKit for scripts, AsyncMemoryKit for FastAPI and Django.

Typed errors

Catch AuthenticationError, RateLimitError, etc. — no string matching.

Python 3.8+

Uses httpx under the hood. One dependency, fully typed with py.typed.

Installation

pip install memorykit

Requires Python 3.8+. httpx is installed automatically.

Quick start

Install and initialize

from memorykit import MemoryKit
 
mk = MemoryKit(api_key="ctx_...")

Store a memory

memory = mk.memories.create(
    content="User prefers dark mode and metric units.",
    tags=["preferences"],
    user_id="user_123",
)
results = mk.memories.search(
    query="What does the user prefer?",
    limit=5,
)
for hit in results.results:
    print(f"[{hit.score:.2f}] {hit.content}")

Configuration options

mk = MemoryKit(
    api_key="ctx_...",
    base_url="https://api.memorykit.io/v1",  # default
    timeout=30,        # seconds
    max_retries=3,     # automatic retries on 429/5xx
)

The client supports context managers for clean resource cleanup:

with MemoryKit(api_key="ctx_...") as mk:
    mk.memories.create(content="Hello")
# httpx connection pool is closed automatically

Sync vs Async — The default MemoryKit client is synchronous. For async frameworks like FastAPI, use AsyncMemoryKit (see below). Mixing sync calls inside async code will block the event loop.

Memories

Create and manage

# Create
memory = mk.memories.create(
    content="Meeting notes from Q4 planning...",
    title="Q4 Planning",
    tags=["meetings", "q4"],
    user_id="user_123",
)
 
# List with pagination
page = mk.memories.list(limit=20, status="completed")
print(len(page.data), "memories")
if page.has_more:
    next_page = mk.memories.list(cursor=page.cursor)
 
# Get by ID
mem = mk.memories.get("mem_abc123")
 
# Update
mk.memories.update("mem_abc123", title="Updated Title")
 
# Delete
mk.memories.delete("mem_abc123")

Upload files

Supports PDF, DOCX, XLSX, PPTX, TXT, CSV, Markdown, HTML, and JSON.

memory = mk.memories.upload(
    file=open("report.pdf", "rb"),
    title="Q4 Report",
    user_id="user_123",
)

Batch ingest

Up to 100 memories in a single request.

result = mk.memories.batch_create(items=[
    {"content": "First fact", "tags": ["facts"]},
    {"content": "Second fact", "tags": ["facts"]},
])
print(f"{result.accepted} accepted, {result.rejected} rejected")

Reprocess

Triggers re-chunking and re-embedding for an existing memory.

mk.memories.reprocess("mem_abc123")

Search

Combines vector similarity, full-text search, and reranking.

results = mk.memories.search(
    query="quarterly revenue targets",
    limit=10,
    tags="finance",
    precision="high",
)
for hit in results.results:
    print(f"[{hit.score:.2f}] {hit.title}")

RAG query

Coming Soonmemories.query() is not available in the current SDK release. Use memories.search() for retrieval. LLM-powered query with answer generation will be available in a future release.

Streaming

Coming Soonmemories.stream() is not available in the current SDK release. Use memories.search() for retrieval. Streaming responses will be available in a future release.

Chats

Coming Soon — Chat methods (chats.create(), chats.send_message(), chats.stream_message(), chats.get_history()) are not available in the current SDK release. Conversational RAG with chat sessions will be available in a future release.

Users

# Upsert (create or update)
mk.users.upsert(
    id="user_123",
    name="Alice",
    email="alice@example.com",
    metadata={"plan": "pro"},
)
 
# Track events
mk.users.create_event(
    "user_123",
    type="page_view",
    data={"page": "/settings"},
)
 
# GDPR erasure — delete user and all associated data
mk.users.delete("user_123", cascade=True)

Webhooks

# Subscribe to events
webhook = mk.webhooks.create(
    url="https://example.com/webhook",
    events=["memory.completed", "memory.failed"],
)
print("Signing secret:", webhook.secret)
 
# Test delivery
test = mk.webhooks.test(webhook.id)
print("Delivered:", test.success)

Error handling

All errors are typed — catch specific exception classes:

from memorykit import (
    MemoryKitError,
    AuthenticationError,
    RateLimitError,
    NotFoundError,
    ValidationError,
)
 
try:
    result = mk.memories.search(query="...")
except AuthenticationError:
    # 401 — invalid or expired API key
    pass
except RateLimitError:
    # 429 — automatically retried, but raised if max retries exceeded
    pass
except ValidationError as e:
    # 400/422 — bad request parameters
    print(e)
except NotFoundError:
    # 404
    pass
except MemoryKitError as e:
    # catch-all for other API errors
    print(e.status_code, e)

Retryable errors (429, 5xx) are automatically retried with exponential backoff up to max_retries times. You don't need to implement retry logic yourself.

Async usage

For async frameworks, swap MemoryKit for AsyncMemoryKit. Every method becomes await-able:

from memorykit import AsyncMemoryKit
 
async with AsyncMemoryKit(api_key="ctx_...") as mk:
    # All methods are async
    memory = await mk.memories.create(content="User prefers dark mode.")
 
    results = await mk.memories.search(
        query="What does the user prefer?",
        limit=5,
    )
    for hit in results.results:
        print(f"[{hit.score:.2f}] {hit.content}")

FastAPI example

from fastapi import FastAPI
from memorykit import AsyncMemoryKit
 
app = FastAPI()
mk = AsyncMemoryKit(api_key="ctx_...")
 
@app.post("/search")
async def search(query: str):
    results = await mk.memories.search(query=query, limit=10)
    return {"results": [{"content": r.content, "score": r.score} for r in results.results]}
Edit on GitHub

On this page