Memory OS

Integration Guide

This guide covers everything you need to integrate Memory OS into your application, from architecture decisions to production deployment.

Architecture Patterns

When integrating Memory OS, choose an architecture pattern that fits your application's needs.

Centralized Memory Service

A centralized memory service acts as a single gateway to Memory OS for all your application components.

Single point of access: All application components route through the Memory Service.

Best for:

  • Applications with multiple components needing memory access
  • When you need centralized caching and connection pooling
  • Easier monitoring and debugging
JavaScript
// services/memory.js
import { MemoryOS } from '@memory-os/sdk';

class MemoryService {
  constructor() {
    this.client = new MemoryOS({
      apiKey: process.env.MEMORY_OS_API_KEY,
      baseUrl: process.env.MEMORY_OS_BASE_URL || 'https://api.mymemoryos.com/v1'
    });
    this.cache = new Map();
    this.cacheTTL = 60000; // 1 minute
  }

  async storeMemory(content, options = {}) {
    const memory = await this.client.memories.create({
      content,
      tier: options.tier || 'short',
      content_type: options.contentType || 'text',
      memory_nature: options.nature || 'episodic',
      metadata: options.metadata || {}
    });

    // Invalidate relevant cache entries
    this.invalidateCache(options.metadata?.user_id);

    return memory;
  }

  async getContext(query, options = {}) {
    const cacheKey = this.getCacheKey(query, options);
    const cached = this.cache.get(cacheKey);

    if (cached && Date.now() - cached.timestamp < this.cacheTTL) {
      return cached.data;
    }

    const context = await this.client.getContext({
      query,
      max_tokens: options.maxTokens || 2000,
      tier: options.tier,
      format: options.format || 'text'
    });

    this.cache.set(cacheKey, {
      data: context,
      timestamp: Date.now()
    });

    return context;
  }

  async search(query, options = {}) {
    return this.client.search({
      query,
      threshold: options.threshold || 0.7,
      limit: options.limit || 20,
      tier: options.tier,
      memory_nature: options.nature
    });
  }

  getCacheKey(query, options) {
    return JSON.stringify({ query, ...options });
  }

  invalidateCache(userId) {
    if (userId) {
      for (const [key] of this.cache) {
        if (key.includes(userId)) {
          this.cache.delete(key);
        }
      }
    }
  }
}

// Export singleton instance
export const memoryService = new MemoryService();
Python
# services/memory.py
import os
import time
import json
from typing import Optional, Dict, Any
from memoryos import MemoryOS

class MemoryService:
    def __init__(self):
        self.client = MemoryOS(
            api_key=os.environ["MEMORY_OS_API_KEY"],
            base_url=os.environ.get("MEMORY_OS_BASE_URL", "https://api.mymemoryos.com/v1")
        )
        self._cache: Dict[str, Dict[str, Any]] = {}
        self._cache_ttl = 60  # 1 minute in seconds

    def store_memory(
        self,
        content: str,
        tier: str = "short",
        content_type: str = "text",
        nature: str = "episodic",
        metadata: Optional[Dict] = None
    ):
        memory = self.client.memories.create(
            content=content,
            tier=tier,
            content_type=content_type,
            memory_nature=nature,
            metadata=metadata or {}
        )

        # Invalidate relevant cache entries
        if metadata and "user_id" in metadata:
            self._invalidate_cache(metadata["user_id"])

        return memory

    def get_context(
        self,
        query: str,
        max_tokens: int = 2000,
        tier: Optional[str] = None,
        format: str = "text"
    ):
        cache_key = self._get_cache_key(query, max_tokens=max_tokens, tier=tier)
        cached = self._cache.get(cache_key)

        if cached and time.time() - cached["timestamp"] < self._cache_ttl:
            return cached["data"]

        context = self.client.get_context(
            query=query,
            max_tokens=max_tokens,
            tier=tier,
            format=format
        )

        self._cache[cache_key] = {
            "data": context,
            "timestamp": time.time()
        }

        return context

    def search(
        self,
        query: str,
        threshold: float = 0.7,
        limit: int = 20,
        tier: Optional[str] = None,
        nature: Optional[str] = None
    ):
        return self.client.search(
            query=query,
            threshold=threshold,
            limit=limit,
            tier=tier,
            memory_nature=nature
        )

    def _get_cache_key(self, query: str, **kwargs) -> str:
        return json.dumps({"query": query, **kwargs}, sort_keys=True)

    def _invalidate_cache(self, user_id: str):
        keys_to_delete = [k for k in self._cache if user_id in k]
        for key in keys_to_delete:
            del self._cache[key]


# Module-level singleton
memory_service = MemoryService()

Distributed Memory (Per-User Isolation)

For multi-tenant applications, isolate memories by user or organization.

JavaScript
// services/user-memory.js
import { MemoryOS } from '@memory-os/sdk';

class UserMemoryContext {
  constructor(client, userId) {
    this.client = client;
    this.userId = userId;
  }

  async store(content, options = {}) {
    return this.client.memories.create({
      content,
      tier: options.tier || 'short',
      content_type: options.contentType || 'text',
      memory_nature: options.nature || 'episodic',
      metadata: {
        ...options.metadata,
        user_id: this.userId
      }
    });
  }

  async getContext(query, options = {}) {
    // Search with user filter is handled by tenant isolation
    // Additional user_id filtering in metadata
    return this.client.getContext({
      query,
      max_tokens: options.maxTokens || 2000,
      tier: options.tier
    });
  }

  async search(query, options = {}) {
    return this.client.search({
      query,
      threshold: options.threshold || 0.7,
      limit: options.limit || 20,
      tier: options.tier
    });
  }

  async listMemories(options = {}) {
    return this.client.memories.list({
      limit: options.limit || 50,
      offset: options.offset || 0,
      tier: options.tier
    });
  }
}

class MemoryClientFactory {
  constructor() {
    this.contexts = new Map();
    this.client = new MemoryOS({
      apiKey: process.env.MEMORY_OS_API_KEY
    });
  }

  forUser(userId) {
    if (!this.contexts.has(userId)) {
      this.contexts.set(userId, new UserMemoryContext(this.client, userId));
    }
    return this.contexts.get(userId);
  }

  // Clean up contexts for inactive users
  cleanup(inactiveThreshold = 3600000) { // 1 hour
    const now = Date.now();
    for (const [userId, context] of this.contexts) {
      if (context.lastAccess && now - context.lastAccess > inactiveThreshold) {
        this.contexts.delete(userId);
      }
    }
  }
}

export const memoryFactory = new MemoryClientFactory();

// Usage example
// const userMemory = memoryFactory.forUser('user_123');
// await userMemory.store('User prefers dark mode');
Python
# services/user_memory.py
import os
import time
from typing import Optional, Dict, Any
from memoryos import MemoryOS

class UserMemoryContext:
    def __init__(self, client: MemoryOS, user_id: str):
        self.client = client
        self.user_id = user_id
        self.last_access = time.time()

    def store(
        self,
        content: str,
        tier: str = "short",
        content_type: str = "text",
        nature: str = "episodic",
        metadata: Optional[Dict] = None
    ):
        self.last_access = time.time()
        return self.client.memories.create(
            content=content,
            tier=tier,
            content_type=content_type,
            memory_nature=nature,
            metadata={
                **(metadata or {}),
                "user_id": self.user_id
            }
        )

    def get_context(
        self,
        query: str,
        max_tokens: int = 2000,
        tier: Optional[str] = None
    ):
        self.last_access = time.time()
        return self.client.get_context(
            query=query,
            max_tokens=max_tokens,
            tier=tier
        )

    def search(
        self,
        query: str,
        threshold: float = 0.7,
        limit: int = 20,
        tier: Optional[str] = None
    ):
        self.last_access = time.time()
        return self.client.search(
            query=query,
            threshold=threshold,
            limit=limit,
            tier=tier
        )

    def list_memories(self, limit: int = 50, offset: int = 0, tier: Optional[str] = None):
        self.last_access = time.time()
        return self.client.memories.list(
            limit=limit,
            offset=offset,
            tier=tier
        )


class MemoryClientFactory:
    def __init__(self):
        self._contexts: Dict[str, UserMemoryContext] = {}
        self._client = MemoryOS(api_key=os.environ["MEMORY_OS_API_KEY"])

    def for_user(self, user_id: str) -> UserMemoryContext:
        if user_id not in self._contexts:
            self._contexts[user_id] = UserMemoryContext(self._client, user_id)
        return self._contexts[user_id]

    def cleanup(self, inactive_threshold: int = 3600):
        """Remove contexts for inactive users."""
        now = time.time()
        inactive_users = [
            user_id for user_id, context in self._contexts.items()
            if now - context.last_access > inactive_threshold
        ]
        for user_id in inactive_users:
            del self._contexts[user_id]


memory_factory = MemoryClientFactory()

# Usage example
# user_memory = memory_factory.for_user("user_123")
# user_memory.store("User prefers dark mode")

Framework Integration Examples

Express.js Integration

JavaScript
// app.js
import express from 'express';
import { MemoryOS } from '@memory-os/sdk';

const app = express();
app.use(express.json());

// Initialize Memory OS client
const memory = new MemoryOS({
  apiKey: process.env.MEMORY_OS_API_KEY
});

// Middleware to attach memory client to request
app.use((req, res, next) => {
  req.memory = memory;
  next();
});

// Store a memory
app.post('/memories', async (req, res) => {
  try {
    const { content, tier, contentType, metadata } = req.body;
    const userId = req.headers['x-user-id'];

    const result = await req.memory.memories.create({
      content,
      tier: tier || 'short',
      content_type: contentType || 'text',
      memory_nature: 'episodic',
      metadata: {
        ...metadata,
        user_id: userId
      }
    });

    res.json({ success: true, data: result });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Chat endpoint with memory context
app.post('/chat', async (req, res) => {
  try {
    const { message } = req.body;
    const userId = req.headers['x-user-id'];

    // Get relevant context for the conversation
    const context = await req.memory.getContext({
      query: message,
      max_tokens: 2000
    });

    // Store the user's message as a memory
    await req.memory.memories.create({
      content: `User message: ${message}`,
      tier: 'short',
      content_type: 'conversation',
      memory_nature: 'episodic',
      metadata: { user_id: userId }
    });

    // Generate response using your LLM with context
    const response = await generateLLMResponse(message, context.context);

    // Store key insights from the response
    if (response.insights) {
      await req.memory.memories.create({
        content: response.insights,
        tier: 'medium',
        content_type: 'fact',
        memory_nature: 'semantic',
        metadata: { user_id: userId }
      });
    }

    res.json({
      response: response.text,
      context_used: context.memories.length
    });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Search memories
app.get('/search', async (req, res) => {
  try {
    const { query, threshold = 0.7, limit = 20 } = req.query;

    const results = await req.memory.search({
      query,
      threshold: parseFloat(threshold),
      limit: parseInt(limit)
    });

    res.json({ success: true, data: results });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`Server running on port ${PORT}`);
});

Next.js Integration

TypeScript
// lib/memory.ts
import { MemoryOS } from '@memory-os/sdk';

let memoryClient: MemoryOS | null = null;

export function getMemoryClient(): MemoryOS {
  if (!memoryClient) {
    memoryClient = new MemoryOS({
      apiKey: process.env.MEMORY_OS_API_KEY!
    });
  }
  return memoryClient;
}

// app/api/memories/route.ts
import { NextRequest, NextResponse } from 'next/server';
import { getMemoryClient } from '@/lib/memory';

export async function POST(request: NextRequest) {
  try {
    const memory = getMemoryClient();
    const body = await request.json();
    const userId = request.headers.get('x-user-id');

    const result = await memory.memories.create({
      content: body.content,
      tier: body.tier || 'short',
      content_type: body.contentType || 'text',
      memory_nature: body.nature || 'episodic',
      metadata: {
        ...body.metadata,
        user_id: userId
      }
    });

    return NextResponse.json({ success: true, data: result });
  } catch (error: any) {
    return NextResponse.json(
      { error: error.message },
      { status: 500 }
    );
  }
}

export async function GET(request: NextRequest) {
  try {
    const memory = getMemoryClient();
    const searchParams = request.nextUrl.searchParams;

    const result = await memory.memories.list({
      limit: parseInt(searchParams.get('limit') || '50'),
      offset: parseInt(searchParams.get('offset') || '0'),
      tier: searchParams.get('tier') as any
    });

    return NextResponse.json({ success: true, data: result });
  } catch (error: any) {
    return NextResponse.json(
      { error: error.message },
      { status: 500 }
    );
  }
}

// app/api/chat/route.ts
import { NextRequest, NextResponse } from 'next/server';
import { getMemoryClient } from '@/lib/memory';
import OpenAI from 'openai';

const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

export async function POST(request: NextRequest) {
  try {
    const memory = getMemoryClient();
    const { message, conversationId } = await request.json();
    const userId = request.headers.get('x-user-id')!;

    // Get relevant context
    const context = await memory.getContext({
      query: message,
      max_tokens: 2000
    });

    // Generate response with context
    const completion = await openai.chat.completions.create({
      model: 'gpt-4',
      messages: [
        {
          role: 'system',
          content: `You are a helpful assistant with memory. Here is context about the user:\n\n${context.context}`
        },
        { role: 'user', content: message }
      ]
    });

    const assistantMessage = completion.choices[0].message.content;

    // Store conversation turn
    await memory.memories.create({
      content: `User: ${message}\nAssistant: ${assistantMessage}`,
      tier: 'short',
      content_type: 'conversation',
      memory_nature: 'episodic',
      metadata: {
        user_id: userId,
        conversation_id: conversationId
      }
    });

    return NextResponse.json({
      response: assistantMessage,
      context_count: context.memories.length
    });
  } catch (error: any) {
    return NextResponse.json(
      { error: error.message },
      { status: 500 }
    );
  }
}

Python Flask Integration

Python
# app.py
import os
from flask import Flask, request, jsonify
from memoryos import MemoryOS

app = Flask(__name__)

# Initialize Memory OS client
memory = MemoryOS(api_key=os.environ["MEMORY_OS_API_KEY"])


@app.route("/memories", methods=["POST"])
def create_memory():
    try:
        data = request.json
        user_id = request.headers.get("X-User-Id")

        result = memory.memories.create(
            content=data["content"],
            tier=data.get("tier", "short"),
            content_type=data.get("content_type", "text"),
            memory_nature=data.get("nature", "episodic"),
            metadata={
                **data.get("metadata", {}),
                "user_id": user_id
            }
        )

        return jsonify({"success": True, "data": result})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/memories", methods=["GET"])
def list_memories():
    try:
        limit = request.args.get("limit", 50, type=int)
        offset = request.args.get("offset", 0, type=int)
        tier = request.args.get("tier")

        result = memory.memories.list(
            limit=limit,
            offset=offset,
            tier=tier
        )

        return jsonify({"success": True, "data": result})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/search", methods=["POST"])
def search_memories():
    try:
        data = request.json

        results = memory.search(
            query=data["query"],
            threshold=data.get("threshold", 0.7),
            limit=data.get("limit", 20),
            tier=data.get("tier")
        )

        return jsonify({"success": True, "data": results})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/context", methods=["POST"])
def get_context():
    try:
        data = request.json

        context = memory.get_context(
            query=data["query"],
            max_tokens=data.get("max_tokens", 2000),
            tier=data.get("tier"),
            format=data.get("format", "text")
        )

        return jsonify({"success": True, "data": context})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/chat", methods=["POST"])
def chat():
    try:
        from openai import OpenAI

        data = request.json
        message = data["message"]
        user_id = request.headers.get("X-User-Id")

        openai_client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])

        # Get relevant context
        context = memory.get_context(
            query=message,
            max_tokens=2000
        )

        # Generate response with context
        completion = openai_client.chat.completions.create(
            model="gpt-4",
            messages=[
                {
                    "role": "system",
                    "content": f"You are a helpful assistant. Context:\n\n{context['context']}"
                },
                {"role": "user", "content": message}
            ]
        )

        assistant_message = completion.choices[0].message.content

        # Store conversation
        memory.memories.create(
            content=f"User: {message}\nAssistant: {assistant_message}",
            tier="short",
            content_type="conversation",
            memory_nature="episodic",
            metadata={"user_id": user_id}
        )

        return jsonify({
            "response": assistant_message,
            "context_count": len(context.get("memories", []))
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500


if __name__ == "__main__":
    app.run(debug=True, port=5000)

Error Handling

Memory OS returns structured errors that you should handle gracefully.

Error Types

Error CodeHTTP StatusDescription
VALIDATION_ERROR400Invalid request parameters
AUTHENTICATION_ERROR401Invalid or missing API key
FORBIDDEN403Insufficient permissions
NOT_FOUND404Memory or resource not found
RATE_LIMIT_EXCEEDED429Too many requests
INTERNAL_ERROR500Server-side error

Handling Errors

JavaScript
import { MemoryOS, MemoryOSError } from '@memory-os/sdk';

const memory = new MemoryOS({ apiKey: process.env.MEMORY_OS_API_KEY });

async function safeStoreMemory(content, options = {}) {
  try {
    return await memory.memories.create({
      content,
      ...options
    });
  } catch (error) {
    if (error instanceof MemoryOSError) {
      switch (error.code) {
        case 'VALIDATION_ERROR':
          console.error('Invalid input:', error.message);
          throw new Error(`Invalid memory content: ${error.message}`);

        case 'AUTHENTICATION_ERROR':
          console.error('Auth failed - check API key');
          throw new Error('Authentication failed');

        case 'RATE_LIMIT_EXCEEDED':
          console.warn('Rate limited, retrying after delay...');
          await delay(error.retryAfter || 1000);
          return safeStoreMemory(content, options);

        case 'NOT_FOUND':
          console.error('Resource not found:', error.message);
          throw new Error('Memory not found');

        default:
          console.error('Memory OS error:', error.code, error.message);
          throw error;
      }
    }

    // Network or unexpected errors
    console.error('Unexpected error:', error);
    throw error;
  }
}

function delay(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

// Retry wrapper with exponential backoff
async function withRetry(fn, maxRetries = 3) {
  let lastError;

  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;

      if (error.code === 'RATE_LIMIT_EXCEEDED') {
        const backoff = Math.pow(2, attempt) * 1000;
        await delay(backoff);
        continue;
      }

      // Don't retry auth or validation errors
      if (['AUTHENTICATION_ERROR', 'VALIDATION_ERROR', 'FORBIDDEN'].includes(error.code)) {
        throw error;
      }

      // Retry other errors with backoff
      if (attempt < maxRetries - 1) {
        await delay(Math.pow(2, attempt) * 500);
      }
    }
  }

  throw lastError;
}

// Usage
const memory = await withRetry(() =>
  safeStoreMemory('User prefers dark mode', { tier: 'long' })
);
Python
import time
from typing import Optional, TypeVar, Callable
from memoryos import MemoryOS
from memoryos.exceptions import MemoryOSError

memory = MemoryOS(api_key=os.environ["MEMORY_OS_API_KEY"])

T = TypeVar('T')


def safe_store_memory(content: str, **options):
    try:
        return memory.memories.create(content=content, **options)
    except MemoryOSError as error:
        if error.code == "VALIDATION_ERROR":
            print(f"Invalid input: {error.message}")
            raise ValueError(f"Invalid memory content: {error.message}")

        elif error.code == "AUTHENTICATION_ERROR":
            print("Auth failed - check API key")
            raise RuntimeError("Authentication failed")

        elif error.code == "RATE_LIMIT_EXCEEDED":
            print("Rate limited, retrying after delay...")
            time.sleep(error.retry_after or 1)
            return safe_store_memory(content, **options)

        elif error.code == "NOT_FOUND":
            print(f"Resource not found: {error.message}")
            raise KeyError("Memory not found")

        else:
            print(f"Memory OS error: {error.code} - {error.message}")
            raise

    except Exception as error:
        print(f"Unexpected error: {error}")
        raise


def with_retry(
    fn: Callable[[], T],
    max_retries: int = 3
) -> T:
    """Retry wrapper with exponential backoff."""
    last_error = None

    for attempt in range(max_retries):
        try:
            return fn()
        except MemoryOSError as error:
            last_error = error

            if error.code == "RATE_LIMIT_EXCEEDED":
                backoff = (2 ** attempt) * 1
                time.sleep(backoff)
                continue

            # Don't retry auth or validation errors
            if error.code in ["AUTHENTICATION_ERROR", "VALIDATION_ERROR", "FORBIDDEN"]:
                raise

            # Retry other errors with backoff
            if attempt < max_retries - 1:
                time.sleep((2 ** attempt) * 0.5)
        except Exception as error:
            last_error = error
            if attempt < max_retries - 1:
                time.sleep((2 ** attempt) * 0.5)

    raise last_error


# Usage
memory_result = with_retry(
    lambda: safe_store_memory("User prefers dark mode", tier="long")
)

Testing with Memory OS

Unit Testing with Mocks

JavaScript
// __tests__/memory.test.js
import { jest } from '@jest/globals';
import { MemoryOS } from '@memory-os/sdk';

// Mock the SDK
jest.mock('@memory-os/sdk');

describe('Memory Service', () => {
  let mockMemoryOS;

  beforeEach(() => {
    mockMemoryOS = {
      memories: {
        create: jest.fn(),
        list: jest.fn(),
        get: jest.fn(),
        update: jest.fn(),
        delete: jest.fn()
      },
      search: jest.fn(),
      getContext: jest.fn()
    };

    MemoryOS.mockImplementation(() => mockMemoryOS);
  });

  test('stores memory correctly', async () => {
    const mockMemory = {
      id: 'mem_123',
      content: 'Test memory',
      tier: 'short',
      created_at: new Date().toISOString()
    };

    mockMemoryOS.memories.create.mockResolvedValue(mockMemory);

    const client = new MemoryOS({ apiKey: 'test_key' });
    const result = await client.memories.create({
      content: 'Test memory',
      tier: 'short'
    });

    expect(result).toEqual(mockMemory);
    expect(mockMemoryOS.memories.create).toHaveBeenCalledWith({
      content: 'Test memory',
      tier: 'short'
    });
  });

  test('searches with correct parameters', async () => {
    const mockResults = {
      results: [
        { id: 'mem_1', content: 'Result 1', similarity: 0.9 },
        { id: 'mem_2', content: 'Result 2', similarity: 0.8 }
      ],
      search_type: 'semantic'
    };

    mockMemoryOS.search.mockResolvedValue(mockResults);

    const client = new MemoryOS({ apiKey: 'test_key' });
    const results = await client.search({
      query: 'test query',
      threshold: 0.7
    });

    expect(results.results).toHaveLength(2);
    expect(mockMemoryOS.search).toHaveBeenCalledWith({
      query: 'test query',
      threshold: 0.7
    });
  });

  test('handles errors correctly', async () => {
    mockMemoryOS.memories.create.mockRejectedValue({
      code: 'VALIDATION_ERROR',
      message: 'Content is required'
    });

    const client = new MemoryOS({ apiKey: 'test_key' });

    await expect(
      client.memories.create({ content: '' })
    ).rejects.toEqual({
      code: 'VALIDATION_ERROR',
      message: 'Content is required'
    });
  });
});
Python
# tests/test_memory.py
import pytest
from unittest.mock import Mock, patch
from memoryos import MemoryOS


@pytest.fixture
def mock_memory_client():
    with patch('memoryos.MemoryOS') as MockClient:
        mock_instance = Mock()
        MockClient.return_value = mock_instance

        # Setup mock methods
        mock_instance.memories = Mock()
        mock_instance.search = Mock()
        mock_instance.get_context = Mock()

        yield mock_instance


class TestMemoryService:
    def test_stores_memory_correctly(self, mock_memory_client):
        mock_memory = {
            "id": "mem_123",
            "content": "Test memory",
            "tier": "short",
            "created_at": "2024-01-15T10:00:00Z"
        }

        mock_memory_client.memories.create.return_value = mock_memory

        result = mock_memory_client.memories.create(
            content="Test memory",
            tier="short"
        )

        assert result == mock_memory
        mock_memory_client.memories.create.assert_called_once_with(
            content="Test memory",
            tier="short"
        )

    def test_searches_with_correct_parameters(self, mock_memory_client):
        mock_results = {
            "results": [
                {"id": "mem_1", "content": "Result 1", "similarity": 0.9},
                {"id": "mem_2", "content": "Result 2", "similarity": 0.8}
            ],
            "search_type": "semantic"
        }

        mock_memory_client.search.return_value = mock_results

        results = mock_memory_client.search(
            query="test query",
            threshold=0.7
        )

        assert len(results["results"]) == 2
        mock_memory_client.search.assert_called_once_with(
            query="test query",
            threshold=0.7
        )

    def test_handles_errors_correctly(self, mock_memory_client):
        from memoryos.exceptions import MemoryOSError

        mock_memory_client.memories.create.side_effect = MemoryOSError(
            code="VALIDATION_ERROR",
            message="Content is required"
        )

        with pytest.raises(MemoryOSError) as exc_info:
            mock_memory_client.memories.create(content="")

        assert exc_info.value.code == "VALIDATION_ERROR"

Integration Testing

For integration tests, use a test tenant or test API keys:

JavaScript
// __tests__/integration/memory.integration.test.js
import { MemoryOS } from '@memory-os/sdk';

describe('Memory OS Integration', () => {
  let client;
  const createdMemoryIds = [];

  beforeAll(() => {
    // Use test API key
    client = new MemoryOS({
      apiKey: process.env.MEMORY_OS_TEST_API_KEY,
      baseUrl: process.env.MEMORY_OS_TEST_URL || 'https://api.mymemoryos.com/v1'
    });
  });

  afterAll(async () => {
    // Clean up created memories
    for (const id of createdMemoryIds) {
      try {
        await client.memories.delete(id);
      } catch (e) {
        // Ignore cleanup errors
      }
    }
  });

  test('full memory lifecycle', async () => {
    // Create
    const memory = await client.memories.create({
      content: 'Integration test memory',
      tier: 'short',
      content_type: 'text',
      metadata: { test: true }
    });

    createdMemoryIds.push(memory.id);
    expect(memory.id).toBeDefined();
    expect(memory.content).toBe('Integration test memory');

    // Read
    const fetched = await client.memories.get(memory.id);
    expect(fetched.id).toBe(memory.id);

    // Update
    const updated = await client.memories.update(memory.id, {
      importance_score: 0.8
    });
    expect(updated.importance_score).toBe(0.8);

    // Search
    const results = await client.search({
      query: 'Integration test',
      threshold: 0.5
    });
    expect(results.results.length).toBeGreaterThan(0);

    // Delete
    const deleted = await client.memories.delete(memory.id);
    expect(deleted.deleted).toBe(true);

    // Remove from cleanup list since already deleted
    createdMemoryIds.pop();
  });

  test('context retrieval', async () => {
    // Create test memories
    const memory1 = await client.memories.create({
      content: 'User prefers TypeScript for all projects',
      tier: 'long',
      content_type: 'fact'
    });
    createdMemoryIds.push(memory1.id);

    const memory2 = await client.memories.create({
      content: 'User works at a fintech startup',
      tier: 'long',
      content_type: 'fact'
    });
    createdMemoryIds.push(memory2.id);

    // Wait for embeddings to be processed
    await new Promise(resolve => setTimeout(resolve, 2000));

    // Get context
    const context = await client.getContext({
      query: 'What programming language does the user prefer?',
      max_tokens: 1000
    });

    expect(context.memories.length).toBeGreaterThan(0);
    expect(context.context).toContain('TypeScript');
  });
});
Python
# tests/integration/test_memory_integration.py
import os
import time
import pytest
from memoryos import MemoryOS


class TestMemoryOSIntegration:
    @pytest.fixture(autouse=True)
    def setup(self):
        self.client = MemoryOS(
            api_key=os.environ["MEMORY_OS_TEST_API_KEY"],
            base_url=os.environ.get(
                "MEMORY_OS_TEST_URL",
                "https://api.mymemoryos.com/v1"
            )
        )
        self.created_memory_ids = []
        yield
        # Cleanup
        for memory_id in self.created_memory_ids:
            try:
                self.client.memories.delete(memory_id)
            except Exception:
                pass

    def test_full_memory_lifecycle(self):
        # Create
        memory = self.client.memories.create(
            content="Integration test memory",
            tier="short",
            content_type="text",
            metadata={"test": True}
        )

        self.created_memory_ids.append(memory["id"])
        assert memory["id"] is not None
        assert memory["content"] == "Integration test memory"

        # Read
        fetched = self.client.memories.get(memory["id"])
        assert fetched["id"] == memory["id"]

        # Update
        updated = self.client.memories.update(
            memory["id"],
            importance_score=0.8
        )
        assert updated["importance_score"] == 0.8

        # Search
        results = self.client.search(
            query="Integration test",
            threshold=0.5
        )
        assert len(results["results"]) > 0

        # Delete
        deleted = self.client.memories.delete(memory["id"])
        assert deleted["deleted"] is True
        self.created_memory_ids.remove(memory["id"])

    def test_context_retrieval(self):
        # Create test memories
        memory1 = self.client.memories.create(
            content="User prefers TypeScript for all projects",
            tier="long",
            content_type="fact"
        )
        self.created_memory_ids.append(memory1["id"])

        memory2 = self.client.memories.create(
            content="User works at a fintech startup",
            tier="long",
            content_type="fact"
        )
        self.created_memory_ids.append(memory2["id"])

        # Wait for embeddings
        time.sleep(2)

        # Get context
        context = self.client.get_context(
            query="What programming language does the user prefer?",
            max_tokens=1000
        )

        assert len(context["memories"]) > 0
        assert "TypeScript" in context["context"]

Production Deployment

Environment Configuration

Bash
# .env.production
MEMORY_OS_API_KEY=mos_live_xxxxxxxxxxxx
MEMORY_OS_BASE_URL=https://api.mymemoryos.com/v1

# Optional: Override defaults
MEMORY_OS_TIMEOUT=30000
MEMORY_OS_MAX_RETRIES=3

Health Checks

JavaScript
// health.js
import { MemoryOS } from '@memory-os/sdk';

const memory = new MemoryOS({ apiKey: process.env.MEMORY_OS_API_KEY });

async function checkMemoryOSHealth() {
  const startTime = Date.now();

  try {
    // Perform a lightweight operation
    await memory.memories.list({ limit: 1 });

    return {
      status: 'healthy',
      latency_ms: Date.now() - startTime,
      timestamp: new Date().toISOString()
    };
  } catch (error) {
    return {
      status: 'unhealthy',
      error: error.message,
      code: error.code,
      latency_ms: Date.now() - startTime,
      timestamp: new Date().toISOString()
    };
  }
}

// Express health endpoint
app.get('/health/memory-os', async (req, res) => {
  const health = await checkMemoryOSHealth();
  const status = health.status === 'healthy' ? 200 : 503;
  res.status(status).json(health);
});
Python
# health.py
import time
from datetime import datetime
from memoryos import MemoryOS

memory = MemoryOS(api_key=os.environ["MEMORY_OS_API_KEY"])


def check_memory_os_health():
    start_time = time.time()

    try:
        memory.memories.list(limit=1)

        return {
            "status": "healthy",
            "latency_ms": int((time.time() - start_time) * 1000),
            "timestamp": datetime.utcnow().isoformat()
        }
    except Exception as error:
        return {
            "status": "unhealthy",
            "error": str(error),
            "code": getattr(error, "code", None),
            "latency_ms": int((time.time() - start_time) * 1000),
            "timestamp": datetime.utcnow().isoformat()
        }


# Flask health endpoint
@app.route("/health/memory-os")
def health_check():
    health = check_memory_os_health()
    status = 200 if health["status"] == "healthy" else 503
    return jsonify(health), status

Monitoring and Observability

Track key metrics for production monitoring:

JavaScript
// metrics.js
import { MemoryOS } from '@memory-os/sdk';
import { Counter, Histogram } from 'prom-client';

// Define metrics
const memoryOperations = new Counter({
  name: 'memory_os_operations_total',
  help: 'Total Memory OS operations',
  labelNames: ['operation', 'status']
});

const memoryLatency = new Histogram({
  name: 'memory_os_latency_seconds',
  help: 'Memory OS operation latency',
  labelNames: ['operation'],
  buckets: [0.01, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5]
});

// Instrumented client wrapper
class InstrumentedMemoryClient {
  constructor(apiKey) {
    this.client = new MemoryOS({ apiKey });
  }

  async create(params) {
    const timer = memoryLatency.startTimer({ operation: 'create' });
    try {
      const result = await this.client.memories.create(params);
      memoryOperations.inc({ operation: 'create', status: 'success' });
      return result;
    } catch (error) {
      memoryOperations.inc({ operation: 'create', status: 'error' });
      throw error;
    } finally {
      timer();
    }
  }

  async search(params) {
    const timer = memoryLatency.startTimer({ operation: 'search' });
    try {
      const result = await this.client.search(params);
      memoryOperations.inc({ operation: 'search', status: 'success' });
      return result;
    } catch (error) {
      memoryOperations.inc({ operation: 'search', status: 'error' });
      throw error;
    } finally {
      timer();
    }
  }

  async getContext(params) {
    const timer = memoryLatency.startTimer({ operation: 'context' });
    try {
      const result = await this.client.getContext(params);
      memoryOperations.inc({ operation: 'context', status: 'success' });
      return result;
    } catch (error) {
      memoryOperations.inc({ operation: 'context', status: 'error' });
      throw error;
    } finally {
      timer();
    }
  }
}

export const memory = new InstrumentedMemoryClient(process.env.MEMORY_OS_API_KEY);

Ctrl+Shift+C to copy