Memory API Reference
Complete API reference for memory backends and conversation persistence in AA Kit.
Import Statement
from aakit import MemoryBackend, InMemoryBackend, RedisBackend, SQLiteBackend, create_memory
MemoryBackend Base Class
Abstract base class for all memory backends
MemoryBackend
class MemoryBackend(ABC):
def __init__(self, config: Dict[str, Any] = None)
Base class for implementing custom memory backends
Parameters
config (Dict[str, Any]) - Backend-specific configuration
Example
from aakit import MemoryBackend
class CustomMemoryBackend(MemoryBackend):
async def save(self, session_id: str, messages: List[Message]):
# Your implementation
pass
async def load(self, session_id: str) -> List[Message]:
# Your implementation
pass
async def clear(self, session_id: str = None):
# Your implementation
pass
save
async def save(self, session_id: str, messages: List[Message]) -> None
Save messages to the memory backend
Parameters
session_id (str) - Session identifier
messages (List[Message]) - Messages to save
Example
await memory.save(
"user_123",
[
Message(role="user", content="Hello"),
Message(role="assistant", content="Hi there!")
]
)
load
async def load(self, session_id: str, limit: int = None) -> List[Message]
Load messages from the memory backend
Parameters
session_id (str) - Session identifier
limit (int) - Maximum number of messages to load
Example
messages = await memory.load("user_123", limit=50)
clear
async def clear(self, session_id: str = None) -> None
Clear messages from memory
Parameters
session_id (str) - Session to clear (None clears all)
Example
# Clear specific session
await memory.clear("user_123")
# Clear all sessions
await memory.clear()
search
async def search(
self,
query: str,
session_id: str = None,
limit: int = 10
) -> List[Message]
Search messages by content
Parameters
query (str) - Search query
session_id (str) - Limit search to specific session
limit (int) - Maximum results to return
Example
results = await memory.search(
"python programming",
session_id="user_123",
limit=5
)
InMemoryBackend
Fast in-memory storage for development
InMemoryBackend
InMemoryBackend(
max_sessions: int = 1000,
max_messages_per_session: int = 1000,
ttl: int = None
)
Memory backend that stores data in RAM
Parameters
max_sessions (int) - Maximum number of sessions to store
max_messages_per_session (int) - Maximum messages per session
ttl (int) - Time-to-live in seconds (optional)
Example
from aakit import InMemoryBackend
memory = InMemoryBackend(
max_sessions=500,
max_messages_per_session=200,
ttl=3600 # 1 hour
)
agent = Agent(
name="dev_agent",
instruction="...",
memory=memory
)
RedisBackend
Distributed memory storage with Redis
RedisBackend
RedisBackend(
client: redis.Redis = None,
url: str = None,
prefix: str = "agentz:sessions:",
ttl: int = 86400,
max_messages: int = 1000,
compression: bool = True
)
Redis-based memory backend for production use
Parameters
client (redis.Redis) - Existing Redis client instance
url (str) - Redis connection URL
prefix (str) - Key prefix for sessions
ttl (int) - Time-to-live in seconds
max_messages (int) - Maximum messages per session
compression (bool) - Enable message compression
Example
from aakit import RedisBackend
import redis
# Option 1: With existing client
redis_client = redis.Redis(host='localhost', port=6379, db=0)
memory = RedisBackend(client=redis_client, ttl=7200)
# Option 2: With URL
memory = RedisBackend(
url="redis://localhost:6379/0",
prefix="agent:memory:",
ttl=86400,
compression=True
)
agent = Agent(
name="prod_agent",
instruction="...",
memory=memory
)
SQLiteBackend
File-based persistent storage
SQLiteBackend
SQLiteBackend(
db_path: str = "agent_memory.db",
table_name: str = "conversations",
auto_vacuum: bool = True,
journal_mode: str = "WAL"
)
SQLite backend for local persistent storage
Parameters
db_path (str) - Path to SQLite database file
table_name (str) - Table name for storing messages
auto_vacuum (bool) - Enable automatic database cleanup
journal_mode (str) - SQLite journal mode
Example
from aakit import SQLiteBackend
memory = SQLiteBackend(
db_path="./data/conversations.db",
table_name="agent_memory",
auto_vacuum=True
)
agent = Agent(
name="local_agent",
instruction="...",
memory=memory
)
# Query with SQL
results = await memory.query(
"SELECT * FROM agent_memory WHERE content LIKE ?",
("%python%",)
)
PostgreSQLBackend
Enterprise-grade database storage
PostgreSQLBackend
PostgreSQLBackend(
connection_string: str = None,
pool_size: int = 10,
table_name: str = "agent_conversations",
schema: str = "public",
enable_search: bool = True
)
PostgreSQL backend for scalable production deployments
Parameters
connection_string (str) - PostgreSQL connection string
pool_size (int) - Connection pool size
table_name (str) - Table name for messages
schema (str) - Database schema to use
enable_search (bool) - Enable full-text search
Example
from aakit import PostgreSQLBackend
memory = PostgreSQLBackend(
connection_string="postgresql://user:pass@localhost/agentdb",
pool_size=20,
table_name="conversations",
enable_search=True
)
# Full-text search
results = await memory.search_fulltext(
"machine learning",
session_ids=["user_123", "user_456"],
limit=10
)
Memory Utilities
Helper functions for memory management
create_memory
def create_memory(
backend: str,
**kwargs
) -> MemoryBackend
Factory function to create memory backends
Parameters
backend (str) - Backend type: 'memory', 'redis', 'sqlite', 'postgres'
**kwargs (dict) - Backend-specific configuration
Example
from aakit import create_memory
# Create Redis backend
memory = create_memory(
"redis",
url="redis://localhost:6379",
ttl=3600
)
# Create SQLite backend
memory = create_memory(
"sqlite",
db_path="./agent.db"
)
migrate_memory
async def migrate_memory(
source: MemoryBackend,
target: MemoryBackend,
session_ids: List[str] = None,
batch_size: int = 100
) -> MigrationResult
Migrate data between memory backends
Parameters
source (MemoryBackend) - Source memory backend
target (MemoryBackend) - Target memory backend
session_ids (List[str]) - Specific sessions to migrate
batch_size (int) - Batch size for migration
Example
from aakit import migrate_memory
# Migrate from SQLite to Redis
source = SQLiteBackend("old.db")
target = RedisBackend(url="redis://localhost:6379")
result = await migrate_memory(
source,
target,
batch_size=500
)
print(f"Migrated {result.total_messages} messages")
Memory Configuration
Configuration classes for memory backends
MemoryConfig
class MemoryConfig:
backend: str = "memory"
max_messages: int = 1000
context_window: int = 20
summarization: bool = False
compression: bool = False
encryption: bool = False
search_enabled: bool = True
Common configuration for memory backends
Example
from aakit import MemoryConfig
config = MemoryConfig(
backend="redis",
max_messages=5000,
context_window=50,
summarization=True,
compression=True
)
agent = Agent(
name="configured_agent",
instruction="...",
memory_config=config
)
Type Definitions
Message
class Message:
role: str # "user", "assistant", "system", "tool"
content: str
timestamp: datetime
metadata: Dict[str, Any] = {}
def to_dict(self) -> Dict:
"""Convert to dictionary"""
@classmethod
def from_dict(cls, data: Dict) -> Message:
"""Create from dictionary"""
SessionInfo
class SessionInfo:
session_id: str
created_at: datetime
updated_at: datetime
message_count: int
metadata: Dict[str, Any]
MigrationResult
class MigrationResult:
success: bool
total_sessions: int
total_messages: int
failed_sessions: List[str]
duration: float
errors: List[str]
Complete Example
from aakit import Agent, RedisBackend, create_memory, Message
import redis
import asyncio
from datetime import datetime, timedelta
# Example 1: Custom Memory Backend
class TimedMemoryBackend(RedisBackend):
    """Redis-backed memory that stamps every message with its save time.

    Each saved message gets a ``saved_at`` ISO-8601 timestamp in its
    metadata, which `get_recent_messages` later uses for time filtering.
    """

    async def save(self, session_id: str, messages: List[Message]):
        """Save *messages* for *session_id*, adding a ``saved_at`` stamp.

        NOTE(review): uses naive local time — confirm whether UTC is
        expected by downstream consumers.
        """
        for msg in messages:
            msg.metadata["saved_at"] = datetime.now().isoformat()
        await super().save(session_id, messages)

    async def get_recent_messages(
        self,
        session_id: str,
        since: datetime,
    ) -> List[Message]:
        """Return messages saved strictly after *since*.

        Messages without a ``saved_at`` stamp (e.g. saved by another
        backend) are skipped.
        """
        all_messages = await self.load(session_id)
        return [
            msg
            for msg in all_messages
            if "saved_at" in msg.metadata
            and datetime.fromisoformat(msg.metadata["saved_at"]) > since
        ]
# Example 2: Multi-Backend Memory
class MultiBackendMemory:
    """Redundant memory that fans writes out to several backends.

    Saves go to every backend concurrently; loads return the first
    backend that answers successfully, falling back down the list.
    """

    def __init__(self, backends: List[MemoryBackend]):
        # Order matters: load() prefers earlier backends.
        self.backends = backends

    async def save(self, session_id: str, messages: List[Message]):
        """Save *messages* to all backends in parallel."""
        await asyncio.gather(
            *(backend.save(session_id, messages) for backend in self.backends)
        )

    async def load(self, session_id: str) -> List[Message]:
        """Load from the first backend that succeeds.

        Returns an empty list if every backend fails — best-effort
        redundancy rather than raising.
        """
        for backend in self.backends:
            try:
                return await backend.load(session_id)
            except Exception:
                # This backend is unavailable/corrupt; try the next one.
                continue
        return []
# Example 3: Memory with Analytics
class AnalyticsMemory(MemoryBackend):
    """Decorator backend that records per-session conversation statistics.

    Delegates all storage to the wrapped *backend* while maintaining an
    in-process ``stats`` dict. Fix over the original: the class claimed
    to be a MemoryBackend but left the abstract ``load``/``clear``
    methods unimplemented, so it could not be instantiated — they now
    delegate to the wrapped backend.
    """

    def __init__(self, backend: MemoryBackend):
        self.backend = backend
        # session_id -> {"message_count": int, "total_tokens": int, "topics": set}
        self.stats = {}

    async def save(self, session_id: str, messages: List[Message]):
        """Save via the wrapped backend, then update the session's counters."""
        await self.backend.save(session_id, messages)
        stats = self.stats.setdefault(session_id, {
            "message_count": 0,
            "total_tokens": 0,
            "topics": set(),
        })
        for msg in messages:
            stats["message_count"] += 1
            # Token counts are optional metadata supplied by the caller.
            stats["total_tokens"] += msg.metadata.get("tokens", 0)

    async def load(self, session_id: str) -> List[Message]:
        """Load messages from the wrapped backend."""
        return await self.backend.load(session_id)

    async def clear(self, session_id: str = None):
        """Clear the wrapped backend (all sessions when *session_id* is None)."""
        await self.backend.clear(session_id)

    def get_analytics(self, session_id: str = None):
        """Return stats for one session (empty dict if unseen) or all stats."""
        if session_id:
            return self.stats.get(session_id, {})
        return self.stats
# Example 4: Complete Implementation
async def main():
    """End-to-end demo: a Redis-backed agent with session memory,
    history search, export, and old-session cleanup."""
    # Pooled client so concurrent agent calls don't exhaust connections.
    redis_client = redis.Redis(
        host="localhost",
        port=6379,
        decode_responses=True,
        connection_pool=redis.ConnectionPool(
            max_connections=50,
            health_check_interval=30,
        ),
    )
    memory = RedisBackend(
        client=redis_client,
        prefix="agent:prod:",
        ttl=86400,  # 24 hours
        max_messages=1000,
        compression=True,
    )

    # Create agent with persistent memory.
    agent = Agent(
        name="assistant",
        instruction="You are a helpful assistant",
        model="gpt-4",
        memory=memory,
    )

    # Chat with session management: both turns share one session_id,
    # so the second turn can recall facts from the first.
    session_id = "user_123"
    response1 = await agent.chat(
        "My name is Alice and I work at TechCorp",
        session_id=session_id,
    )
    response2 = await agent.chat(
        "What company do I work at?",
        session_id=session_id,
    )
    print(response2)  # Should remember TechCorp

    # Search conversation history.
    search_results = await memory.search(
        "TechCorp",
        session_id=session_id,
    )

    # Get conversation statistics.
    history = await memory.load(session_id)
    print(f"Total messages: {len(history)}")

    # Export conversation as plain dicts (JSON-serializable).
    export_data = {
        "session_id": session_id,
        "messages": [msg.to_dict() for msg in history],
        "exported_at": datetime.now().isoformat(),
    }

    # Clear sessions older than 30 days.
    await memory.clear_old_sessions(
        older_than=datetime.now() - timedelta(days=30),
    )
# Example 5: Memory Middleware
class MemoryMiddleware:
    """Hooks that run around memory operations (PII scrubbing, decryption)."""

    def __init__(self, memory: MemoryBackend):
        self.memory = memory

    async def before_save(self, session_id: str, messages: List[Message]):
        """Scrub PII from message content before it is persisted."""
        for msg in messages:
            msg.content = self.sanitize_pii(msg.content)

    async def after_load(self, session_id: str, messages: List[Message]):
        """Decrypt messages flagged ``encrypted`` after loading.

        NOTE(review): ``decrypt`` is not defined on this class — callers
        must supply it (e.g. via subclassing). TODO confirm.
        """
        for msg in messages:
            if msg.metadata.get("encrypted"):
                msg.content = self.decrypt(msg.content)

    def sanitize_pii(self, text: str) -> str:
        """Replace email addresses and US-style phone numbers with placeholders."""
        import re
        # Remove email addresses (coarse: any token containing '@').
        text = re.sub(r'\S+@\S+', '[EMAIL]', text)
        # Remove phone numbers like 555-123-4567 / 555.123.4567 / 5551234567.
        text = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE]', text)
        return text
if __name__ == "__main__":
asyncio.run(main())
Memory Backend Comparison
| Backend | Persistence | Scalability | Performance | Use Case |
|---|---|---|---|---|
| InMemory | None | Single instance | Fastest | Development/Testing |
| Redis | TTL-based | Distributed | Very fast | Production |
| SQLite | File-based | Single machine | Fast | Local apps |
| PostgreSQL | Full ACID | Highly scalable | Good | Enterprise |
Best Practices
Do's
- • Set appropriate TTL values
- • Implement cleanup strategies
- • Use compression for large conversations
- • Handle connection failures gracefully
- • Monitor memory usage
- • Backup critical conversations
Don'ts
- • Don't store sensitive data unencrypted
- • Don't ignore memory limits
- • Don't skip error handling
- • Don't use in-memory for production
- • Don't forget about GDPR compliance
- • Don't mix session data
Performance Tips
- • Use connection pooling for database backends
- • Enable compression for Redis with large messages
- • Index session_id columns in SQL databases
- • Implement message pagination for long conversations
- • Use batch operations when possible
Documentation Complete!
You've explored the complete AA Kit API reference. Ready to build amazing AI agents?