Integration Guide
This guide covers everything you need to integrate Memory OS into your application, from architecture decisions to production deployment.
Architecture Patterns
When integrating Memory OS, choose an architecture pattern that fits your application's needs.
Centralized Memory Service
A centralized memory service acts as a single gateway to Memory OS for all your application components.
Single point of access: All application components route through the Memory Service.
Best for:
- Applications with multiple components needing memory access
- When you need centralized caching and connection pooling
- Easier monitoring and debugging
JavaScript
// services/memory.js
import { MemoryOS } from '@memory-os/sdk';
/**
 * Centralized gateway to Memory OS: one shared client, a small TTL cache
 * for getContext() results, and per-user cache invalidation on writes.
 */
class MemoryService {
  constructor() {
    this.client = new MemoryOS({
      apiKey: process.env.MEMORY_OS_API_KEY,
      baseUrl: process.env.MEMORY_OS_BASE_URL || 'https://api.mymemoryos.com/v1'
    });
    this.cache = new Map(); // cacheKey -> { data, timestamp }
    this.cacheTTL = 60000; // 1 minute
  }

  /**
   * Create a memory, then invalidate any cached context for the memory's
   * user (when metadata.user_id is set) so later reads see the new memory.
   */
  async storeMemory(content, options = {}) {
    const memory = await this.client.memories.create({
      content,
      tier: options.tier || 'short',
      content_type: options.contentType || 'text',
      memory_nature: options.nature || 'episodic',
      metadata: options.metadata || {}
    });
    // Invalidate relevant cache entries
    this.invalidateCache(options.metadata?.user_id);
    return memory;
  }

  /** Return context for `query`, serving from cache while entries are fresh. */
  async getContext(query, options = {}) {
    const cacheKey = this.getCacheKey(query, options);
    const cached = this.cache.get(cacheKey);
    if (cached) {
      if (Date.now() - cached.timestamp < this.cacheTTL) {
        return cached.data;
      }
      // Evict expired entries on access so the map cannot grow without bound.
      this.cache.delete(cacheKey);
    }
    const context = await this.client.getContext({
      query,
      max_tokens: options.maxTokens || 2000,
      tier: options.tier,
      format: options.format || 'text'
    });
    this.cache.set(cacheKey, {
      data: context,
      timestamp: Date.now()
    });
    return context;
  }

  /** Semantic search passthrough with sensible defaults. */
  async search(query, options = {}) {
    return this.client.search({
      query,
      threshold: options.threshold || 0.7,
      limit: options.limit || 20,
      tier: options.tier,
      memory_nature: options.nature
    });
  }

  /**
   * Deterministic cache key: properties are sorted so logically identical
   * requests share one entry regardless of option order. (Was insertion-order
   * dependent, unlike the Python twin's json.dumps(..., sort_keys=True).)
   */
  getCacheKey(query, options) {
    const entries = Object.entries({ query, ...options }).sort(([a], [b]) =>
      a < b ? -1 : a > b ? 1 : 0
    );
    return JSON.stringify(Object.fromEntries(entries));
  }

  /** Drop every cache entry whose key mentions `userId` (no-op when falsy). */
  invalidateCache(userId) {
    if (userId) {
      for (const [key] of this.cache) {
        if (key.includes(userId)) {
          this.cache.delete(key);
        }
      }
    }
  }
}
// Export singleton instance
export const memoryService = new MemoryService();

Python
# services/memory.py
import os
import time
import json
from typing import Optional, Dict, Any
from memoryos import MemoryOS
class MemoryService:
    """Centralized gateway to Memory OS with a small read-through TTL cache."""

    def __init__(self):
        self.client = MemoryOS(
            api_key=os.environ["MEMORY_OS_API_KEY"],
            base_url=os.environ.get("MEMORY_OS_BASE_URL", "https://api.mymemoryos.com/v1")
        )
        self._cache: Dict[str, Dict[str, Any]] = {}  # key -> {"data", "timestamp"}
        self._cache_ttl = 60  # 1 minute in seconds

    def store_memory(
        self,
        content: str,
        tier: str = "short",
        content_type: str = "text",
        nature: str = "episodic",
        metadata: Optional[Dict] = None
    ):
        """Create a memory, then drop cached context for the same user."""
        record = self.client.memories.create(
            content=content,
            tier=tier,
            content_type=content_type,
            memory_nature=nature,
            metadata=metadata or {}
        )
        # A write may change what context is relevant for this user.
        if metadata and "user_id" in metadata:
            self._invalidate_cache(metadata["user_id"])
        return record

    def get_context(
        self,
        query: str,
        max_tokens: int = 2000,
        tier: Optional[str] = None,
        format: str = "text"
    ):
        """Return assembled context for *query*, serving fresh cache hits."""
        key = self._get_cache_key(query, max_tokens=max_tokens, tier=tier)
        entry = self._cache.get(key)
        if entry and time.time() - entry["timestamp"] < self._cache_ttl:
            return entry["data"]
        result = self.client.get_context(
            query=query,
            max_tokens=max_tokens,
            tier=tier,
            format=format
        )
        self._cache[key] = {"data": result, "timestamp": time.time()}
        return result

    def search(
        self,
        query: str,
        threshold: float = 0.7,
        limit: int = 20,
        tier: Optional[str] = None,
        nature: Optional[str] = None
    ):
        """Semantic search passthrough (uncached)."""
        return self.client.search(
            query=query,
            threshold=threshold,
            limit=limit,
            tier=tier,
            memory_nature=nature
        )

    def _get_cache_key(self, query: str, **kwargs) -> str:
        # sort_keys makes the key independent of keyword ordering.
        payload = {"query": query, **kwargs}
        return json.dumps(payload, sort_keys=True)

    def _invalidate_cache(self, user_id: str):
        # Substring match: any key mentioning the user id is dropped.
        stale = [key for key in self._cache if user_id in key]
        for key in stale:
            del self._cache[key]
# Module-level singleton
memory_service = MemoryService()

Distributed Memory (Per-User Isolation)
For multi-tenant applications, isolate memories by user or organization.
JavaScript
// services/user-memory.js
import { MemoryOS } from '@memory-os/sdk';
/**
 * Per-user view over a shared MemoryOS client. Every write is tagged with
 * the user's id, and `lastAccess` is refreshed on each call so
 * MemoryClientFactory.cleanup() can evict idle contexts. (Previously
 * `lastAccess` was never set, so cleanup() could never evict anything —
 * the Python counterpart already tracks `last_access`.)
 */
class UserMemoryContext {
  constructor(client, userId) {
    this.client = client;
    this.userId = userId;
    this.lastAccess = Date.now();
  }

  /** Create a memory; caller metadata is kept but user_id is always overwritten. */
  async store(content, options = {}) {
    this.lastAccess = Date.now();
    return this.client.memories.create({
      content,
      tier: options.tier || 'short',
      content_type: options.contentType || 'text',
      memory_nature: options.nature || 'episodic',
      metadata: {
        ...options.metadata,
        user_id: this.userId
      }
    });
  }

  async getContext(query, options = {}) {
    this.lastAccess = Date.now();
    // Search with user filter is handled by tenant isolation
    // Additional user_id filtering in metadata
    return this.client.getContext({
      query,
      max_tokens: options.maxTokens || 2000,
      tier: options.tier
    });
  }

  async search(query, options = {}) {
    this.lastAccess = Date.now();
    return this.client.search({
      query,
      threshold: options.threshold || 0.7,
      limit: options.limit || 20,
      tier: options.tier
    });
  }

  /** Offset-based pagination over this user's memories. */
  async listMemories(options = {}) {
    this.lastAccess = Date.now();
    return this.client.memories.list({
      limit: options.limit || 50,
      offset: options.offset || 0,
      tier: options.tier
    });
  }
}
// Caches one UserMemoryContext per user on top of a single shared client.
class MemoryClientFactory {
  constructor() {
    this.contexts = new Map();
    this.client = new MemoryOS({
      apiKey: process.env.MEMORY_OS_API_KEY
    });
  }

  // Lazily create (and memoize) the per-user context.
  forUser(userId) {
    let context = this.contexts.get(userId);
    if (context === undefined) {
      context = new UserMemoryContext(this.client, userId);
      this.contexts.set(userId, context);
    }
    return context;
  }

  // Evict contexts whose lastAccess is older than the threshold (default 1 hour).
  // Contexts without a lastAccess value are kept.
  cleanup(inactiveThreshold = 3600000) {
    const cutoff = Date.now() - inactiveThreshold;
    const stale = [];
    for (const [userId, context] of this.contexts.entries()) {
      if (context.lastAccess && context.lastAccess < cutoff) {
        stale.push(userId);
      }
    }
    stale.forEach((userId) => this.contexts.delete(userId));
  }
}
export const memoryFactory = new MemoryClientFactory();
// Usage example
// const userMemory = memoryFactory.forUser('user_123');
// await userMemory.store('User prefers dark mode');

Python
# services/user_memory.py
import os
import time
from typing import Optional, Dict, Any
from memoryos import MemoryOS
class UserMemoryContext:
    """Per-user view over a shared MemoryOS client.

    Every write is tagged with ``user_id`` in metadata, and ``last_access``
    is refreshed on every call so MemoryClientFactory.cleanup() can evict
    idle contexts.
    """

    def __init__(self, client: MemoryOS, user_id: str):
        self.client = client  # shared MemoryOS client (not owned by this context)
        self.user_id = user_id  # user this context is scoped to
        self.last_access = time.time()  # refreshed by every method; read by factory cleanup

    def store(
        self,
        content: str,
        tier: str = "short",
        content_type: str = "text",
        nature: str = "episodic",
        metadata: Optional[Dict] = None
    ):
        """Create a memory for this user.

        Caller-supplied metadata is preserved, but ``user_id`` is always
        overwritten with this context's user.
        """
        self.last_access = time.time()
        return self.client.memories.create(
            content=content,
            tier=tier,
            content_type=content_type,
            memory_nature=nature,
            metadata={
                **(metadata or {}),
                "user_id": self.user_id
            }
        )

    def get_context(
        self,
        query: str,
        max_tokens: int = 2000,
        tier: Optional[str] = None
    ):
        """Assemble relevant context for *query*.

        NOTE(review): no per-user filter is passed here — isolation appears
        to rely on tenant scoping in the client/API; verify.
        """
        self.last_access = time.time()
        return self.client.get_context(
            query=query,
            max_tokens=max_tokens,
            tier=tier
        )

    def search(
        self,
        query: str,
        threshold: float = 0.7,
        limit: int = 20,
        tier: Optional[str] = None
    ):
        """Semantic search; same tenant-isolation caveat as get_context()."""
        self.last_access = time.time()
        return self.client.search(
            query=query,
            threshold=threshold,
            limit=limit,
            tier=tier
        )

    def list_memories(self, limit: int = 50, offset: int = 0, tier: Optional[str] = None):
        """Page through memories with offset-based pagination."""
        self.last_access = time.time()
        return self.client.memories.list(
            limit=limit,
            offset=offset,
            tier=tier
        )
class MemoryClientFactory:
    """Hands out one UserMemoryContext per user over a single shared client."""

    def __init__(self):
        self._contexts: Dict[str, UserMemoryContext] = {}  # user_id -> context
        self._client = MemoryOS(api_key=os.environ["MEMORY_OS_API_KEY"])

    def for_user(self, user_id: str) -> UserMemoryContext:
        """Return the cached context for *user_id*, creating it on first use."""
        if user_id not in self._contexts:
            self._contexts[user_id] = UserMemoryContext(self._client, user_id)
        return self._contexts[user_id]

    def cleanup(self, inactive_threshold: int = 3600):
        """Remove contexts for inactive users.

        *inactive_threshold* is in seconds (default one hour); contexts whose
        ``last_access`` is older than that are dropped.
        """
        now = time.time()
        inactive_users = [
            user_id for user_id, context in self._contexts.items()
            if now - context.last_access > inactive_threshold
        ]
        for user_id in inactive_users:
            del self._contexts[user_id]
memory_factory = MemoryClientFactory()
# Usage example
# user_memory = memory_factory.for_user("user_123")
# user_memory.store("User prefers dark mode")

Framework Integration Examples
Express.js Integration
JavaScript
// app.js
import express from 'express';
import { MemoryOS } from '@memory-os/sdk';

const app = express();
app.use(express.json());

// Initialize a single Memory OS client shared by every request.
const memory = new MemoryOS({
  apiKey: process.env.MEMORY_OS_API_KEY
});

// Middleware to attach memory client to request
app.use((req, res, next) => {
  req.memory = memory;
  next();
});

// Store a memory
app.post('/memories', async (req, res) => {
  try {
    const { content, tier, contentType, metadata } = req.body;
    // NOTE(review): assumes an upstream auth/proxy layer sets this header — verify.
    const userId = req.headers['x-user-id'];
    const result = await req.memory.memories.create({
      content,
      tier: tier || 'short',
      content_type: contentType || 'text',
      memory_nature: 'episodic',
      metadata: {
        ...metadata,
        user_id: userId
      }
    });
    res.json({ success: true, data: result });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Chat endpoint with memory context
app.post('/chat', async (req, res) => {
  try {
    const { message } = req.body;
    const userId = req.headers['x-user-id'];
    // Get relevant context for the conversation
    const context = await req.memory.getContext({
      query: message,
      max_tokens: 2000
    });
    // Store the user's message as a memory
    await req.memory.memories.create({
      content: `User message: ${message}`,
      tier: 'short',
      content_type: 'conversation',
      memory_nature: 'episodic',
      metadata: { user_id: userId }
    });
    // Generate response using your LLM with context.
    // `generateLLMResponse` is an application-provided function (not defined here).
    const response = await generateLLMResponse(message, context.context);
    // Store key insights from the response
    if (response.insights) {
      await req.memory.memories.create({
        content: response.insights,
        tier: 'medium',
        content_type: 'fact',
        memory_nature: 'semantic',
        metadata: { user_id: userId }
      });
    }
    res.json({
      response: response.text,
      context_used: context.memories.length
    });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Search memories
app.get('/search', async (req, res) => {
  try {
    const { query, threshold = 0.7, limit = 20 } = req.query;
    const results = await req.memory.search({
      query,
      threshold: parseFloat(threshold),
      // Always pass a radix — bare parseInt can misparse unusual input.
      limit: parseInt(limit, 10)
    });
    res.json({ success: true, data: results });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`Server running on port ${PORT}`);
});

Next.js Integration
TypeScript
// lib/memory.ts
import { MemoryOS } from '@memory-os/sdk';

// Module-scoped singleton so every route handler shares one client.
let memoryClient: MemoryOS | null = null;

/**
 * Lazily construct and return the shared Memory OS client.
 * The non-null assertion on MEMORY_OS_API_KEY assumes the variable is set
 * in the deployment environment — verify configuration.
 */
export function getMemoryClient(): MemoryOS {
  if (!memoryClient) {
    memoryClient = new MemoryOS({
      apiKey: process.env.MEMORY_OS_API_KEY!
    });
  }
  return memoryClient;
}
// app/api/memories/route.ts
import { NextRequest, NextResponse } from 'next/server';
import { getMemoryClient } from '@/lib/memory';

/**
 * POST /api/memories — create a memory tagged with the caller's user id.
 * NOTE(review): x-user-id may be absent, making user_id null — confirm
 * upstream auth always sets it.
 */
export async function POST(request: NextRequest) {
  try {
    const memory = getMemoryClient();
    const body = await request.json();
    const userId = request.headers.get('x-user-id');
    const result = await memory.memories.create({
      content: body.content,
      tier: body.tier || 'short',
      content_type: body.contentType || 'text',
      memory_nature: body.nature || 'episodic',
      metadata: {
        ...body.metadata,
        user_id: userId
      }
    });
    return NextResponse.json({ success: true, data: result });
  } catch (error: any) {
    return NextResponse.json(
      { error: error.message },
      { status: 500 }
    );
  }
}

/**
 * GET /api/memories — list memories with limit/offset/tier query params.
 */
export async function GET(request: NextRequest) {
  try {
    const memory = getMemoryClient();
    const searchParams = request.nextUrl.searchParams;
    const result = await memory.memories.list({
      // Always pass a radix — bare parseInt can misparse unusual input.
      limit: parseInt(searchParams.get('limit') || '50', 10),
      offset: parseInt(searchParams.get('offset') || '0', 10),
      tier: searchParams.get('tier') as any
    });
    return NextResponse.json({ success: true, data: result });
  } catch (error: any) {
    return NextResponse.json(
      { error: error.message },
      { status: 500 }
    );
  }
}
// app/api/chat/route.ts
import { NextRequest, NextResponse } from 'next/server';
import { getMemoryClient } from '@/lib/memory';
import OpenAI from 'openai';

const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

/**
 * POST /api/chat — answer a message using memory-derived context, then
 * persist the full conversation turn as a short-tier episodic memory.
 */
export async function POST(request: NextRequest) {
  try {
    const memory = getMemoryClient();
    const { message, conversationId } = await request.json();
    // Non-null assertion: assumes auth middleware always sets this header — verify.
    const userId = request.headers.get('x-user-id')!;
    // Get relevant context
    const context = await memory.getContext({
      query: message,
      max_tokens: 2000
    });
    // Generate response with context
    const completion = await openai.chat.completions.create({
      model: 'gpt-4',
      messages: [
        {
          role: 'system',
          content: `You are a helpful assistant with memory. Here is context about the user:\n\n${context.context}`
        },
        { role: 'user', content: message }
      ]
    });
    const assistantMessage = completion.choices[0].message.content;
    // Store conversation turn
    await memory.memories.create({
      content: `User: ${message}\nAssistant: ${assistantMessage}`,
      tier: 'short',
      content_type: 'conversation',
      memory_nature: 'episodic',
      metadata: {
        user_id: userId,
        conversation_id: conversationId
      }
    });
    return NextResponse.json({
      response: assistantMessage,
      context_count: context.memories.length
    });
  } catch (error: any) {
    return NextResponse.json(
      { error: error.message },
      { status: 500 }
    );
  }
}
Python
# app.py
import os
from flask import Flask, request, jsonify
from memoryos import MemoryOS
app = Flask(__name__)
# Initialize Memory OS client
memory = MemoryOS(api_key=os.environ["MEMORY_OS_API_KEY"])
@app.route("/memories", methods=["POST"])
def create_memory():
    """Create a memory from the JSON body, tagged with the X-User-Id header."""
    try:
        data = request.json
        user_id = request.headers.get("X-User-Id")  # may be None if header absent
        result = memory.memories.create(
            content=data["content"],
            tier=data.get("tier", "short"),
            content_type=data.get("content_type", "text"),
            memory_nature=data.get("nature", "episodic"),
            metadata={
                **data.get("metadata", {}),
                "user_id": user_id
            }
        )
        return jsonify({"success": True, "data": result})
    except Exception as e:
        # Broad catch converts any failure into a JSON 500 response.
        return jsonify({"error": str(e)}), 500
@app.route("/memories", methods=["GET"])
def list_memories():
    """List memories with limit/offset pagination and an optional tier filter."""
    try:
        limit = request.args.get("limit", 50, type=int)
        offset = request.args.get("offset", 0, type=int)
        tier = request.args.get("tier")
        result = memory.memories.list(
            limit=limit,
            offset=offset,
            tier=tier
        )
        return jsonify({"success": True, "data": result})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route("/search", methods=["POST"])
def search_memories():
    """Semantic search. Body: query (required), threshold, limit, tier."""
    try:
        data = request.json
        results = memory.search(
            query=data["query"],
            threshold=data.get("threshold", 0.7),
            limit=data.get("limit", 20),
            tier=data.get("tier")
        )
        return jsonify({"success": True, "data": results})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route("/context", methods=["POST"])
def get_context():
    """Assemble context for a query. Body: query (required), max_tokens, tier, format."""
    try:
        data = request.json
        context = memory.get_context(
            query=data["query"],
            max_tokens=data.get("max_tokens", 2000),
            tier=data.get("tier"),
            format=data.get("format", "text")
        )
        return jsonify({"success": True, "data": context})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route("/chat", methods=["POST"])
def chat():
    """Answer a chat message using memory context, then persist the exchange."""
    try:
        # Function-local import: openai is only needed by this endpoint.
        from openai import OpenAI
        data = request.json
        message = data["message"]
        user_id = request.headers.get("X-User-Id")
        openai_client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
        # Get relevant context
        context = memory.get_context(
            query=message,
            max_tokens=2000
        )
        # Generate response with context
        completion = openai_client.chat.completions.create(
            model="gpt-4",
            messages=[
                {
                    "role": "system",
                    "content": f"You are a helpful assistant. Context:\n\n{context['context']}"
                },
                {"role": "user", "content": message}
            ]
        )
        assistant_message = completion.choices[0].message.content
        # Store conversation
        memory.memories.create(
            content=f"User: {message}\nAssistant: {assistant_message}",
            tier="short",
            content_type="conversation",
            memory_nature="episodic",
            metadata={"user_id": user_id}
        )
        return jsonify({
            "response": assistant_message,
            "context_count": len(context.get("memories", []))
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500
if __name__ == "__main__":
    app.run(debug=True, port=5000)

Error Handling
Memory OS returns structured errors that you should handle gracefully.
Error Types
| Error Code | HTTP Status | Description |
|---|---|---|
VALIDATION_ERROR | 400 | Invalid request parameters |
AUTHENTICATION_ERROR | 401 | Invalid or missing API key |
FORBIDDEN | 403 | Insufficient permissions |
NOT_FOUND | 404 | Memory or resource not found |
RATE_LIMIT_EXCEEDED | 429 | Too many requests |
INTERNAL_ERROR | 500 | Server-side error |
Handling Errors
JavaScript
import { MemoryOS, MemoryOSError } from '@memory-os/sdk';
const memory = new MemoryOS({ apiKey: process.env.MEMORY_OS_API_KEY });
/**
 * Store a memory, translating SDK errors into local error types.
 *
 * @param {string} content - memory body to store
 * @param {object} [options] - passthrough options for memories.create
 * @param {number} [attemptsLeft=3] - remaining rate-limit retries. Previously
 *   a persistent 429 caused unbounded recursion; now we give up after the
 *   budget is exhausted and rethrow the SDK error.
 */
async function safeStoreMemory(content, options = {}, attemptsLeft = 3) {
  try {
    return await memory.memories.create({
      content,
      ...options
    });
  } catch (error) {
    if (error instanceof MemoryOSError) {
      switch (error.code) {
        case 'VALIDATION_ERROR':
          console.error('Invalid input:', error.message);
          throw new Error(`Invalid memory content: ${error.message}`);
        case 'AUTHENTICATION_ERROR':
          console.error('Auth failed - check API key');
          throw new Error('Authentication failed');
        case 'RATE_LIMIT_EXCEEDED':
          // Bounded retry: rethrow once the budget is spent.
          if (attemptsLeft <= 0) {
            throw error;
          }
          console.warn('Rate limited, retrying after delay...');
          await delay(error.retryAfter || 1000);
          return safeStoreMemory(content, options, attemptsLeft - 1);
        case 'NOT_FOUND':
          console.error('Resource not found:', error.message);
          throw new Error('Memory not found');
        default:
          console.error('Memory OS error:', error.code, error.message);
          throw error;
      }
    }
    // Network or unexpected errors
    console.error('Unexpected error:', error);
    throw error;
  }
}
// Resolve after `ms` milliseconds.
function delay(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// Run `fn` up to `maxRetries` times with exponential backoff.
// Rate limits back off 2^attempt seconds; auth/validation/permission errors
// are rethrown immediately (retrying cannot help); anything else retries
// with a shorter backoff. The last error is rethrown when retries run out.
async function withRetry(fn, maxRetries = 3) {
  let lastError;
  let attempt = 0;
  while (attempt < maxRetries) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      if (error.code === 'RATE_LIMIT_EXCEEDED') {
        await delay(2 ** attempt * 1000);
        attempt += 1;
        continue;
      }
      const nonRetryable = ['AUTHENTICATION_ERROR', 'VALIDATION_ERROR', 'FORBIDDEN'];
      if (nonRetryable.includes(error.code)) {
        throw error;
      }
      if (attempt < maxRetries - 1) {
        await delay(2 ** attempt * 500);
      }
      attempt += 1;
    }
  }
  throw lastError;
}
// Usage (renamed from `memory` — that name is already taken by the client above)
const storedMemory = await withRetry(() =>
  safeStoreMemory('User prefers dark mode', { tier: 'long' })
);

Python
import os  # required: MEMORY_OS_API_KEY is read from the environment below
import time
from typing import Optional, TypeVar, Callable
from memoryos import MemoryOS
from memoryos.exceptions import MemoryOSError

memory = MemoryOS(api_key=os.environ["MEMORY_OS_API_KEY"])
T = TypeVar('T')

def safe_store_memory(content: str, *, attempts_left: int = 3, **options):
    """Create a memory, translating Memory OS errors into local exception types.

    Rate-limit errors are retried at most ``attempts_left`` times. (Previously
    this recursed without bound while the API kept throttling; the budget is
    keyword-only so existing ``safe_store_memory(content, tier=...)`` calls
    are unaffected.)
    """
    try:
        return memory.memories.create(content=content, **options)
    except MemoryOSError as error:
        if error.code == "VALIDATION_ERROR":
            print(f"Invalid input: {error.message}")
            raise ValueError(f"Invalid memory content: {error.message}")
        elif error.code == "AUTHENTICATION_ERROR":
            print("Auth failed - check API key")
            raise RuntimeError("Authentication failed")
        elif error.code == "RATE_LIMIT_EXCEEDED":
            # Bounded retry: rethrow once the budget is spent.
            if attempts_left <= 0:
                raise
            print("Rate limited, retrying after delay...")
            time.sleep(error.retry_after or 1)
            return safe_store_memory(content, attempts_left=attempts_left - 1, **options)
        elif error.code == "NOT_FOUND":
            print(f"Resource not found: {error.message}")
            raise KeyError("Memory not found")
        else:
            print(f"Memory OS error: {error.code} - {error.message}")
            raise
    except Exception as error:
        print(f"Unexpected error: {error}")
        raise

def with_retry(
    fn: Callable[[], T],
    max_retries: int = 3
) -> T:
    """Retry wrapper with exponential backoff.

    Rate limits back off 2**attempt seconds; auth/validation/permission
    errors are rethrown immediately; other errors retry with a shorter
    backoff. The last error is rethrown when retries are exhausted.
    """
    last_error = None
    for attempt in range(max_retries):
        try:
            return fn()
        except MemoryOSError as error:
            last_error = error
            if error.code == "RATE_LIMIT_EXCEEDED":
                backoff = (2 ** attempt) * 1
                time.sleep(backoff)
                continue
            # Don't retry auth or validation errors
            if error.code in ["AUTHENTICATION_ERROR", "VALIDATION_ERROR", "FORBIDDEN"]:
                raise
            # Retry other errors with backoff
            if attempt < max_retries - 1:
                time.sleep((2 ** attempt) * 0.5)
        except Exception as error:
            last_error = error
            if attempt < max_retries - 1:
                time.sleep((2 ** attempt) * 0.5)
    raise last_error
# Usage
memory_result = with_retry(
lambda: safe_store_memory("User prefers dark mode", tier="long")
)

Testing with Memory OS
Unit Testing with Mocks
JavaScript
// __tests__/memory.test.js
import { jest } from '@jest/globals';
import { MemoryOS } from '@memory-os/sdk';
// Mock the SDK
jest.mock('@memory-os/sdk');
// Unit tests: the SDK constructor is mocked, so no network calls happen.
describe('Memory Service', () => {
  let mockMemoryOS;

  beforeEach(() => {
    // Fresh mock per test so call counts don't leak between cases.
    mockMemoryOS = {
      memories: {
        create: jest.fn(),
        list: jest.fn(),
        get: jest.fn(),
        update: jest.fn(),
        delete: jest.fn()
      },
      search: jest.fn(),
      getContext: jest.fn()
    };
    MemoryOS.mockImplementation(() => mockMemoryOS);
  });

  test('stores memory correctly', async () => {
    const mockMemory = {
      id: 'mem_123',
      content: 'Test memory',
      tier: 'short',
      created_at: new Date().toISOString()
    };
    mockMemoryOS.memories.create.mockResolvedValue(mockMemory);
    const client = new MemoryOS({ apiKey: 'test_key' });
    const result = await client.memories.create({
      content: 'Test memory',
      tier: 'short'
    });
    expect(result).toEqual(mockMemory);
    expect(mockMemoryOS.memories.create).toHaveBeenCalledWith({
      content: 'Test memory',
      tier: 'short'
    });
  });

  test('searches with correct parameters', async () => {
    const mockResults = {
      results: [
        { id: 'mem_1', content: 'Result 1', similarity: 0.9 },
        { id: 'mem_2', content: 'Result 2', similarity: 0.8 }
      ],
      search_type: 'semantic'
    };
    mockMemoryOS.search.mockResolvedValue(mockResults);
    const client = new MemoryOS({ apiKey: 'test_key' });
    const results = await client.search({
      query: 'test query',
      threshold: 0.7
    });
    expect(results.results).toHaveLength(2);
    expect(mockMemoryOS.search).toHaveBeenCalledWith({
      query: 'test query',
      threshold: 0.7
    });
  });

  test('handles errors correctly', async () => {
    // Rejection shape mirrors the SDK's error payload ({ code, message }).
    mockMemoryOS.memories.create.mockRejectedValue({
      code: 'VALIDATION_ERROR',
      message: 'Content is required'
    });
    const client = new MemoryOS({ apiKey: 'test_key' });
    await expect(
      client.memories.create({ content: '' })
    ).rejects.toEqual({
      code: 'VALIDATION_ERROR',
      message: 'Content is required'
    });
  });
});

Python
# tests/test_memory.py
import pytest
from unittest.mock import Mock, patch
from memoryos import MemoryOS
@pytest.fixture
def mock_memory_client():
    # Patch the SDK class and hand tests a bare Mock client.
    # NOTE(review): the tests below call the mock directly, so the patch
    # target ('memoryos.MemoryOS') only matters for code that instantiates
    # the SDK itself — verify it points at the right import site.
    with patch('memoryos.MemoryOS') as MockClient:
        mock_instance = Mock()
        MockClient.return_value = mock_instance
        # Setup mock methods
        mock_instance.memories = Mock()
        mock_instance.search = Mock()
        mock_instance.get_context = Mock()
        yield mock_instance
class TestMemoryService:
    """Unit tests exercising the mocked client surface (no network)."""

    def test_stores_memory_correctly(self, mock_memory_client):
        """create() returns the configured payload and records its arguments."""
        mock_memory = {
            "id": "mem_123",
            "content": "Test memory",
            "tier": "short",
            "created_at": "2024-01-15T10:00:00Z"
        }
        mock_memory_client.memories.create.return_value = mock_memory
        result = mock_memory_client.memories.create(
            content="Test memory",
            tier="short"
        )
        assert result == mock_memory
        mock_memory_client.memories.create.assert_called_once_with(
            content="Test memory",
            tier="short"
        )

    def test_searches_with_correct_parameters(self, mock_memory_client):
        """search() forwards query/threshold and returns the mocked result set."""
        mock_results = {
            "results": [
                {"id": "mem_1", "content": "Result 1", "similarity": 0.9},
                {"id": "mem_2", "content": "Result 2", "similarity": 0.8}
            ],
            "search_type": "semantic"
        }
        mock_memory_client.search.return_value = mock_results
        results = mock_memory_client.search(
            query="test query",
            threshold=0.7
        )
        assert len(results["results"]) == 2
        mock_memory_client.search.assert_called_once_with(
            query="test query",
            threshold=0.7
        )

    def test_handles_errors_correctly(self, mock_memory_client):
        """SDK errors configured as side effects propagate to the caller."""
        # Local import keeps the dependency scoped to this test.
        from memoryos.exceptions import MemoryOSError
        mock_memory_client.memories.create.side_effect = MemoryOSError(
            code="VALIDATION_ERROR",
            message="Content is required"
        )
        with pytest.raises(MemoryOSError) as exc_info:
            mock_memory_client.memories.create(content="")
        assert exc_info.value.code == "VALIDATION_ERROR"

Integration Testing
For integration tests, use a test tenant or test API keys:
JavaScript
// __tests__/integration/memory.integration.test.js
import { MemoryOS } from '@memory-os/sdk';
// Integration tests: hit a real Memory OS test tenant; everything created
// here is best-effort deleted in afterAll.
describe('Memory OS Integration', () => {
  let client;
  const createdMemoryIds = [];

  beforeAll(() => {
    // Use test API key
    client = new MemoryOS({
      apiKey: process.env.MEMORY_OS_TEST_API_KEY,
      baseUrl: process.env.MEMORY_OS_TEST_URL || 'https://api.mymemoryos.com/v1'
    });
  });

  afterAll(async () => {
    // Clean up created memories
    for (const id of createdMemoryIds) {
      try {
        await client.memories.delete(id);
      } catch (e) {
        // Ignore cleanup errors
      }
    }
  });

  test('full memory lifecycle', async () => {
    // Create
    const memory = await client.memories.create({
      content: 'Integration test memory',
      tier: 'short',
      content_type: 'text',
      metadata: { test: true }
    });
    createdMemoryIds.push(memory.id);
    expect(memory.id).toBeDefined();
    expect(memory.content).toBe('Integration test memory');
    // Read
    const fetched = await client.memories.get(memory.id);
    expect(fetched.id).toBe(memory.id);
    // Update
    const updated = await client.memories.update(memory.id, {
      importance_score: 0.8
    });
    expect(updated.importance_score).toBe(0.8);
    // Search
    const results = await client.search({
      query: 'Integration test',
      threshold: 0.5
    });
    expect(results.results.length).toBeGreaterThan(0);
    // Delete
    const deleted = await client.memories.delete(memory.id);
    expect(deleted.deleted).toBe(true);
    // Remove from cleanup list since already deleted
    createdMemoryIds.pop();
  });

  test('context retrieval', async () => {
    // Create test memories
    const memory1 = await client.memories.create({
      content: 'User prefers TypeScript for all projects',
      tier: 'long',
      content_type: 'fact'
    });
    createdMemoryIds.push(memory1.id);
    const memory2 = await client.memories.create({
      content: 'User works at a fintech startup',
      tier: 'long',
      content_type: 'fact'
    });
    createdMemoryIds.push(memory2.id);
    // Wait for embeddings to be processed (indexing is asynchronous server-side)
    await new Promise(resolve => setTimeout(resolve, 2000));
    // Get context
    const context = await client.getContext({
      query: 'What programming language does the user prefer?',
      max_tokens: 1000
    });
    expect(context.memories.length).toBeGreaterThan(0);
    expect(context.context).toContain('TypeScript');
  });
});

Python
# tests/integration/test_memory_integration.py
import os
import time
import pytest
from memoryos import MemoryOS
class TestMemoryOSIntegration:
    """End-to-end tests against a real (test-tenant) Memory OS instance."""

    @pytest.fixture(autouse=True)
    def setup(self):
        # Build a client against the test tenant; track created ids for cleanup.
        self.client = MemoryOS(
            api_key=os.environ["MEMORY_OS_TEST_API_KEY"],
            base_url=os.environ.get(
                "MEMORY_OS_TEST_URL",
                "https://api.mymemoryos.com/v1"
            )
        )
        self.created_memory_ids = []
        yield
        # Cleanup: best-effort delete of everything the test created.
        for memory_id in self.created_memory_ids:
            try:
                self.client.memories.delete(memory_id)
            except Exception:
                pass

    def test_full_memory_lifecycle(self):
        """Create -> read -> update -> search -> delete round trip."""
        # Create
        memory = self.client.memories.create(
            content="Integration test memory",
            tier="short",
            content_type="text",
            metadata={"test": True}
        )
        self.created_memory_ids.append(memory["id"])
        assert memory["id"] is not None
        assert memory["content"] == "Integration test memory"
        # Read
        fetched = self.client.memories.get(memory["id"])
        assert fetched["id"] == memory["id"]
        # Update
        updated = self.client.memories.update(
            memory["id"],
            importance_score=0.8
        )
        assert updated["importance_score"] == 0.8
        # Search
        results = self.client.search(
            query="Integration test",
            threshold=0.5
        )
        assert len(results["results"]) > 0
        # Delete
        deleted = self.client.memories.delete(memory["id"])
        assert deleted["deleted"] is True
        # Already deleted, so drop it from the cleanup list.
        self.created_memory_ids.remove(memory["id"])

    def test_context_retrieval(self):
        """Stored facts should surface in assembled context."""
        # Create test memories
        memory1 = self.client.memories.create(
            content="User prefers TypeScript for all projects",
            tier="long",
            content_type="fact"
        )
        self.created_memory_ids.append(memory1["id"])
        memory2 = self.client.memories.create(
            content="User works at a fintech startup",
            tier="long",
            content_type="fact"
        )
        self.created_memory_ids.append(memory2["id"])
        # Wait for embeddings (indexing is asynchronous server-side)
        time.sleep(2)
        # Get context
        context = self.client.get_context(
            query="What programming language does the user prefer?",
            max_tokens=1000
        )
        assert len(context["memories"]) > 0
        assert "TypeScript" in context["context"]

Production Deployment
Environment Configuration
Bash
# .env.production
MEMORY_OS_API_KEY=mos_live_xxxxxxxxxxxx
MEMORY_OS_BASE_URL=https://api.mymemoryos.com/v1
# Optional: Override defaults
MEMORY_OS_TIMEOUT=30000
MEMORY_OS_MAX_RETRIES=3

Health Checks
JavaScript
// health.js
import { MemoryOS } from '@memory-os/sdk';
const memory = new MemoryOS({ apiKey: process.env.MEMORY_OS_API_KEY });
// Probe Memory OS with the cheapest available call and report status + latency.
async function checkMemoryOSHealth() {
  const startedAt = Date.now();
  // Both branches share the latency/timestamp footer.
  const finish = (fields) => ({
    ...fields,
    latency_ms: Date.now() - startedAt,
    timestamp: new Date().toISOString()
  });
  try {
    // Perform a lightweight operation
    await memory.memories.list({ limit: 1 });
    return finish({ status: 'healthy' });
  } catch (error) {
    return finish({
      status: 'unhealthy',
      error: error.message,
      code: error.code
    });
  }
}
// Express health endpoint: 200 when Memory OS is reachable, 503 otherwise
// (so load balancers can act on it).
app.get('/health/memory-os', async (req, res) => {
  const health = await checkMemoryOSHealth();
  const status = health.status === 'healthy' ? 200 : 503;
  res.status(status).json(health);
});

Python
# health.py
import os  # required: MEMORY_OS_API_KEY is read from the environment below
import time
from datetime import datetime
from memoryos import MemoryOS

memory = MemoryOS(api_key=os.environ["MEMORY_OS_API_KEY"])
def check_memory_os_health():
    """Probe Memory OS with a minimal list call; report status and latency."""
    started = time.time()

    def _report(**fields):
        # Shared footer for both the healthy and unhealthy payloads.
        return {
            **fields,
            "latency_ms": int((time.time() - started) * 1000),
            "timestamp": datetime.utcnow().isoformat()
        }

    try:
        memory.memories.list(limit=1)
        return _report(status="healthy")
    except Exception as error:
        return _report(
            status="unhealthy",
            error=str(error),
            code=getattr(error, "code", None)
        )
# Flask health endpoint: 200 when Memory OS is reachable, 503 otherwise.
@app.route("/health/memory-os")
def health_check():
    """Return the Memory OS health report with a matching HTTP status."""
    health = check_memory_os_health()
    status = 200 if health["status"] == "healthy" else 503
    return jsonify(health), status

Monitoring and Observability
Track key metrics for production monitoring:
JavaScript
// metrics.js
import { MemoryOS } from '@memory-os/sdk';
import { Counter, Histogram } from 'prom-client';
// Define metrics
// Counter: total operations, labeled by operation name and success/error.
const memoryOperations = new Counter({
  name: 'memory_os_operations_total',
  help: 'Total Memory OS operations',
  labelNames: ['operation', 'status']
});

// Histogram: per-operation latency in seconds, bucketed from 10ms to 5s.
const memoryLatency = new Histogram({
  name: 'memory_os_latency_seconds',
  help: 'Memory OS operation latency',
  labelNames: ['operation'],
  buckets: [0.01, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5]
});
// Instrumented client wrapper
/**
 * Memory OS client wrapper that records a latency histogram observation and
 * a success/error counter increment for every call. The three public methods
 * previously duplicated the identical timer/counter boilerplate; it now
 * lives in one shared helper.
 */
class InstrumentedMemoryClient {
  constructor(apiKey) {
    this.client = new MemoryOS({ apiKey });
  }

  /**
   * Internal: time `call`, count success/error under `operation`, and
   * always stop the timer — errors are rethrown untouched.
   */
  async _instrument(operation, call) {
    const timer = memoryLatency.startTimer({ operation });
    try {
      const result = await call();
      memoryOperations.inc({ operation, status: 'success' });
      return result;
    } catch (error) {
      memoryOperations.inc({ operation, status: 'error' });
      throw error;
    } finally {
      timer();
    }
  }

  async create(params) {
    return this._instrument('create', () => this.client.memories.create(params));
  }

  async search(params) {
    return this._instrument('search', () => this.client.search(params));
  }

  async getContext(params) {
    // Label kept as 'context' to preserve the existing metric series.
    return this._instrument('context', () => this.client.getContext(params));
  }
}
export const memory = new InstrumentedMemoryClient(process.env.MEMORY_OS_API_KEY);