Migration Guide
This guide covers migrating your existing memory or embedding storage systems to Memory OS. Whether you're coming from a vector database, key-value store, custom solution, or LLM framework, you'll find step-by-step instructions and migration scripts here.
Why Migrate to Memory OS?
Memory OS provides advantages over traditional approaches:
| Aspect | Traditional Approach | Memory OS |
|---|---|---|
| Memory tiers | Single storage layer | Short, medium, and long-term tiers with automatic lifecycle |
| Semantic understanding | Raw vector similarity | Cognitive memory model with nature classification |
| Context retrieval | Manual token management | Intelligent context window optimization |
| Embedding management | Manual embedding generation | Automatic embedding with best-in-class models |
| Memory relationships | Flat structure | Hierarchical parent-child relationships |
| Importance decay | No native support | Automatic importance scoring and decay |
Migration Paths
From Pinecone / Vector Databases
Pinecone and similar vector databases store embeddings alongside metadata. Memory OS can import those embeddings directly, or re-embed your content for better retrieval quality.
Data Mapping
| Pinecone Field | Memory OS Field | Notes |
|---|---|---|
| id | metadata.original_id | Preserve for reference |
| values | Auto-generated | Re-embed or provide via embedding |
| metadata.text | content | The actual text content |
| metadata.namespace | metadata.namespace | Preserve namespace organization |
| metadata.* | metadata.* | Direct mapping |
| score | N/A | Computed at query time |
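To make the mapping concrete, here is a minimal sketch (hypothetical record and values; assumes the memory client from the script below) showing how one Pinecone record becomes a Memory OS memory:
// Hypothetical Pinecone record
const record = {
  id: 'vec_42',
  values: [0.12, -0.03 /* ... */],
  metadata: { text: 'User prefers dark mode', namespace: 'production' }
};
// Equivalent Memory OS create call; the embedding is omitted so Memory OS re-embeds
await memory.memories.create({
  content: record.metadata.text,
  tier: 'long',
  metadata: {
    ...record.metadata,
    original_id: record.id,
    source: 'pinecone'
  }
});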
Migration Script
import { Pinecone } from '@pinecone-database/pinecone';
import { MemoryOS } from '@memory-os/sdk';
const pinecone = new Pinecone({ apiKey: process.env.PINECONE_API_KEY });
const memory = new MemoryOS({ apiKey: process.env.MEMORY_OS_API_KEY });
async function migrateFromPinecone(options = {}) {
const {
indexName,
namespace = '',
batchSize = 100,
reEmbed = true, // Recommended: let Memory OS generate better embeddings
defaultTier = 'long'
} = options;
const index = pinecone.index(indexName);
// Get all vector IDs (Pinecone requires listing in batches)
let paginationToken;
let totalMigrated = 0;
do {
// List vectors
const listResponse = await index.namespace(namespace).listPaginated({
limit: batchSize,
paginationToken
});
    if (!listResponse.vectors?.length) break;
// Fetch full vector data
const ids = listResponse.vectors.map(v => v.id);
const fetchResponse = await index.namespace(namespace).fetch(ids);
// Prepare batch for Memory OS
const memories = Object.entries(fetchResponse.records).map(([id, record]) => ({
content: record.metadata?.text || record.metadata?.content || '',
      tier: mapToTier(record.metadata, defaultTier),
content_type: record.metadata?.type || 'text',
memory_nature: record.metadata?.nature || 'semantic',
importance_score: record.metadata?.importance,
// Only include embedding if not re-embedding
...(reEmbed ? {} : { embedding: record.values }),
metadata: {
...record.metadata,
original_id: id,
source: 'pinecone',
namespace: namespace,
migrated_at: new Date().toISOString()
}
}));
// Import to Memory OS
for (const mem of memories) {
if (!mem.content) {
console.warn(`Skipping empty content for ${mem.metadata.original_id}`);
continue;
}
try {
await memory.memories.create(mem);
totalMigrated++;
} catch (error) {
console.error(`Failed to migrate ${mem.metadata.original_id}:`, error.message);
}
}
console.log(`Migrated ${totalMigrated} memories...`);
paginationToken = listResponse.pagination?.next;
// Rate limiting
await delay(100);
} while (paginationToken);
return { totalMigrated };
}
function mapToTier(metadata, fallback = 'medium') {
  // Map based on your existing categorization; fall back to the caller's default
  if (metadata?.permanent || metadata?.tier === 'permanent') return 'long';
  if (metadata?.temporary || metadata?.tier === 'temp') return 'short';
  return fallback;
}
function delay(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
// Usage
const result = await migrateFromPinecone({
indexName: 'my-embeddings',
namespace: 'production',
batchSize: 50,
reEmbed: true
});
console.log(`Migration complete: ${result.totalMigrated} memories`);
import os
import time
from typing import Optional, Dict, Any
from pinecone import Pinecone
from memoryos import MemoryOS
pinecone_client = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
memory = MemoryOS(api_key=os.environ["MEMORY_OS_API_KEY"])
def migrate_from_pinecone(
index_name: str,
namespace: str = "",
batch_size: int = 100,
re_embed: bool = True,
default_tier: str = "long"
) -> Dict[str, Any]:
index = pinecone_client.Index(index_name)
pagination_token = None
total_migrated = 0
while True:
# List vectors
list_response = index.list_paginated(
namespace=namespace,
limit=batch_size,
pagination_token=pagination_token
)
if not list_response.vectors:
break
# Fetch full vector data
ids = [v.id for v in list_response.vectors]
fetch_response = index.fetch(ids=ids, namespace=namespace)
# Prepare and import to Memory OS
        for vec_id, record in fetch_response.vectors.items():
metadata = record.metadata or {}
content = metadata.get("text") or metadata.get("content", "")
if not content:
print(f"Skipping empty content for {id}")
continue
memory_data = {
"content": content,
"tier": map_to_tier(metadata),
"content_type": metadata.get("type", "text"),
"memory_nature": metadata.get("nature", "semantic"),
"metadata": {
**metadata,
"original_id": id,
"source": "pinecone",
"namespace": namespace,
"migrated_at": time.strftime("%Y-%m-%dT%H:%M:%SZ")
}
}
if metadata.get("importance"):
memory_data["importance_score"] = metadata["importance"]
# Include embedding if not re-embedding
if not re_embed:
memory_data["embedding"] = record.values
try:
memory.memories.create(**memory_data)
total_migrated += 1
except Exception as e:
print(f"Failed to migrate {id}: {e}")
print(f"Migrated {total_migrated} memories...")
pagination_token = list_response.pagination.next if list_response.pagination else None
if not pagination_token:
break
# Rate limiting
time.sleep(0.1)
return {"total_migrated": total_migrated}
def map_to_tier(metadata: Dict, fallback: str = "medium") -> str:
    if metadata.get("permanent") or metadata.get("tier") == "permanent":
        return "long"
    if metadata.get("temporary") or metadata.get("tier") == "temp":
        return "short"
    return fallback
# Usage
result = migrate_from_pinecone(
index_name="my-embeddings",
namespace="production",
batch_size=50,
re_embed=True
)
print(f"Migration complete: {result['total_migrated']} memories")From Redis / Key-Value Stores
Redis-based memory stores typically hold conversation history or cached context. Memory OS provides semantic search and intelligent retrieval on top of this data.
Data Mapping
| Redis Pattern | Memory OS Field | Notes |
|---|---|---|
| user:{id}:messages | content | Conversation turns |
| user:{id}:context | content with tier: short | Session context |
| user:{id}:preferences | content with tier: long | User preferences |
| Key TTL | tier | Map TTL to appropriate tier |
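As a minimal sketch (hypothetical key; assumes the redis and memory clients from the script below), a session-context key with a one-hour TTL would land in the short tier:
// Hypothetical example: user:42:context holds session state with TTL 3600
const value = await redis.get('user:42:context');
await memory.memories.create({
  content: value,
  tier: 'short', // a TTL under one hour maps to the short tier
  metadata: { original_key: 'user:42:context', user_id: '42', source: 'redis' }
});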
Migration Script
import Redis from 'ioredis';
import { MemoryOS } from '@memory-os/sdk';
const redis = new Redis(process.env.REDIS_URL);
const memory = new MemoryOS({ apiKey: process.env.MEMORY_OS_API_KEY });
async function migrateFromRedis(options = {}) {
const {
keyPattern = 'memory:*',
batchSize = 100,
preserveTimestamps = true
} = options;
let cursor = '0';
let totalMigrated = 0;
do {
// Scan for keys matching pattern
const [nextCursor, keys] = await redis.scan(
cursor,
'MATCH', keyPattern,
'COUNT', batchSize
);
cursor = nextCursor;
for (const key of keys) {
try {
const data = await getRedisData(key);
if (!data) continue;
// Get TTL to determine tier
const ttl = await redis.ttl(key);
const tier = ttlToTier(ttl);
// Parse user ID from key pattern
const userId = extractUserId(key);
// Create memory
await memory.memories.create({
content: data.content || JSON.stringify(data),
tier,
content_type: determineContentType(key, data),
memory_nature: determineNature(key, data),
metadata: {
original_key: key,
source: 'redis',
user_id: userId,
...(preserveTimestamps && data.timestamp
? { original_timestamp: data.timestamp }
: {}),
migrated_at: new Date().toISOString()
}
});
totalMigrated++;
} catch (error) {
console.error(`Failed to migrate ${key}:`, error.message);
}
}
console.log(`Progress: ${totalMigrated} migrated, cursor: ${cursor}`);
await delay(50);
} while (cursor !== '0');
return { totalMigrated };
}
async function getRedisData(key) {
const type = await redis.type(key);
  switch (type) {
    case 'string': {
      const str = await redis.get(key);
      try {
        return JSON.parse(str);
      } catch {
        return { content: str };
      }
    }
    case 'hash':
      return redis.hgetall(key);
    case 'list': {
      const items = await redis.lrange(key, 0, -1);
      return { content: items.join('\n'), items };
    }
    case 'set': {
      const members = await redis.smembers(key);
      return { content: members.join(', '), members };
    }
    default:
      return null;
  }
}
function ttlToTier(ttl) {
if (ttl === -1) return 'long'; // No expiry
if (ttl < 3600) return 'short'; // Less than 1 hour
if (ttl < 86400 * 7) return 'medium'; // Less than 1 week
return 'long';
}
function extractUserId(key) {
// Adjust regex based on your key pattern
const match = key.match(/user:([^:]+)/);
return match ? match[1] : null;
}
function determineContentType(key, data) {
if (key.includes('conversation') || key.includes('message')) return 'conversation';
if (key.includes('preference')) return 'fact';
if (key.includes('document')) return 'document';
return 'text';
}
function determineNature(key, data) {
if (key.includes('fact') || key.includes('preference')) return 'semantic';
if (key.includes('event') || key.includes('action')) return 'procedural';
return 'episodic';
}
function delay(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
// Usage
const result = await migrateFromRedis({
keyPattern: 'user:*:memory:*',
batchSize: 100
});
console.log(`Migration complete: ${result.totalMigrated} memories`);
import os
import time
import json
import re
from typing import Optional, Dict, Any
import redis
from memoryos import MemoryOS
redis_client = redis.from_url(os.environ["REDIS_URL"])
memory = MemoryOS(api_key=os.environ["MEMORY_OS_API_KEY"])
def migrate_from_redis(
key_pattern: str = "memory:*",
batch_size: int = 100,
preserve_timestamps: bool = True
) -> Dict[str, Any]:
cursor = 0
total_migrated = 0
while True:
cursor, keys = redis_client.scan(
cursor=cursor,
match=key_pattern,
count=batch_size
)
for key in keys:
key_str = key.decode() if isinstance(key, bytes) else key
try:
data = get_redis_data(key_str)
if not data:
continue
# Get TTL to determine tier
ttl = redis_client.ttl(key_str)
tier = ttl_to_tier(ttl)
# Parse user ID from key pattern
user_id = extract_user_id(key_str)
# Prepare content
content = data.get("content") or json.dumps(data)
# Build metadata
metadata = {
"original_key": key_str,
"source": "redis",
"migrated_at": time.strftime("%Y-%m-%dT%H:%M:%SZ")
}
if user_id:
metadata["user_id"] = user_id
if preserve_timestamps and data.get("timestamp"):
metadata["original_timestamp"] = data["timestamp"]
# Create memory
memory.memories.create(
content=content,
tier=tier,
content_type=determine_content_type(key_str, data),
memory_nature=determine_nature(key_str, data),
metadata=metadata
)
total_migrated += 1
except Exception as e:
print(f"Failed to migrate {key_str}: {e}")
print(f"Progress: {total_migrated} migrated, cursor: {cursor}")
if cursor == 0:
break
time.sleep(0.05)
return {"total_migrated": total_migrated}
def get_redis_data(key: str) -> Optional[Dict]:
key_type = redis_client.type(key).decode()
if key_type == "string":
value = redis_client.get(key)
if value:
value = value.decode() if isinstance(value, bytes) else value
try:
return json.loads(value)
except json.JSONDecodeError:
return {"content": value}
elif key_type == "hash":
data = redis_client.hgetall(key)
return {
k.decode() if isinstance(k, bytes) else k:
v.decode() if isinstance(v, bytes) else v
for k, v in data.items()
}
elif key_type == "list":
items = redis_client.lrange(key, 0, -1)
items = [i.decode() if isinstance(i, bytes) else i for i in items]
return {"content": "\n".join(items), "items": items}
elif key_type == "set":
members = redis_client.smembers(key)
members = [m.decode() if isinstance(m, bytes) else m for m in members]
return {"content": ", ".join(members), "members": list(members)}
return None
def ttl_to_tier(ttl: int) -> str:
if ttl == -1:
return "long" # No expiry
if ttl < 3600:
return "short" # Less than 1 hour
if ttl < 86400 * 7:
return "medium" # Less than 1 week
return "long"
def extract_user_id(key: str) -> Optional[str]:
match = re.search(r"user:([^:]+)", key)
return match.group(1) if match else None
def determine_content_type(key: str, data: Dict) -> str:
if "conversation" in key or "message" in key:
return "conversation"
if "preference" in key:
return "fact"
if "document" in key:
return "document"
return "text"
def determine_nature(key: str, data: Dict) -> str:
if "fact" in key or "preference" in key:
return "semantic"
if "event" in key or "action" in key:
return "procedural"
return "episodic"
# Usage
result = migrate_from_redis(
key_pattern="user:*:memory:*",
batch_size=100
)
print(f"Migration complete: {result['total_migrated']} memories")From Custom PostgreSQL Solutions
If you're using PostgreSQL with pgvector or a custom schema for embeddings, Memory OS can take over embedding generation and retrieval.
Data Mapping
| PostgreSQL Column | Memory OS Field | Notes |
|---|---|---|
| id | metadata.original_id | Preserve for reference |
| content / text | content | Main content |
| embedding | Auto-generated | Re-embed recommended |
| created_at | metadata.original_created_at | Preserve timestamps |
| user_id | metadata.user_id | User association |
| category | content_type or metadata | Map to content types |
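For instance, a hypothetical row (id 7, content 'Prefers metric units', category 'preference', user_id 42) would map to a create call like this sketch, assuming the memory client from the script below:
// Hypothetical row: id=7, category='preference', user_id=42
await memory.memories.create({
  content: 'Prefers metric units',
  tier: 'long', // category 'preference' maps to the long tier
  content_type: 'fact',
  metadata: {
    original_id: '7',
    user_id: '42',
    original_created_at: '2024-01-05T00:00:00.000Z',
    source: 'postgresql'
  }
});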
Migration Script
import pg from 'pg';
import { MemoryOS } from '@memory-os/sdk';
const pool = new pg.Pool({
connectionString: process.env.DATABASE_URL
});
const memory = new MemoryOS({ apiKey: process.env.MEMORY_OS_API_KEY });
async function migrateFromPostgres(options = {}) {
const {
tableName = 'embeddings',
contentColumn = 'content',
batchSize = 100,
reEmbed = true,
whereClause = ''
} = options;
let offset = 0;
let totalMigrated = 0;
while (true) {
    // Note: tableName and whereClause are interpolated directly -- only use trusted values
    const query = `
SELECT * FROM ${tableName}
${whereClause ? `WHERE ${whereClause}` : ''}
ORDER BY created_at ASC
LIMIT $1 OFFSET $2
`;
const result = await pool.query(query, [batchSize, offset]);
if (result.rows.length === 0) break;
for (const row of result.rows) {
try {
const memoryData = {
content: row[contentColumn] || row.text || row.content,
tier: mapToTier(row),
content_type: mapContentType(row),
memory_nature: row.memory_nature || 'semantic',
metadata: {
original_id: row.id,
source: 'postgresql',
original_table: tableName,
original_created_at: row.created_at?.toISOString(),
...(row.user_id && { user_id: row.user_id }),
...(row.metadata && typeof row.metadata === 'object' ? row.metadata : {}),
migrated_at: new Date().toISOString()
}
};
// Include importance if present
if (row.importance !== undefined) {
memoryData.importance_score = row.importance;
}
// Include embedding if not re-embedding
if (!reEmbed && row.embedding) {
// Handle pgvector format
memoryData.embedding = Array.isArray(row.embedding)
? row.embedding
: JSON.parse(row.embedding);
}
await memory.memories.create(memoryData);
totalMigrated++;
} catch (error) {
console.error(`Failed to migrate row ${row.id}:`, error.message);
}
}
offset += batchSize;
console.log(`Migrated ${totalMigrated} records (offset: ${offset})`);
// Rate limiting
await delay(100);
}
return { totalMigrated };
}
function mapToTier(row) {
if (row.tier) return row.tier;
if (row.permanent) return 'long';
if (row.ttl && row.ttl < 86400) return 'short';
if (row.category === 'preference' || row.category === 'fact') return 'long';
return 'medium';
}
function mapContentType(row) {
const typeMap = {
'chat': 'conversation',
'conversation': 'conversation',
'document': 'document',
'code': 'code',
'preference': 'fact',
'fact': 'fact'
};
return typeMap[row.category] || typeMap[row.type] || 'text';
}
function delay(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
// Usage
const result = await migrateFromPostgres({
tableName: 'user_embeddings',
contentColumn: 'text',
batchSize: 100,
reEmbed: true,
whereClause: "status = 'active'"
});
console.log(`Migration complete: ${result.totalMigrated} records`);
await pool.end();
import os
import time
import json
from typing import Optional, Dict, Any, List
import psycopg2
from psycopg2.extras import RealDictCursor
from memoryos import MemoryOS
memory = MemoryOS(api_key=os.environ["MEMORY_OS_API_KEY"])
def migrate_from_postgres(
table_name: str = "embeddings",
content_column: str = "content",
batch_size: int = 100,
re_embed: bool = True,
where_clause: str = ""
) -> Dict[str, Any]:
conn = psycopg2.connect(os.environ["DATABASE_URL"])
cursor = conn.cursor(cursor_factory=RealDictCursor)
offset = 0
total_migrated = 0
while True:
query = f"""
SELECT * FROM {table_name}
{f'WHERE {where_clause}' if where_clause else ''}
ORDER BY created_at ASC
LIMIT %s OFFSET %s
"""
cursor.execute(query, (batch_size, offset))
rows = cursor.fetchall()
if not rows:
break
for row in rows:
try:
content = row.get(content_column) or row.get("text") or row.get("content")
if not content:
print(f"Skipping row {row.get('id')}: no content")
continue
# Build metadata
metadata = {
"original_id": str(row["id"]),
"source": "postgresql",
"original_table": table_name,
"migrated_at": time.strftime("%Y-%m-%dT%H:%M:%SZ")
}
if row.get("created_at"):
metadata["original_created_at"] = row["created_at"].isoformat()
if row.get("user_id"):
metadata["user_id"] = str(row["user_id"])
# Merge existing metadata if present
if row.get("metadata") and isinstance(row["metadata"], dict):
metadata.update(row["metadata"])
memory_data = {
"content": content,
"tier": map_to_tier(row),
"content_type": map_content_type(row),
"memory_nature": row.get("memory_nature", "semantic"),
"metadata": metadata
}
# Include importance if present
if row.get("importance") is not None:
memory_data["importance_score"] = float(row["importance"])
# Include embedding if not re-embedding
if not re_embed and row.get("embedding"):
embedding = row["embedding"]
if isinstance(embedding, str):
embedding = json.loads(embedding)
memory_data["embedding"] = embedding
memory.memories.create(**memory_data)
total_migrated += 1
except Exception as e:
print(f"Failed to migrate row {row.get('id')}: {e}")
offset += batch_size
print(f"Migrated {total_migrated} records (offset: {offset})")
time.sleep(0.1)
cursor.close()
conn.close()
return {"total_migrated": total_migrated}
def map_to_tier(row: Dict) -> str:
if row.get("tier"):
return row["tier"]
if row.get("permanent"):
return "long"
if row.get("ttl") and row["ttl"] < 86400:
return "short"
if row.get("category") in ["preference", "fact"]:
return "long"
return "medium"
def map_content_type(row: Dict) -> str:
type_map = {
"chat": "conversation",
"conversation": "conversation",
"document": "document",
"code": "code",
"preference": "fact",
"fact": "fact"
}
return (
type_map.get(row.get("category")) or
type_map.get(row.get("type")) or
"text"
)
# Usage
result = migrate_from_postgres(
table_name="user_embeddings",
content_column="text",
batch_size=100,
re_embed=True,
where_clause="status = 'active'"
)
print(f"Migration complete: {result['total_migrated']} records")From LangChain Memory
LangChain provides various memory classes. Here's how to migrate from common ones.
ConversationBufferMemory
import { MemoryOS } from '@memory-os/sdk';
const memory = new MemoryOS({ apiKey: process.env.MEMORY_OS_API_KEY });
async function migrateFromLangChainBuffer(langChainMemory, userId) {
// Get all messages from LangChain memory
  const messages = langChainMemory.chatHistory?.messages || [];
let migrated = 0;
// Store each conversation turn
for (let i = 0; i < messages.length; i += 2) {
const humanMessage = messages[i];
const aiMessage = messages[i + 1];
if (!humanMessage || !aiMessage) continue;
await memory.memories.create({
content: `User: ${humanMessage.content}\nAssistant: ${aiMessage.content}`,
tier: 'short',
content_type: 'conversation',
memory_nature: 'episodic',
metadata: {
user_id: userId,
source: 'langchain_buffer',
turn_index: i / 2,
migrated_at: new Date().toISOString()
}
});
migrated++;
}
return { migrated };
}
// Usage with existing LangChain memory
// const result = await migrateFromLangChainBuffer(myLangChainMemory, 'user_123');
import os
import time
from typing import Dict, Any
from memoryos import MemoryOS
# from langchain.memory import ConversationBufferMemory
memory = MemoryOS(api_key=os.environ["MEMORY_OS_API_KEY"])
def migrate_from_langchain_buffer(langchain_memory, user_id: str) -> Dict[str, Any]:
# Get all messages from LangChain memory
messages = langchain_memory.chat_memory.messages
migrated = 0
# Store each conversation turn
for i in range(0, len(messages), 2):
        human_message = messages[i]
ai_message = messages[i + 1] if i + 1 < len(messages) else None
if not human_message or not ai_message:
continue
memory.memories.create(
content=f"User: {human_message.content}\nAssistant: {ai_message.content}",
tier="short",
content_type="conversation",
memory_nature="episodic",
metadata={
"user_id": user_id,
"source": "langchain_buffer",
"turn_index": i // 2,
"migrated_at": time.strftime("%Y-%m-%dT%H:%M:%SZ")
}
)
migrated += 1
return {"migrated": migrated}
# Usage with existing LangChain memory
# result = migrate_from_langchain_buffer(my_langchain_memory, "user_123")
ConversationSummaryMemory
import { MemoryOS } from '@memory-os/sdk';
const memory = new MemoryOS({ apiKey: process.env.MEMORY_OS_API_KEY });
async function migrateFromLangChainSummary(langChainMemory, userId) {
// Get the summary
const memoryVars = await langChainMemory.loadMemoryVariables({});
const summary = memoryVars.history || memoryVars.summary || '';
if (!summary) {
return { migrated: 0, message: 'No summary found' };
}
// Store summary as a medium-term memory
await memory.memories.create({
content: `Conversation summary: ${summary}`,
tier: 'medium',
content_type: 'document',
memory_nature: 'semantic',
importance_score: 0.7,
metadata: {
user_id: userId,
source: 'langchain_summary',
type: 'conversation_summary',
migrated_at: new Date().toISOString()
}
});
return { migrated: 1 };
}
def migrate_from_langchain_summary(langchain_memory, user_id: str) -> Dict[str, Any]:
# Get the summary
memory_vars = langchain_memory.load_memory_variables({})
summary = memory_vars.get("history") or memory_vars.get("summary", "")
if not summary:
return {"migrated": 0, "message": "No summary found"}
# Store summary as a medium-term memory
memory.memories.create(
content=f"Conversation summary: {summary}",
tier="medium",
content_type="document",
memory_nature="semantic",
importance_score=0.7,
metadata={
"user_id": user_id,
"source": "langchain_summary",
"type": "conversation_summary",
"migrated_at": time.strftime("%Y-%m-%dT%H:%M:%SZ")
}
)
return {"migrated": 1}VectorStoreRetrieverMemory
import { MemoryOS } from '@memory-os/sdk';
const memory = new MemoryOS({ apiKey: process.env.MEMORY_OS_API_KEY });
async function migrateFromLangChainVectorStore(vectorStore, userId, options = {}) {
const { batchSize = 100, reEmbed = true } = options;
// This approach varies based on the vector store type
// Example for FAISS or Chroma
let totalMigrated = 0;
// Get all documents (implementation varies by vector store)
// For Chroma:
const collection = await vectorStore._collection.get();
for (let i = 0; i < collection.documents.length; i++) {
const doc = collection.documents[i];
const metadata = collection.metadatas[i] || {};
const embedding = collection.embeddings?.[i];
const memoryData = {
content: doc,
tier: 'medium',
content_type: 'text',
memory_nature: 'episodic',
metadata: {
...metadata,
user_id: userId,
source: 'langchain_vectorstore',
original_id: collection.ids[i],
migrated_at: new Date().toISOString()
}
};
if (!reEmbed && embedding) {
memoryData.embedding = embedding;
}
await memory.memories.create(memoryData);
totalMigrated++;
if (totalMigrated % batchSize === 0) {
console.log(`Migrated ${totalMigrated} documents...`);
await delay(100);
}
}
return { totalMigrated };
}
function delay(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
def migrate_from_langchain_vectorstore(
vector_store,
user_id: str,
batch_size: int = 100,
re_embed: bool = True
) -> Dict[str, Any]:
total_migrated = 0
# This approach varies based on the vector store type
    # Example for Chroma (embeddings must be requested explicitly):
    collection = vector_store._collection.get(
        include=["documents", "metadatas", "embeddings"]
    )
    documents = collection.get("documents") or []
    metadatas = collection.get("metadatas") or []
    ids = collection.get("ids") or []
    embeddings = collection.get("embeddings") or []
for i, doc in enumerate(documents):
metadata = metadatas[i] if i < len(metadatas) else {}
embedding = embeddings[i] if i < len(embeddings) else None
memory_data = {
"content": doc,
"tier": "medium",
"content_type": "text",
"memory_nature": "episodic",
"metadata": {
**metadata,
"user_id": user_id,
"source": "langchain_vectorstore",
"original_id": ids[i] if i < len(ids) else None,
"migrated_at": time.strftime("%Y-%m-%dT%H:%M:%SZ")
}
}
        if not re_embed and embedding is not None:
memory_data["embedding"] = embedding
memory.memories.create(**memory_data)
total_migrated += 1
if total_migrated % batch_size == 0:
print(f"Migrated {total_migrated} documents...")
time.sleep(0.1)
return {"total_migrated": total_migrated}From Raw OpenAI Embeddings Storage
If you've been storing OpenAI embeddings directly, here's how to migrate.
Migration Script
import { MemoryOS } from '@memory-os/sdk';
import fs from 'fs/promises';
const memory = new MemoryOS({ apiKey: process.env.MEMORY_OS_API_KEY });
async function migrateFromOpenAIEmbeddings(options = {}) {
const {
    dataSource, // File path or array of records
    batchSize = 50,
    reuseEmbeddings = false // Set true only if your embeddings match the expected dimension
} = options;
let data;
// Load data from source
if (typeof dataSource === 'string') {
const content = await fs.readFile(dataSource, 'utf-8');
data = JSON.parse(content);
} else if (Array.isArray(dataSource)) {
data = dataSource;
} else {
throw new Error('dataSource must be a file path or array');
}
let totalMigrated = 0;
for (let i = 0; i < data.length; i += batchSize) {
const batch = data.slice(i, i + batchSize);
for (const item of batch) {
try {
const memoryData = {
content: item.text || item.content || item.input,
tier: determineTier(item),
content_type: item.type || 'text',
memory_nature: item.nature || 'semantic',
metadata: {
source: 'openai_embeddings',
model: item.model || 'text-embedding-ada-002',
...(item.metadata || {}),
original_id: item.id,
migrated_at: new Date().toISOString()
}
};
// Reuse OpenAI embeddings if they match the expected dimension
if (reuseEmbeddings && item.embedding) {
// OpenAI ada-002 produces 1536-dimensional vectors
// Verify dimension matches expected
if (Array.isArray(item.embedding) && item.embedding.length === 1536) {
memoryData.embedding = item.embedding;
}
}
await memory.memories.create(memoryData);
totalMigrated++;
} catch (error) {
console.error(`Failed to migrate item ${item.id}:`, error.message);
}
}
console.log(`Progress: ${totalMigrated}/${data.length}`);
await delay(100);
}
return { totalMigrated, total: data.length };
}
function determineTier(item) {
if (item.tier) return item.tier;
if (item.permanent || item.important) return 'long';
if (item.temporary) return 'short';
return 'medium';
}
function delay(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
// Usage
const result = await migrateFromOpenAIEmbeddings({
dataSource: './embeddings_export.json',
batchSize: 50,
reuseEmbeddings: true
});
console.log(`Migration complete: ${result.totalMigrated}/${result.total}`);
import os
import time
import json
from typing import Dict, Any, Union, List
from memoryos import MemoryOS
memory = MemoryOS(api_key=os.environ["MEMORY_OS_API_KEY"])
def migrate_from_openai_embeddings(
data_source: Union[str, List[Dict]],
batch_size: int = 50,
reuse_embeddings: bool = False
) -> Dict[str, Any]:
# Load data from source
if isinstance(data_source, str):
with open(data_source, "r") as f:
data = json.load(f)
elif isinstance(data_source, list):
data = data_source
else:
raise ValueError("data_source must be a file path or list")
total_migrated = 0
for i in range(0, len(data), batch_size):
batch = data[i:i + batch_size]
for item in batch:
try:
content = item.get("text") or item.get("content") or item.get("input")
if not content:
print(f"Skipping item {item.get('id')}: no content")
continue
memory_data = {
"content": content,
"tier": determine_tier(item),
"content_type": item.get("type", "text"),
"memory_nature": item.get("nature", "semantic"),
"metadata": {
"source": "openai_embeddings",
"model": item.get("model", "text-embedding-ada-002"),
**item.get("metadata", {}),
"original_id": item.get("id"),
"migrated_at": time.strftime("%Y-%m-%dT%H:%M:%SZ")
}
}
# Reuse OpenAI embeddings if they match expected dimension
if reuse_embeddings and item.get("embedding"):
embedding = item["embedding"]
# OpenAI ada-002 produces 1536-dimensional vectors
if isinstance(embedding, list) and len(embedding) == 1536:
memory_data["embedding"] = embedding
memory.memories.create(**memory_data)
total_migrated += 1
except Exception as e:
print(f"Failed to migrate item {item.get('id')}: {e}")
print(f"Progress: {total_migrated}/{len(data)}")
time.sleep(0.1)
return {"total_migrated": total_migrated, "total": len(data)}
def determine_tier(item: Dict) -> str:
if item.get("tier"):
return item["tier"]
if item.get("permanent") or item.get("important"):
return "long"
if item.get("temporary"):
return "short"
return "medium"
# Usage
result = migrate_from_openai_embeddings(
data_source="./embeddings_export.json",
batch_size=50,
reuse_embeddings=True
)
print(f"Migration complete: {result['total_migrated']}/{result['total']}")Data Import Best Practices
Batch Sizing
Choose batch sizes based on your data characteristics:
| Scenario | Recommended Batch Size | Reason |
|---|---|---|
| Small text memories | 100-200 | Low overhead |
| Large documents | 20-50 | Larger content takes longer to process |
| Re-embedding | 25-50 | Embedding generation is CPU-intensive |
| Reusing embeddings | 100-200 | Faster processing without embedding |
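As a rough sketch, these recommendations can be encoded in a small helper (the thresholds are illustrative, not part of the SDK):
// Illustrative heuristic for choosing a batch size; tune for your workload
function pickBatchSize({ avgContentBytes = 1000, reEmbed = false } = {}) {
  if (reEmbed) return 25; // embedding generation dominates throughput
  if (avgContentBytes > 10000) return 20; // large documents process slowly
  return 100; // small text memories or reused embeddings
}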
Rate Limiting
Memory OS applies rate limits to ensure system stability. Handle rate limits gracefully:
async function importWithRateLimiting(items, importFn) {
const results = { success: 0, failed: 0, rateLimited: 0 };
for (const item of items) {
let retries = 0;
const maxRetries = 3;
while (retries < maxRetries) {
try {
await importFn(item);
results.success++;
break;
} catch (error) {
if (error.code === 'RATE_LIMIT_EXCEEDED') {
results.rateLimited++;
const waitTime = error.retryAfter || Math.pow(2, retries) * 1000;
console.log(`Rate limited, waiting ${waitTime}ms...`);
await delay(waitTime);
retries++;
} else {
results.failed++;
console.error(`Failed to import:`, error.message);
break;
}
}
}
// Small delay between items to avoid hitting limits
await delay(50);
}
return results;
}
function delay(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}
import time
def import_with_rate_limiting(items, import_fn):
results = {"success": 0, "failed": 0, "rate_limited": 0}
for item in items:
retries = 0
max_retries = 3
while retries < max_retries:
try:
import_fn(item)
results["success"] += 1
break
except Exception as error:
if getattr(error, "code", None) == "RATE_LIMIT_EXCEEDED":
results["rate_limited"] += 1
wait_time = getattr(error, "retry_after", 2 ** retries)
print(f"Rate limited, waiting {wait_time}s...")
time.sleep(wait_time)
retries += 1
else:
results["failed"] += 1
print(f"Failed to import: {error}")
break
# Small delay between items
time.sleep(0.05)
    return results
Preserving Timestamps
Maintain temporal context by preserving original timestamps:
// Store original timestamp in metadata
await memory.memories.create({
content: record.content,
tier: 'medium',
metadata: {
original_created_at: record.created_at.toISOString(),
original_updated_at: record.updated_at?.toISOString(),
migrated_at: new Date().toISOString()
}
});
Mapping Metadata to Tags and Entities
Convert your existing categorization to Memory OS structure:
function convertMetadata(oldMetadata) {
return {
// Keep as metadata
user_id: oldMetadata.userId,
session_id: oldMetadata.sessionId,
// Convert categories to tags pattern
tags: [
oldMetadata.category,
oldMetadata.type,
...(oldMetadata.tags || [])
].filter(Boolean),
// Entity extraction from content
entities: oldMetadata.entities || [],
// Preserve source info
source: 'migration',
original_source: oldMetadata.source
};
}
Handling Existing Embeddings
Re-embedding vs Reusing
| Approach | Pros | Cons |
|---|---|---|
| Re-embed | Best quality, consistent model, optimal for Memory OS search | Slower migration, higher cost |
| Reuse existing | Faster, no embedding cost | May be incompatible, potentially lower search quality |
Checking Embedding Compatibility
Memory OS uses specific embedding models. To reuse embeddings, they must be compatible:
function checkEmbeddingCompatibility(embedding) {
// Memory OS expects specific dimensions
const EXPECTED_DIMENSIONS = 1536; // Adjust based on Memory OS model
if (!Array.isArray(embedding)) {
return { compatible: false, reason: 'Not an array' };
}
if (embedding.length !== EXPECTED_DIMENSIONS) {
return {
compatible: false,
reason: `Dimension mismatch: ${embedding.length} vs ${EXPECTED_DIMENSIONS}`
};
}
// Check for valid numbers
  const hasInvalidValues = embedding.some(v =>
    typeof v !== 'number' || !Number.isFinite(v)
  );
if (hasInvalidValues) {
return { compatible: false, reason: 'Contains invalid values' };
}
return { compatible: true };
}
Hybrid Approach
For large migrations, consider a hybrid approach:
async function hybridMigration(items) {
const results = { reused: 0, reEmbedded: 0 };
for (const item of items) {
const compatibility = checkEmbeddingCompatibility(item.embedding);
const memoryData = {
content: item.content,
tier: item.tier || 'medium',
content_type: item.type || 'text',
memory_nature: 'semantic',
metadata: item.metadata
};
if (compatibility.compatible) {
// Reuse the existing embedding
memoryData.embedding = item.embedding;
results.reused++;
} else {
// Let Memory OS generate new embedding
results.reEmbedded++;
}
await memory.memories.create(memoryData);
}
return results;
}
Validation and Testing
Verifying Migration Success
Create a validation script to ensure data integrity:
import { MemoryOS } from '@memory-os/sdk';
const memory = new MemoryOS({ apiKey: process.env.MEMORY_OS_API_KEY });
async function validateMigration(options = {}) {
const {
expectedCount,
sampleSize = 10,
sourceData // Original data for comparison
} = options;
const validation = {
passed: true,
checks: []
};
// 1. Count check
const listResult = await memory.memories.list({ limit: 1 });
const actualCount = listResult.total;
validation.checks.push({
name: 'Count check',
expected: expectedCount,
actual: actualCount,
passed: Math.abs(actualCount - expectedCount) < expectedCount * 0.01 // 1% tolerance
});
// 2. Sample content check
if (sourceData && sourceData.length > 0) {
const sampleIndices = [];
for (let i = 0; i < Math.min(sampleSize, sourceData.length); i++) {
sampleIndices.push(Math.floor(Math.random() * sourceData.length));
}
let matchCount = 0;
for (const idx of sampleIndices) {
const original = sourceData[idx];
const searchResult = await memory.search({
query: original.content.substring(0, 100),
threshold: 0.9,
limit: 1
});
if (searchResult.results.length > 0) {
matchCount++;
}
}
validation.checks.push({
name: 'Sample content verification',
expected: sampleSize,
actual: matchCount,
      passed: matchCount >= sampleIndices.length * 0.9 // 90% should match
});
}
// 3. Search functionality check
const testQuery = "test search after migration";
try {
const searchResult = await memory.search({
query: testQuery,
limit: 5
});
validation.checks.push({
name: 'Search functionality',
passed: true,
message: `Search returned ${searchResult.results.length} results`
});
} catch (error) {
validation.checks.push({
name: 'Search functionality',
passed: false,
message: error.message
});
}
// 4. Context retrieval check
try {
const context = await memory.getContext({
query: "migration validation test",
max_tokens: 500
});
validation.checks.push({
name: 'Context retrieval',
passed: true,
message: `Context retrieved with ${context.memories.length} memories`
});
} catch (error) {
validation.checks.push({
name: 'Context retrieval',
passed: false,
message: error.message
});
}
// Overall status
validation.passed = validation.checks.every(c => c.passed);
return validation;
}
// Usage
const result = await validateMigration({
expectedCount: 10000,
sampleSize: 20,
sourceData: originalData
});
console.log('Validation result:', JSON.stringify(result, null, 2));
if (!result.passed) {
console.error('Migration validation failed!');
process.exit(1);
}
import os
import json
import random
from typing import Dict, Any, List, Optional
from memoryos import MemoryOS
memory = MemoryOS(api_key=os.environ["MEMORY_OS_API_KEY"])
def validate_migration(
expected_count: int,
sample_size: int = 10,
source_data: Optional[List[Dict]] = None
) -> Dict[str, Any]:
validation = {
"passed": True,
"checks": []
}
# 1. Count check
list_result = memory.memories.list(limit=1)
actual_count = list_result.get("total", 0)
count_passed = abs(actual_count - expected_count) < expected_count * 0.01
validation["checks"].append({
"name": "Count check",
"expected": expected_count,
"actual": actual_count,
"passed": count_passed
})
# 2. Sample content check
if source_data and len(source_data) > 0:
sample_indices = random.sample(
range(len(source_data)),
min(sample_size, len(source_data))
)
match_count = 0
for idx in sample_indices:
original = source_data[idx]
content = original.get("content", "")[:100]
search_result = memory.search(
query=content,
threshold=0.9,
limit=1
)
if search_result.get("results"):
match_count += 1
        sample_passed = match_count >= len(sample_indices) * 0.9
validation["checks"].append({
"name": "Sample content verification",
"expected": sample_size,
"actual": match_count,
"passed": sample_passed
})
# 3. Search functionality check
try:
search_result = memory.search(
query="test search after migration",
limit=5
)
validation["checks"].append({
"name": "Search functionality",
"passed": True,
"message": f"Search returned {len(search_result.get('results', []))} results"
})
except Exception as e:
validation["checks"].append({
"name": "Search functionality",
"passed": False,
"message": str(e)
})
# 4. Context retrieval check
try:
context = memory.get_context(
query="migration validation test",
max_tokens=500
)
validation["checks"].append({
"name": "Context retrieval",
"passed": True,
"message": f"Context retrieved with {len(context.get('memories', []))} memories"
})
except Exception as e:
validation["checks"].append({
"name": "Context retrieval",
"passed": False,
"message": str(e)
})
# Overall status
validation["passed"] = all(c["passed"] for c in validation["checks"])
return validation
# Usage
result = validate_migration(
expected_count=10000,
sample_size=20,
source_data=original_data
)
print(f"Validation result: {json.dumps(result, indent=2)}")
if not result["passed"]:
print("Migration validation failed!")
    exit(1)
Comparing Search Results
Verify that search quality is maintained or improved:
async function compareSearchResults(testQueries, oldSearchFn, options = {}) {
const { similarityThreshold = 0.7 } = options;
const comparison = {
queries: [],
summary: {
improved: 0,
similar: 0,
degraded: 0
}
};
for (const query of testQueries) {
// Get results from old system
const oldResults = await oldSearchFn(query);
// Get results from Memory OS
const newResults = await memory.search({
query,
limit: 10,
threshold: similarityThreshold
});
// Compare top results
const oldTopContent = oldResults.slice(0, 5).map(r => r.content || r.text);
const newTopContent = newResults.results.slice(0, 5).map(r => r.content);
// Calculate overlap
const overlap = oldTopContent.filter(old =>
newTopContent.some(newC =>
newC.includes(old.substring(0, 50)) || old.includes(newC.substring(0, 50))
)
).length;
const overlapRatio = overlap / Math.max(oldTopContent.length, 1);
    // Heuristic: more results at the same threshold is treated as improved
    // recall; otherwise judge by overlap with the old top results
    let status;
if (newResults.results.length > oldResults.length) {
status = 'improved';
comparison.summary.improved++;
} else if (overlapRatio >= 0.6) {
status = 'similar';
comparison.summary.similar++;
} else {
status = 'degraded';
comparison.summary.degraded++;
}
comparison.queries.push({
query,
oldResultCount: oldResults.length,
newResultCount: newResults.results.length,
overlapRatio,
status
});
}
return comparison;
}
Gradual Rollout Strategy
For production systems, use a gradual rollout:
class DualMemorySystem {
constructor(oldSystem, newMemoryOS, config = {}) {
this.old = oldSystem;
this.new = newMemoryOS;
this.rolloutPercentage = config.rolloutPercentage || 0;
this.compareMode = config.compareMode || false;
}
async search(query, options = {}) {
const useNew = Math.random() * 100 < this.rolloutPercentage;
if (this.compareMode) {
// Run both and log differences
const [oldResults, newResults] = await Promise.all([
this.old.search(query, options),
this.new.search({ query, ...options })
]);
this.logComparison(query, oldResults, newResults.results);
return useNew ? newResults.results : oldResults;
}
if (useNew) {
return (await this.new.search({ query, ...options })).results;
}
return this.old.search(query, options);
}
async store(content, options = {}) {
// Always write to both during migration
const [oldResult, newResult] = await Promise.all([
this.old.store(content, options),
this.new.memories.create({
content,
tier: options.tier || 'medium',
content_type: options.contentType || 'text',
metadata: options.metadata
})
]);
return { old: oldResult, new: newResult };
}
setRolloutPercentage(percentage) {
this.rolloutPercentage = Math.max(0, Math.min(100, percentage));
console.log(`Rollout percentage set to ${this.rolloutPercentage}%`);
}
logComparison(query, oldResults, newResults) {
// Log to your monitoring system
console.log({
query,
oldCount: oldResults.length,
newCount: newResults.length,
timestamp: new Date().toISOString()
});
}
}
// Usage
const dualSystem = new DualMemorySystem(oldMemoryStore, memory, {
rolloutPercentage: 10, // Start with 10%
compareMode: true
});
// Gradually increase rollout
// dualSystem.setRolloutPercentage(25);
// dualSystem.setRolloutPercentage(50);
// dualSystem.setRolloutPercentage(100);Common Issues and Solutions
Issue: Duplicate Memories After Migration
Symptom: Search returns duplicate or near-duplicate results.
Solution: Deduplicate before or after migration:
async function deduplicateMemories(threshold = 0.95) {
  const processed = new Set();
  let deleted = 0;
  // Snapshot all memories first: deleting while paginating by offset
  // would shift the list and skip records
  const all = [];
  let offset = 0;
  const limit = 100;
  while (true) {
    const batch = await memory.memories.list({ limit, offset });
    if (batch.data.length === 0) break;
    all.push(...batch.data);
    offset += limit;
  }
  for (const mem of all) {
    if (processed.has(mem.id)) continue;
    // Search for near-duplicates
    const similar = await memory.search({
      query: mem.content,
      threshold,
      limit: 10
    });
    // Keep the first occurrence, delete the rest
    for (const dup of similar.results) {
      if (dup.id !== mem.id && !processed.has(dup.id)) {
        await memory.memories.delete(dup.id);
        processed.add(dup.id);
        deleted++;
      }
    }
    processed.add(mem.id);
  }
  return { deleted };
}
Issue: Missing Content After Migration
Symptom: Some records weren't migrated.
Solution: Track failed items and retry:
import fs from 'fs/promises';
async function migrateWithTracking(items, migrateFn) {
const failed = [];
for (const item of items) {
try {
await migrateFn(item);
} catch (error) {
failed.push({
item,
error: error.message,
timestamp: new Date().toISOString()
});
}
}
// Save failed items for retry
if (failed.length > 0) {
await fs.writeFile(
'migration_failed.json',
JSON.stringify(failed, null, 2)
);
console.log(`${failed.length} items failed. Saved to migration_failed.json`);
}
return { total: items.length, failed: failed.length };
}
// Retry failed items
async function retryFailed(migrateFn) {
const content = await fs.readFile('migration_failed.json', 'utf-8');
const failed = JSON.parse(content);
return migrateWithTracking(
failed.map(f => f.item),
migrateFn
);
}
Issue: Embedding Dimension Mismatch
Symptom: Error when providing existing embeddings.
Solution: Either re-embed or transform dimensions:
function handleDimensionMismatch(embedding, targetDimension) {
if (embedding.length === targetDimension) {
return embedding;
}
if (embedding.length > targetDimension) {
// Truncate (not recommended for production)
console.warn('Truncating embedding - consider re-embedding instead');
return embedding.slice(0, targetDimension);
}
// If smaller, better to re-embed
console.warn('Embedding too small - will re-embed');
return null; // Return null to trigger re-embedding
}
Issue: Rate Limiting During Large Migration
Symptom: Migration stalls with rate limit errors.
Solution: Implement adaptive rate limiting:
class AdaptiveRateLimiter {
constructor() {
this.delayMs = 50;
this.minDelay = 20;
this.maxDelay = 5000;
this.consecutiveSuccesses = 0;
this.consecutiveFailures = 0;
}
async execute(fn) {
await delay(this.delayMs);
try {
const result = await fn();
this.onSuccess();
return result;
} catch (error) {
if (error.code === 'RATE_LIMIT_EXCEEDED') {
this.onRateLimit(error.retryAfter);
// Retry after waiting
await delay(this.delayMs);
return this.execute(fn);
}
throw error;
}
}
onSuccess() {
this.consecutiveSuccesses++;
this.consecutiveFailures = 0;
// Speed up after 10 consecutive successes
if (this.consecutiveSuccesses >= 10) {
this.delayMs = Math.max(this.minDelay, this.delayMs * 0.9);
this.consecutiveSuccesses = 0;
}
}
onRateLimit(retryAfter) {
this.consecutiveFailures++;
this.consecutiveSuccesses = 0;
// Back off exponentially
this.delayMs = Math.min(
this.maxDelay,
Math.max(retryAfter || this.delayMs * 2, this.delayMs * 1.5)
);
console.log(`Rate limited. New delay: ${this.delayMs}ms`);
}
}
function delay(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}
// Usage
const limiter = new AdaptiveRateLimiter();
for (const item of items) {
await limiter.execute(() => memory.memories.create(item));
}
Issue: Metadata Schema Differences
Symptom: Important metadata fields not accessible.
Solution: Create a metadata transformation layer:
function transformMetadata(oldMetadata, sourceSystem) {
// Define transformations for each source system
const transformations = {
pinecone: (m) => ({
user_id: m.userId || m.user_id,
content_type: m.type || m.contentType || 'text',
tags: m.tags || m.labels || [],
importance: m.importance || m.score,
source: 'pinecone',
original: m
}),
redis: (m) => ({
user_id: m.userId || m.user,
session_id: m.sessionId || m.session,
content_type: m.type || 'text',
source: 'redis',
original: m
}),
postgresql: (m) => ({
user_id: m.user_id,
content_type: m.category || m.type || 'text',
      // jsonb columns arrive already parsed; text columns arrive as JSON strings
      tags: Array.isArray(m.tags) ? m.tags : m.tags ? JSON.parse(m.tags) : [],
source: 'postgresql',
original: m
})
};
const transform = transformations[sourceSystem] || ((m) => m);
return transform(oldMetadata);
}
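For example, the transformation can be applied inline while importing (a sketch; exportedRecords stands in for your exported data):
// Hypothetical import loop using the transformation layer above
for (const record of exportedRecords) {
  await memory.memories.create({
    content: record.content,
    tier: 'medium',
    metadata: transformMetadata(record.metadata, 'pinecone')
  });
}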