Enhancing RAG with Memory OS
Retrieval-Augmented Generation (RAG) systems retrieve relevant documents to provide context for LLM responses. Memory OS enhances RAG by adding user-specific memory, temporal awareness, and intelligent relevance scoring that goes beyond simple vector similarity.
The Limitations of Basic RAG
Traditional RAG systems have several limitations that Memory OS addresses:
| Limitation | Basic RAG | RAG + Memory OS |
|---|---|---|
| User context | Same retrieval for all users | User-specific relevance and preferences |
| Temporal awareness | No concept of "recent" vs "old" | Automatic decay and recency scoring |
| Learning | Static retrieval | Improves with feedback and access patterns |
| Personalization | Generic ranking | Personalized ranking based on history |
| Conversation continuity | No session context | Multi-turn conversation awareness |
How Memory OS Enhances RAG
Architecture Overview
At a high level, the enhanced pipeline retrieves documents from your vector database and user-specific context from Memory OS in parallel, fuses the two within a token budget, generates the response, and stores the interaction back into memory so future queries benefit from it.
Key Enhancements
1. User-Specific Context
Memory OS retrieves context tailored to each user, allowing the same document to be interpreted differently based on user history and preferences.
import { MemoryOS } from '@memory-os/sdk';
const memory = new MemoryOS({ apiKey: process.env.MEMORY_OS_API_KEY });
async function getUserContext(userId, query) {
// Get user-specific memories relevant to the query
const userContext = await memory.getContext({
query,
max_tokens: 1500
});
// Get user preferences that might affect interpretation
const preferences = await memory.search({
query: `user preferences ${query}`,
tier: 'long',
limit: 5,
threshold: 0.6
});
return {
context: userContext.context,
memories: userContext.memories,
preferences: preferences.results.map(p => p.content),
token_count: userContext.token_count
};
}
import os
from memoryos import MemoryOS
memory = MemoryOS(api_key=os.environ["MEMORY_OS_API_KEY"])
def get_user_context(user_id: str, query: str):
# Get user-specific memories relevant to the query
user_context = memory.get_context(
query=query,
max_tokens=1500
)
# Get user preferences that might affect interpretation
preferences = memory.search(
query=f"user preferences {query}",
tier="long",
limit=5,
threshold=0.6
)
return {
"context": user_context["context"],
"memories": user_context["memories"],
"preferences": [p["content"] for p in preferences["results"]],
"token_count": user_context["token_count"]
}
2. Temporal Awareness
Memory OS scores memories with recency in mind, ensuring recent information is prioritized appropriately.
// Recent conversation context (high recency score)
await memory.memories.create({
content: "User mentioned they're switching to React from Vue",
tier: 'short', // Fast decay - relevant now, less so next week
content_type: 'fact',
memory_nature: 'episodic'
});
// Stable preference (low decay rate)
await memory.memories.create({
content: "User prefers TypeScript over JavaScript",
tier: 'long', // Slow decay - likely still relevant in months
content_type: 'fact',
memory_nature: 'semantic'
});
3. Intelligent Relevance Scoring
Memory OS uses a 6-factor scoring algorithm that considers more than just vector similarity:
combined_score = (0.40 * similarity) + // Semantic match
(0.20 * recency_score) + // How recent
(0.15 * importance_score) + // Explicit priority
(0.10 * access_score) + // Usage patterns
(0.10 * feedback_score) + // User feedback
(0.05 * entity_score)    // Related entities
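To make the weighting concrete, here is a small illustrative sketch (the weights mirror the formula above; the component scores are hypothetical values for two memories with identical similarity but different recency):
// Illustrative only: weights from the formula above, hypothetical component scores
function combinedScore(s) {
  return 0.40 * s.similarity + 0.20 * s.recency + 0.15 * s.importance +
         0.10 * s.access + 0.10 * s.feedback + 0.05 * s.entity;
}
const base = { importance: 0.5, access: 0.3, feedback: 0.5, entity: 0.2 };
const recentMemory = { ...base, similarity: 0.82, recency: 0.95 };
const oldMemory = { ...base, similarity: 0.82, recency: 0.10 };
console.log(combinedScore(recentMemory).toFixed(3)); // 0.683 - same similarity, but recent
console.log(combinedScore(oldMemory).toFixed(3));    // 0.513 - older memory ranks lower
Implementation: Enhanced RAG System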
Complete Example
import { MemoryOS } from '@memory-os/sdk';
import OpenAI from 'openai';
const memory = new MemoryOS({ apiKey: process.env.MEMORY_OS_API_KEY });
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
class EnhancedRAGSystem {
constructor(userId) {
this.userId = userId;
}
async query(userQuery) {
// Step 1: Parallel retrieval from both sources
const [documents, userMemory] = await Promise.all([
this.retrieveDocuments(userQuery),
this.retrieveUserContext(userQuery)
]);
// Step 2: Fuse contexts with intelligent allocation
const fusedContext = this.fuseContexts(documents, userMemory, {
documentBudget: 3000,
memoryBudget: 1500
});
// Step 3: Generate response
const response = await this.generateResponse(userQuery, fusedContext);
// Step 4: Store interaction for future context
await this.storeInteraction(userQuery, response);
return {
response,
sources: {
documents: documents.map(d => d.title),
memories: userMemory.memories.map(m => m.id)
}
};
}
async retrieveDocuments(query) {
// Your existing document retrieval (Pinecone, Weaviate, etc.)
// This is a placeholder - replace with your vector DB
const documentResponse = await fetch('https://your-vector-db/search', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
query,
top_k: 5
})
});
const { results } = await documentResponse.json();
return results;
}
async retrieveUserContext(query) {
// Get user-specific context from Memory OS
const context = await memory.getContext({
query,
max_tokens: 1500
});
// Also get any relevant long-term knowledge
const knowledge = await memory.search({
query,
tier: 'long',
limit: 5,
threshold: 0.6
});
// Merge unique memories
const allMemories = [...context.memories];
for (const mem of knowledge.results) {
if (!allMemories.find(m => m.id === mem.id)) {
allMemories.push(mem);
}
}
return {
context: context.context,
memories: allMemories,
token_count: context.token_count
};
}
fuseContexts(documents, userMemory, budgets) {
// Re-rank documents based on user context
const rankedDocuments = this.rerankWithUserContext(documents, userMemory);
// Allocate tokens intelligently
const documentContext = this.selectTopDocuments(
rankedDocuments,
budgets.documentBudget
);
const memoryContext = this.selectTopMemories(
userMemory.memories,
budgets.memoryBudget
);
return {
documents: documentContext,
memories: memoryContext,
user_preferences: this.extractPreferences(userMemory.memories)
};
}
rerankWithUserContext(documents, userMemory) {
// Boost documents that align with user's known interests/expertise
const userTopics = this.extractTopics(userMemory.context);
return documents.map(doc => {
let boost = 0;
// Check if document matches user's known interests
for (const topic of userTopics) {
if (doc.content.toLowerCase().includes(topic.toLowerCase())) {
boost += 0.1;
}
}
// Check if document matches user's expertise level
if (userMemory.context.includes('expert') && doc.metadata?.level === 'advanced') {
boost += 0.15;
}
return {
...doc,
adjusted_score: (doc.score || 0.5) + boost
};
}).sort((a, b) => b.adjusted_score - a.adjusted_score);
}
extractTopics(context) {
// Simple topic extraction - could be enhanced with NLP
const topics = [];
const patterns = [
/works with (\w+)/gi,
/interested in (\w+)/gi,
/prefers (\w+)/gi,
/expert in (\w+)/gi
];
for (const pattern of patterns) {
const matches = context.matchAll(pattern);
for (const match of matches) {
topics.push(match[1]);
}
}
return [...new Set(topics)];
}
selectTopDocuments(documents, budget) {
let tokenCount = 0;
const selected = [];
for (const doc of documents) {
const docTokens = Math.ceil(doc.content.length / 4); // rough estimate: ~4 characters per token
if (tokenCount + docTokens <= budget) {
selected.push(doc);
tokenCount += docTokens;
} else {
break;
}
}
return selected;
}
selectTopMemories(memories, budget) {
let tokenCount = 0;
const selected = [];
// Sort by score
const sorted = [...memories].sort((a, b) => (b.score || 0) - (a.score || 0));
for (const mem of sorted) {
const memTokens = Math.ceil(mem.content.length / 4);
if (tokenCount + memTokens <= budget) {
selected.push(mem);
tokenCount += memTokens;
} else {
break;
}
}
return selected;
}
extractPreferences(memories) {
return memories
.filter(m => m.metadata?.category === 'preference' || m.tier === 'long')
.map(m => m.content)
.slice(0, 5);
}
async generateResponse(query, context) {
const systemPrompt = this.buildPrompt(context);
const completion = await openai.chat.completions.create({
model: 'gpt-4',
messages: [
{ role: 'system', content: systemPrompt },
{ role: 'user', content: query }
]
});
return completion.choices[0].message.content;
}
buildPrompt(context) {
const docContext = context.documents
.map(d => `### ${d.title || 'Document'}\n${d.content}`)
.join('\n\n');
const memContext = context.memories
.map(m => `- ${m.content}`)
.join('\n');
const prefContext = context.user_preferences
.map(p => `- ${p}`)
.join('\n');
return `You are a helpful assistant with access to relevant documents and user-specific memory.
## Relevant Documents
${docContext || 'No relevant documents found.'}
## What You Know About This User
${memContext || 'No user-specific context available.'}
## User Preferences
${prefContext || 'No preferences recorded.'}
## Instructions
1. Use the documents to provide accurate, factual information
2. Personalize your response based on user context and preferences
3. If documents conflict with user memory, prioritize recent information
4. Cite sources when referencing specific documents
5. Acknowledge when information might be outdated`;
}
async storeInteraction(query, response) {
// Store for future context
await memory.memories.create({
content: `Query: ${query.substring(0, 200)}\nResponse summary: ${response.substring(0, 300)}`,
tier: 'short',
content_type: 'conversation',
memory_nature: 'episodic',
metadata: {
user_id: this.userId,
interaction_type: 'rag_query',
timestamp: new Date().toISOString()
}
});
}
}
// Usage
const ragSystem = new EnhancedRAGSystem('user_123');
const result = await ragSystem.query("How do I implement authentication in Next.js?");
console.log('Response:', result.response);
console.log('Document sources:', result.sources.documents);
console.log('Memory sources:', result.sources.memories);
import os
from typing import Dict, List, Any, Optional
from memoryos import MemoryOS
from openai import OpenAI
import re
from datetime import datetime
memory = MemoryOS(api_key=os.environ["MEMORY_OS_API_KEY"])
openai_client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
class EnhancedRAGSystem:
def __init__(self, user_id: str):
self.user_id = user_id
def query(self, user_query: str) -> Dict[str, Any]:
# Step 1: Retrieve from both sources (sequential here; use asyncio or threads to parallelize)
documents = self.retrieve_documents(user_query)
user_memory = self.retrieve_user_context(user_query)
# Step 2: Fuse contexts with intelligent allocation
fused_context = self.fuse_contexts(
documents,
user_memory,
document_budget=3000,
memory_budget=1500
)
# Step 3: Generate response
response = self.generate_response(user_query, fused_context)
# Step 4: Store interaction for future context
self.store_interaction(user_query, response)
return {
"response": response,
"sources": {
"documents": [d.get("title", "Untitled") for d in documents],
"memories": [m["id"] for m in user_memory["memories"]]
}
}
def retrieve_documents(self, query: str) -> List[Dict]:
# Your existing document retrieval (Pinecone, Weaviate, etc.)
# This is a placeholder - replace with your vector DB
import requests
response = requests.post(
"https://your-vector-db/search",
json={"query": query, "top_k": 5}
)
return response.json().get("results", [])
def retrieve_user_context(self, query: str) -> Dict:
# Get user-specific context from Memory OS
context = memory.get_context(
query=query,
max_tokens=1500
)
# Also get any relevant long-term knowledge
knowledge = memory.search(
query=query,
tier="long",
limit=5,
threshold=0.6
)
# Merge unique memories
all_memories = list(context["memories"])
existing_ids = {m["id"] for m in all_memories}
for mem in knowledge["results"]:
if mem["id"] not in existing_ids:
all_memories.append(mem)
return {
"context": context["context"],
"memories": all_memories,
"token_count": context["token_count"]
}
def fuse_contexts(
self,
documents: List[Dict],
user_memory: Dict,
document_budget: int,
memory_budget: int
) -> Dict:
# Re-rank documents based on user context
ranked_documents = self.rerank_with_user_context(documents, user_memory)
# Allocate tokens intelligently
document_context = self.select_top_documents(ranked_documents, document_budget)
memory_context = self.select_top_memories(user_memory["memories"], memory_budget)
return {
"documents": document_context,
"memories": memory_context,
"user_preferences": self.extract_preferences(user_memory["memories"])
}
def rerank_with_user_context(
self,
documents: List[Dict],
user_memory: Dict
) -> List[Dict]:
# Boost documents that align with user's known interests/expertise
user_topics = self.extract_topics(user_memory["context"])
ranked = []
for doc in documents:
boost = 0
# Check if document matches user's known interests
for topic in user_topics:
if topic.lower() in doc.get("content", "").lower():
boost += 0.1
# Check if document matches user's expertise level
if "expert" in user_memory["context"]:
if doc.get("metadata", {}).get("level") == "advanced":
boost += 0.15
ranked.append({
**doc,
"adjusted_score": doc.get("score", 0.5) + boost
})
return sorted(ranked, key=lambda x: x["adjusted_score"], reverse=True)
def extract_topics(self, context: str) -> List[str]:
# Simple topic extraction
topics = []
patterns = [
r"works with (\w+)",
r"interested in (\w+)",
r"prefers (\w+)",
r"expert in (\w+)"
]
for pattern in patterns:
matches = re.findall(pattern, context, re.IGNORECASE)
topics.extend(matches)
return list(set(topics))
def select_top_documents(self, documents: List[Dict], budget: int) -> List[Dict]:
token_count = 0
selected = []
for doc in documents:
doc_tokens = len(doc.get("content", "")) // 4  # rough estimate: ~4 characters per token
if token_count + doc_tokens <= budget:
selected.append(doc)
token_count += doc_tokens
else:
break
return selected
def select_top_memories(self, memories: List[Dict], budget: int) -> List[Dict]:
token_count = 0
selected = []
# Sort by score
sorted_memories = sorted(
memories,
key=lambda x: x.get("score", 0),
reverse=True
)
for mem in sorted_memories:
mem_tokens = len(mem.get("content", "")) // 4
if token_count + mem_tokens <= budget:
selected.append(mem)
token_count += mem_tokens
else:
break
return selected
def extract_preferences(self, memories: List[Dict]) -> List[str]:
return [
m["content"]
for m in memories
if m.get("metadata", {}).get("category") == "preference"
or m.get("tier") == "long"
][:5]
def generate_response(self, query: str, context: Dict) -> str:
system_prompt = self.build_prompt(context)
completion = openai_client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": query}
]
)
return completion.choices[0].message.content
def build_prompt(self, context: Dict) -> str:
doc_context = "\n\n".join(
f"### {d.get('title', 'Document')}\n{d.get('content', '')}"
for d in context["documents"]
)
mem_context = "\n".join(
f"- {m['content']}"
for m in context["memories"]
)
pref_context = "\n".join(
f"- {p}"
for p in context["user_preferences"]
)
return f"""You are a helpful assistant with access to relevant documents and user-specific memory.
## Relevant Documents
{doc_context or 'No relevant documents found.'}
## What You Know About This User
{mem_context or 'No user-specific context available.'}
## User Preferences
{pref_context or 'No preferences recorded.'}
## Instructions
1. Use the documents to provide accurate, factual information
2. Personalize your response based on user context and preferences
3. If documents conflict with user memory, prioritize recent information
4. Cite sources when referencing specific documents
5. Acknowledge when information might be outdated"""
def store_interaction(self, query: str, response: str):
memory.memories.create(
content=f"Query: {query[:200]}\nResponse summary: {response[:300]}",
tier="short",
content_type="conversation",
memory_nature="episodic",
metadata={
"user_id": self.user_id,
"interaction_type": "rag_query",
"timestamp": datetime.utcnow().isoformat()
}
)
# Usage
rag_system = EnhancedRAGSystem("user_123")
result = rag_system.query("How do I implement authentication in Next.js?")
print(f"Response: {result['response']}")
print(f"Document sources: {result['sources']['documents']}")
print(f"Memory sources: {result['sources']['memories']}")Advanced Patterns
Feedback Loop for RAG Improvement
Track which retrieved content is actually useful and adjust future retrievals.
import { MemoryOS } from '@memory-os/sdk';
class FeedbackEnhancedRAG {
constructor(userId) {
this.userId = userId;
this.memory = new MemoryOS({ apiKey: process.env.MEMORY_OS_API_KEY });
}
async recordRetrievalFeedback(queryId, documentId, wasHelpful) {
// Store feedback about document relevance
await this.memory.memories.create({
content: `Document ${documentId} was ${wasHelpful ? 'helpful' : 'not helpful'} for query type`,
tier: 'long',
content_type: 'fact',
memory_nature: 'semantic',
importance_score: wasHelpful ? 0.8 : 0.3,
metadata: {
user_id: this.userId,
type: 'retrieval_feedback',
document_id: documentId,
query_id: queryId,
helpful: wasHelpful,
timestamp: new Date().toISOString()
}
});
}
async getDocumentBoostFactors(documentIds) {
const boosts = {};
for (const docId of documentIds) {
// Check past feedback for this document
const feedback = await this.memory.search({
query: `Document ${docId} feedback`,
tier: 'long',
limit: 10,
threshold: 0.7
});
const helpfulCount = feedback.results.filter(f =>
f.metadata?.document_id === docId && f.metadata?.helpful
).length;
const notHelpfulCount = feedback.results.filter(f =>
f.metadata?.document_id === docId && !f.metadata?.helpful
).length;
// Calculate boost factor
const total = helpfulCount + notHelpfulCount;
if (total > 0) {
boosts[docId] = (helpfulCount / total) * 0.2; // Max 20% boost
}
}
return boosts;
}
async rerankWithFeedback(documents) {
const docIds = documents.map(d => d.id);
const boosts = await this.getDocumentBoostFactors(docIds);
return documents.map(doc => ({
...doc,
score: (doc.score || 0.5) + (boosts[doc.id] || 0)
})).sort((a, b) => b.score - a.score);
}
}
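One possible way to wire this into a query flow (a sketch only; retrieveDocuments, queryId, and wasHelpful are placeholders for your own retrieval pipeline and feedback UI):
const feedbackRAG = new FeedbackEnhancedRAG('user_123');
// Re-rank documents from your vector DB using past feedback before generation
const documents = await retrieveDocuments(query); // placeholder: your document retrieval
const ranked = await feedbackRAG.rerankWithFeedback(documents);
// ...generate the response from `ranked`, then record which documents actually helped
for (const doc of ranked) {
  await feedbackRAG.recordRetrievalFeedback(queryId, doc.id, wasHelpful(doc)); // placeholders
}
Multi-Index RAG with Memory OS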
Combine multiple document sources with user memory for comprehensive retrieval.
import { MemoryOS } from '@memory-os/sdk';
class MultiIndexRAG {
constructor(userId, indexes) {
this.userId = userId;
this.indexes = indexes; // { products: pineconeClient, docs: weaviateClient }
this.memory = new MemoryOS({ apiKey: process.env.MEMORY_OS_API_KEY });
}
async query(userQuery) {
// Determine which indexes are relevant based on user history
const relevantIndexes = await this.selectRelevantIndexes(userQuery);
// Retrieve from all relevant sources in parallel
const retrievalPromises = [
this.memory.getContext({ query: userQuery, max_tokens: 1000 }),
...relevantIndexes.map(idx =>
this.indexes[idx].search(userQuery, { limit: 5 })
)
];
const results = await Promise.all(retrievalPromises);
const userContext = results[0];
const indexResults = results.slice(1);
// Merge and deduplicate results
const mergedResults = this.mergeResults(indexResults, relevantIndexes);
// Re-rank based on user context
const rankedResults = this.rankWithUserContext(mergedResults, userContext);
return {
documents: rankedResults,
userContext: userContext.context,
indexes_used: relevantIndexes
};
}
async selectRelevantIndexes(query) {
// Check user's history to see which indexes have been helpful
const pastQueries = await this.memory.search({
query: `index selection ${query}`,
tier: 'medium',
limit: 5,
threshold: 0.6
});
const indexScores = {};
for (const [name, _] of Object.entries(this.indexes)) {
indexScores[name] = 0.5; // Base score
}
// Boost indexes that were useful for similar queries
for (const past of pastQueries.results) {
if (past.metadata?.useful_indexes) {
for (const idx of past.metadata.useful_indexes) {
indexScores[idx] = (indexScores[idx] || 0) + 0.1;
}
}
}
// Keep indexes at or above the base score (all indexes by default, boosted ones ranked first)
return Object.entries(indexScores)
.filter(([_, score]) => score >= 0.5)
.sort(([, a], [, b]) => b - a)
.map(([name]) => name);
}
mergeResults(indexResults, indexNames) {
const merged = [];
const seen = new Set();
for (let i = 0; i < indexResults.length; i++) {
for (const result of indexResults[i]) {
const key = result.content.substring(0, 100);
if (!seen.has(key)) {
seen.add(key);
merged.push({
...result,
source_index: indexNames[i]
});
}
}
}
return merged;
}
rankWithUserContext(results, userContext) {
// Apply user-specific boosting
return results.map(r => {
let boost = 0;
// Boost results from indexes that appear in the user's context
if (userContext.context.includes(r.source_index)) {
boost += 0.1;
}
return {
...r,
final_score: (r.score || 0.5) + boost
};
}).sort((a, b) => b.final_score - a.final_score);
}
}
Performance Considerations
Token Budget Allocation
Allocate your token budget wisely between documents and user memory:
| Use Case | Document Budget | Memory Budget | Rationale |
|---|---|---|---|
| Knowledge base Q&A | 70% | 30% | Documents are primary source |
| Personal assistant | 40% | 60% | User context is critical |
| Customer support | 50% | 50% | Balance knowledge and history |
| Research assistant | 60% | 40% | Documents important, but track user's research history |
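One way to express these ratios in code is a small helper that splits a total context budget by use case (a sketch; the ratios mirror the table above and should be tuned for your application):
// Ratios mirror the table above: [documentShare, memoryShare]
const BUDGET_SPLIT = {
  knowledge_base: [0.7, 0.3],
  personal_assistant: [0.4, 0.6],
  customer_support: [0.5, 0.5],
  research_assistant: [0.6, 0.4]
};
function allocateBudget(useCase, totalTokens) {
  const [docShare, memShare] = BUDGET_SPLIT[useCase];
  return {
    documentBudget: Math.floor(totalTokens * docShare),
    memoryBudget: Math.floor(totalTokens * memShare)
  };
}
// e.g. allocateBudget('personal_assistant', 4500) -> { documentBudget: 1800, memoryBudget: 2700 }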
Caching Strategies
import { MemoryOS } from '@memory-os/sdk';
class CachedRAGSystem {
constructor(userId) {
this.userId = userId;
this.memory = new MemoryOS({ apiKey: process.env.MEMORY_OS_API_KEY });
this.cache = new Map();
this.cacheTTL = 60000; // 1 minute
}
async getContextWithCache(query) {
const cacheKey = this.getCacheKey(query);
const cached = this.cache.get(cacheKey);
if (cached && Date.now() - cached.timestamp < this.cacheTTL) {
return cached.data;
}
// Parallel retrieval (retrieveDocuments is your own vector DB search, as in EnhancedRAGSystem above)
const [documents, userContext] = await Promise.all([
this.retrieveDocuments(query),
this.memory.getContext({ query, max_tokens: 1500 })
]);
const result = { documents, userContext };
this.cache.set(cacheKey, {
data: result,
timestamp: Date.now()
});
return result;
}
getCacheKey(query) {
// Normalize query for caching
return `${this.userId}:${query.toLowerCase().trim().substring(0, 100)}`;
}
invalidateCache() {
this.cache.clear();
}
}
Latency Optimization
// Parallel retrieval reduces latency
async function optimizedRetrieval(query, userId) {
const startTime = Date.now();
// All retrievals happen in parallel
const [docResults, shortTerm, longTerm] = await Promise.all([
vectorDB.search(query),
memory.getContext({ query, tier: 'short', max_tokens: 500 }),
memory.getContext({ query, tier: 'long', max_tokens: 1000 })
]);
console.log(`Total retrieval time: ${Date.now() - startTime}ms`);
return {
documents: docResults,
recentContext: shortTerm.context,
persistentContext: longTerm.context
};
}
Best Practices
1. Separate Concerns
Keep document retrieval and user memory distinct but complementary:
// Documents: factual knowledge
const docContext = await vectorDB.search(query);
// Memory: user-specific context
const userContext = await memory.getContext({ query });
// Combine appropriately
const fullContext = buildContext(docContext, userContext);
2. Update Memory Based on RAG Usage
// After a helpful RAG interaction
await memory.memories.create({
content: `User frequently queries about ${topic}`,
tier: 'long',
content_type: 'fact',
memory_nature: 'semantic',
metadata: {
user_id: userId,
type: 'interest',
topic
}
});
3. Handle Conflicts
When documents and memory conflict, have a clear resolution strategy:
function resolveConflicts(docContext, memoryContext) {
// Prefer more recent information
// Prefer explicit user corrections over documents
// Flag potential conflicts for user review
return `
## Important Context
${memoryContext}
## Reference Documentation
Note: The following documentation may be newer than stored context.
${docContext}
`;
}
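A minimal sketch of feeding the resolved context into the chat call (assumes the openai client from the earlier examples and that docContext and memoryContext are the strings assembled during retrieval):
// Assumed inputs: docContext and memoryContext strings built during retrieval
const systemPrompt = resolveConflicts(docContext, memoryContext);
const completion = await openai.chat.completions.create({
  model: 'gpt-4',
  messages: [
    { role: 'system', content: systemPrompt },
    { role: 'user', content: userQuery }
  ]
});
console.log(completion.choices[0].message.content);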