Isolating AI Agent Memory: A Multi-Instance Architecture
Learn how to isolate AI agent memory with per-instance Qdrant collections, Docker networking, and OpenClaw configuration for clean separation between multiple agents.
Table of Contents
- The Problem with Shared Memory
- Three Levels of Isolation
- Level 1: Container Isolation (Strongest)
- Level 2: Collection Isolation (Database-Level)
- Level 3: Payload Partitioning (Scalable)
- Tiered Isolation: The Best of Both Worlds
- Configuration Management
- The Curator Problem: Scheduling Without Conflicts
- Solution 1: Staggered Scheduling
- Solution 2: Distributed Locking
- Solution 3: Resource-Aware Scheduling
- Lessons Learned
- 1. Abstraction is Your Friend
- 2. Test Your Boundaries
- 3. Monitor Collection Growth
- 4. Plan Your Escape Hatches
- Choosing Your Architecture
- Final Architecture
You’ve built your AI agent. It remembers your preferences, learns from conversations, and stores knowledge in a vector database. Then you add a second agent. And a third. Suddenly, your research agent is pulling memories from your coding agent. Your personal assistant knows about your work projects. Your agents are hallucinating facts they never learned.
The problem? Shared memory. The solution? Per-agent memory isolation.
The Problem with Shared Memory
When multiple AI agents share a single memory space, you get context pollution. Agent Alpha stores “User prefers dark mode” while Agent Beta stores “User prefers light mode.” Now both agents are confused.
Time: 14:30
Alpha: "Remembering: User prefers dark mode"
Beta: "Remembering: User prefers light mode"
Time: 14:35
Alpha: searches memory → finds BOTH preferences
Alpha: "Based on your preferences... which one?"
User: "???"
This isn’t just annoying. It’s a real problem for:
- Multi-tenant SaaS: Customer A’s data leaking to Customer B
- Enterprise deployments: HR agents seeing Engineering memories
- Privacy compliance: GDPR, HIPAA require data isolation
- Agent specialization: A research agent shouldn’t have coding agent context
Three Levels of Isolation
You can isolate agent memory at three levels, each with tradeoffs:
Level 1: Container Isolation (Strongest)
Run each agent in its own Docker container with completely separate memory stores.
# docker-compose.yml
services:
  agent-alpha:
    build: ./agent
    ports:
      - "8001:8000"
    environment:
      - AGENT_NAME=alpha
      - QDRANT_COLLECTION=agent_alpha_memory
    volumes:
      - ./configs/alpha.yaml:/app/config.yaml
      - agent_alpha_data:/data
    deploy:
      resources:
        limits:
          memory: 2G
          cpus: '1.0'

  agent-beta:
    build: ./agent
    ports:
      - "8002:8000"
    environment:
      - AGENT_NAME=beta
      - QDRANT_COLLECTION=agent_beta_memory
    volumes:
      - ./configs/beta.yaml:/app/config.yaml
      - agent_beta_data:/data
    deploy:
      resources:
        limits:
          memory: 2G
          cpus: '1.0'

volumes:
  agent_alpha_data:
  agent_beta_data:
Each container gets its own:
- Process namespace (PID isolation)
- Network namespace (separate network stack)
- Filesystem (isolated /data)
- Resource limits (CPU and memory quotas)
When to use: Small number of agents (under 10), strict isolation requirements, different embedding models per agent.
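Inside each container, the agent bootstraps its identity from the injected environment, so the same image boots as alpha or beta with no code changes. A minimal sketch of that startup step, using the variable names from the compose file above (the `QDRANT_URL` variable and its default service name are assumptions, not part of the compose file):

```python
import os

def load_agent_env() -> dict:
    """Read this container's identity and memory settings from the
    environment variables injected by docker-compose."""
    return {
        "agent_name": os.environ["AGENT_NAME"],         # set in compose
        "collection": os.environ["QDRANT_COLLECTION"],  # set in compose
        # QDRANT_URL is an assumed extra variable; defaults to a
        # hypothetical service name on the compose network.
        "qdrant_url": os.environ.get("QDRANT_URL", "http://qdrant:6333"),
    }
```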
Level 2: Collection Isolation (Database-Level)
Use one Qdrant instance but separate collections per agent.
from qdrant_client import QdrantClient, models

client = QdrantClient(url="http://192.168.1.100:6333")

# Create dedicated collection for Agent Alpha
client.create_collection(
    collection_name="agent_alpha_memory",
    vectors_config=models.VectorParams(
        size=768,
        distance=models.Distance.COSINE,
    ),
)

# Create dedicated collection for Agent Beta
client.create_collection(
    collection_name="agent_beta_memory",
    vectors_config=models.VectorParams(
        size=768,
        distance=models.Distance.COSINE,
    ),
)
Each agent queries only its own collection:
import uuid
from datetime import datetime

# Agent Alpha's memory store
def store_alpha_memory(content: str, embedding: list[float]):
    client.upsert(
        collection_name="agent_alpha_memory",
        points=[models.PointStruct(
            # Deterministic ID: Python's hash() is salted per process,
            # so derive a stable UUID from the content instead.
            id=str(uuid.uuid5(uuid.NAMESPACE_URL, content)),
            vector=embedding,
            payload={"content": content, "timestamp": datetime.now().isoformat()},
        )],
    )

# Agent Alpha's memory search
def search_alpha(query: str, limit: int = 5):
    query_vector = get_embedding(query)
    return client.query_points(
        collection_name="agent_alpha_memory",
        query=query_vector,
        limit=limit,
    )
When to use: Small-to-medium agent count (under 50), strong isolation, same embedding model.
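To keep Level 2 from spawning a `store_alpha` / `store_beta` / `search_alpha`... pair per agent, derive the collection name from the agent name and route every agent through one code path. A sketch (the `agent_<name>_memory` convention follows the collections created above; the lazy imports just keep the helper importable without qdrant-client installed):

```python
def collection_for(agent_name: str) -> str:
    """Map an agent name to its dedicated collection, following the
    agent_<name>_memory convention used above."""
    safe = agent_name.lower().replace("-", "_")
    return f"agent_{safe}_memory"

def store_memory(client, agent_name: str, content: str, embedding: list[float]):
    """Single store path for every agent; only the collection differs.
    `client` is a QdrantClient as constructed earlier."""
    import uuid
    from qdrant_client import models  # assumes qdrant-client is installed

    client.upsert(
        collection_name=collection_for(agent_name),
        points=[models.PointStruct(
            id=str(uuid.uuid5(uuid.NAMESPACE_URL, content)),
            vector=embedding,
            payload={"content": content},
        )],
    )
```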
Level 3: Payload Partitioning (Scalable)
One collection, filtered by agent ID. This scales to thousands of agents.
# Single collection for all agents
client.create_collection(
    collection_name="shared_agent_memory",
    vectors_config=models.VectorParams(
        size=768,
        distance=models.Distance.COSINE,
    ),
    hnsw_config=models.HnswConfigDiff(
        payload_m=16,  # Per-tenant sub-graphs
        m=0,           # Disable global graph
    ),
)

# Create tenant index for fast filtering
client.create_payload_index(
    collection_name="shared_agent_memory",
    field_name="agent_id",
    field_schema=models.KeywordIndexParams(
        type=models.KeywordIndexType.KEYWORD,
        is_tenant=True,  # Co-locates agent data on disk
    ),
)
Store memories with agent IDs:
def store_memory(agent_id: str, content: str, embedding: list[float]):
    client.upsert(
        collection_name="shared_agent_memory",
        points=[models.PointStruct(
            # Stable, per-agent ID (hash() is salted per process)
            id=str(uuid.uuid5(uuid.NAMESPACE_URL, f"{agent_id}_{content}")),
            vector=embedding,
            payload={
                "agent_id": agent_id,
                "content": content,
                "timestamp": datetime.now().isoformat(),
            },
        )],
    )

# Agent Alpha stores
store_memory("alpha", "User prefers dark mode", alpha_embedding)

# Agent Beta stores (completely isolated)
store_memory("beta", "User's timezone is EST", beta_embedding)
Search with isolation guarantees:
def search_agent_memory(agent_id: str, query: str, limit: int = 5):
    results = client.query_points(
        collection_name="shared_agent_memory",
        query=get_embedding(query),
        query_filter=models.Filter(
            must=[models.FieldCondition(
                key="agent_id",
                match=models.MatchValue(value=agent_id),
            )]
        ),
        limit=limit,
    )
    return results.points

# Alpha searches - sees only alpha memories
alpha_results = search_agent_memory("alpha", "user preferences")

# Beta searches - sees only beta memories
beta_results = search_agent_memory("beta", "user settings")
When to use: Many agents (50+), same embedding model, moderate isolation requirements.
Tiered Isolation: The Best of Both Worlds
For real-world deployments, combine approaches. Large, high-value agents get dedicated collections. Smaller agents share a partitioned collection.
# Create collection with custom sharding
client.create_collection(
    collection_name="tiered_agent_memory",
    vectors_config=models.VectorParams(size=768, distance=models.Distance.COSINE),
    sharding_method=models.ShardingMethod.CUSTOM,
)

# Dedicated shards for important agents
client.create_shard_key("tiered_agent_memory", shard_key="alpha")
client.create_shard_key("tiered_agent_memory", shard_key="beta")

# Shared shard for smaller agents
client.create_shard_key("tiered_agent_memory", shard_key="default")
When a shared agent grows large enough:
def promote_agent_to_dedicated(agent_id: str, source_shard: str):
    """Upgrade agent from shared to dedicated shard."""
    # 1. Create dedicated shard
    client.create_shard_key("tiered_agent_memory", shard_key=agent_id)

    # 2. Copy data to new shard
    all_points, offset = [], None
    while True:
        result = client.scroll(
            collection_name="tiered_agent_memory",
            scroll_filter=models.Filter(
                must=[models.FieldCondition(
                    key="agent_id",
                    match=models.MatchValue(value=agent_id),
                )]
            ),
            limit=100,
            offset=offset,
            with_payload=True,
            with_vectors=True,
            shard_key_selector=source_shard,
        )
        points, offset = result
        all_points.extend(points)
        if offset is None:
            break

    if all_points:
        # Insert to new shard FIRST (safety)
        client.upsert(
            collection_name="tiered_agent_memory",
            points=[models.PointStruct(
                id=p.id, vector=p.vector, payload=p.payload,
            ) for p in all_points],
            shard_key_selector=agent_id,
        )
        # Then delete from old shard
        client.delete(
            collection_name="tiered_agent_memory",
            points_selector=models.PointIdsList(
                points=[p.id for p in all_points],
            ),
            shard_key_selector=source_shard,
        )
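The promotion trigger itself is a policy decision. One possible rule, sketched as a pure function fed by a periodic point count per agent (e.g. from `client.count` with an `agent_id` filter); the threshold and share limit here are arbitrary examples, not recommendations:

```python
PROMOTION_THRESHOLD = 100_000  # arbitrary example; tune per deployment

def should_promote(points_count: int, shared_shard_total: int,
                   threshold: int = PROMOTION_THRESHOLD,
                   max_share: float = 0.5) -> bool:
    """Promote when an agent exceeds an absolute budget, or when it
    dominates the shared shard and crowds out smaller tenants."""
    if points_count > threshold:
        return True
    return shared_shard_total > 0 and points_count / shared_shard_total > max_share
```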
Configuration Management
Each agent needs its own configuration file pointing to its memory space.
# configs/alpha.yaml
agent:
  name: alpha
  version: "1.0.0"

memory:
  qdrant:
    url: "http://192.168.1.100:6333"
    collection: "agent_alpha_memory"
  redis:
    url: "redis://192.168.1.100:6379"
    prefix: "alpha:"  # Key isolation

model:
  provider: ollama
  name: llama3
  url: "http://192.168.1.105:11434"

scheduler:
  curator_interval: 3600  # Run consolidation hourly
  curator_offset: 0       # Start at minute 0

# configs/beta.yaml
agent:
  name: beta
  version: "1.0.0"

memory:
  qdrant:
    url: "http://192.168.1.100:6333"
    collection: "agent_beta_memory"
  redis:
    url: "redis://192.168.1.100:6379"
    prefix: "beta:"

model:
  provider: ollama
  name: mistral
  url: "http://192.168.1.105:11434"

scheduler:
  curator_interval: 3600
  curator_offset: 1200  # Start 20 minutes after alpha
Load configurations with environment variable overrides:
import os
import yaml

class AgentConfig:
    def __init__(self, config_path: str = None):
        config_path = config_path or os.getenv("AGENT_CONFIG_PATH", "configs/default.yaml")
        with open(config_path) as f:
            self.config = yaml.safe_load(f)

        # Environment overrides
        self.agent_name = os.getenv("AGENT_NAME", self.config["agent"]["name"])
        self.qdrant_url = os.getenv("QDRANT_URL", self.config["memory"]["qdrant"]["url"])
        self.collection = os.getenv(
            "QDRANT_COLLECTION",
            self.config["memory"]["qdrant"]["collection"],
        )
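The `prefix` field in each config only isolates Redis keys if every operation actually applies it. A thin wrapper can make that automatic (a sketch; it duck-types over any client exposing `get`/`set`, such as redis-py's `Redis`):

```python
class PrefixedRedis:
    """Apply an agent's key prefix (e.g. "alpha:") to every operation,
    so keys from different agents can never collide in the shared
    Redis instance."""

    def __init__(self, client, prefix: str):
        self._client = client
        self._prefix = prefix

    def _key(self, name: str) -> str:
        return f"{self._prefix}{name}"

    def set(self, name: str, value):
        return self._client.set(self._key(name), value)

    def get(self, name: str):
        return self._client.get(self._key(name))
```

An agent constructed with `prefix="alpha:"` reads and writes `alpha:session` and never touches beta's keys, even though both share one Redis instance.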
The Curator Problem: Scheduling Without Conflicts
Memory consolidation (curator tasks) needs to run periodically. But if all agents run their curators simultaneously, you hammer shared resources.
Time: 00:00 ───────────────────────────────────────── 01:00
│
Alpha curator: ████████████████████████ (consolidating...)
Beta curator: ████████████████████████ (consolidating...)
Gamma curator: ████████████████████████ (consolidating...)
│
▼
CPU/Memory Spike!
Qdrant overloaded
Solution 1: Staggered Scheduling
Each agent offsets its curator by a fixed amount:
from datetime import datetime
import asyncio

class StaggeredScheduler:
    def __init__(self, interval: int, offset: int):
        self.interval = interval
        self.offset = offset

    def seconds_until_next_slot(self) -> int:
        """Seconds until this agent's next offset slot within the hour."""
        now = datetime.now()
        seconds_since_hour = now.minute * 60 + now.second
        if self.offset >= seconds_since_hour:
            return self.offset - seconds_since_hour
        return self.interval - (seconds_since_hour - self.offset)

    async def run_periodic(self, task):
        # Align to this agent's slot once, then tick every interval.
        # Re-aligning after every run would stack a second wait on
        # top of the interval sleep and drift the schedule.
        await asyncio.sleep(self.seconds_until_next_slot())
        while True:
            await task()
            await asyncio.sleep(self.interval)
Alpha runs at XX:00, Beta at XX:20, Gamma at XX:40. No overlap.
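The offsets don't have to be hand-assigned: for N agents sharing one interval, spread the start times evenly. A sketch (sorting just makes the assignment deterministic):

```python
def curator_offsets(agent_names: list[str], interval: int = 3600) -> dict[str, int]:
    """Spread curator start times evenly across one interval."""
    step = interval // len(agent_names)
    return {name: i * step for i, name in enumerate(sorted(agent_names))}
```

Three agents on an hourly interval get offsets of 0, 1200, and 2400 seconds, matching the alpha/beta/gamma schedule above.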
Solution 2: Distributed Locking
Use Redis to ensure only one curator runs at a time:
import asyncio
import redis
from contextlib import contextmanager

class DistributedLock:
    def __init__(self, redis_url: str, lock_name: str, timeout: int = 300):
        self.redis = redis.from_url(redis_url)
        self.lock_name = lock_name
        self.timeout = timeout

    @contextmanager
    def acquire(self):
        lock = self.redis.lock(self.lock_name, timeout=self.timeout)
        acquired = lock.acquire(blocking=True, timeout=60)
        try:
            yield acquired
        finally:
            if acquired:
                lock.release()

async def run_curator(agent_id: str):
    lock = DistributedLock("redis://192.168.1.100:6379", "curator:global_lock")
    with lock.acquire() as acquired:
        if acquired:
            await consolidate_memories(agent_id)
        else:
            await asyncio.sleep(60)  # Retry in 1 minute
Solution 3: Resource-Aware Scheduling
Check system load before running:
import asyncio
import psutil

class ResourceAwareScheduler:
    def __init__(self, thresholds: dict):
        self.thresholds = thresholds

    def can_run(self) -> bool:
        cpu = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory().percent
        return (
            cpu < self.thresholds.get("cpu_max", 80)
            and memory < self.thresholds.get("memory_max", 90)
        )

    async def run_with_backoff(self, task):
        backoff = 60
        while True:
            if self.can_run():
                await task()
                return
            await asyncio.sleep(backoff)
            backoff = min(backoff * 2, 3600)
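The three solutions compose: stagger by default, then gate each run on host headroom and the global lock. A sketch wiring them together (`lock` and `scheduler` are instances of the `DistributedLock` and `ResourceAwareScheduler` classes above; `consolidate` stands in for the agent's consolidation coroutine):

```python
import asyncio

async def guarded_curator(agent_id: str, lock, scheduler, consolidate):
    """Run consolidation only when the host has headroom AND the global
    curator lock is free; otherwise back off exponentially and retry."""
    backoff = 60
    while True:
        if scheduler.can_run():
            with lock.acquire() as acquired:
                if acquired:
                    await consolidate(agent_id)
                    return
        await asyncio.sleep(backoff)
        backoff = min(backoff * 2, 3600)
```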
Lessons Learned
1. Abstraction is Your Friend
Never write raw Qdrant queries scattered throughout your codebase. Wrap everything:
class AgentMemory:
    def __init__(self, agent_id: str, collection: str):
        self.agent_id = agent_id
        self.collection = collection
        self.client = QdrantClient(url=settings.qdrant_url)

    def store(self, content: str, metadata: dict = None):
        # Always stamp agent_id into the payload
        ...

    def search(self, query: str, limit: int = 5):
        # Always filter by agent_id
        ...
This prevents a developer from accidentally querying without a filter and leaking cross-agent data.
2. Test Your Boundaries
Write tests that verify isolation:
def test_memory_isolation():
    # Alpha stores
    alpha_memory = AgentMemory("alpha", "shared_collection")
    alpha_memory.store("Alpha's secret")

    # Beta should NOT see Alpha's memory
    beta_memory = AgentMemory("beta", "shared_collection")
    results = beta_memory.search("Alpha's secret")
    assert len(results) == 0, "Memory leak! Beta saw Alpha's data"
3. Monitor Collection Growth
Set alerts when memory grows beyond expected bounds. A misconfigured agent might be writing to the wrong collection.
def check_collection_size(collection_name: str, expected_max: int):
    info = client.get_collection(collection_name)
    if info.points_count > expected_max:
        alert(f"Collection {collection_name} has {info.points_count} points, "
              f"expected < {expected_max}")
4. Plan Your Escape Hatches
Sometimes you DO want to share memories. Build it in from the start:
def search_with_override(agent_id: str, query: str, include_agents: list[str] = None):
    """Search with optional cross-agent access."""
    if include_agents:
        # Explicit opt-in for shared search
        agent_filter = models.FieldCondition(
            key="agent_id",
            match=models.MatchAny(any=include_agents + [agent_id]),
        )
    else:
        # Default: own memories only
        agent_filter = models.FieldCondition(
            key="agent_id",
            match=models.MatchValue(value=agent_id),
        )

    return client.query_points(
        collection_name="shared_agent_memory",
        query=get_embedding(query),
        query_filter=models.Filter(must=[agent_filter]),
        limit=5,
    )
Choosing Your Architecture
| Scenario | Recommended Approach |
|---|---|
| Under 10 agents, strict isolation | Docker containers + dedicated collections |
| 10-100 agents, same model | Single collection with payload filtering |
| Mix of small and large | Tiered: dedicated shards for large, shared for small |
| Compliance (GDPR, HIPAA) | Containers + dedicated collections + encryption |
| Rapid prototyping | Single collection, filter by agent_id |
Final Architecture
Here’s a production-ready setup for a 5-agent system:
# docker-compose.yml
services:
  agent-alpha:
    build: ./agent
    ports: ["8001:8000"]
    environment:
      - AGENT_NAME=alpha
      - QDRANT_COLLECTION=agent_alpha_memory
      - CURATOR_INTERVAL=3600
      - CURATOR_OFFSET=0
    volumes:
      - ./configs/alpha.yaml:/app/config.yaml

  agent-beta:
    build: ./agent
    ports: ["8002:8000"]
    environment:
      - AGENT_NAME=beta
      - QDRANT_COLLECTION=agent_beta_memory
      - CURATOR_INTERVAL=3600
      - CURATOR_OFFSET=1200

  agent-gamma:
    build: ./agent
    ports: ["8003:8000"]
    environment:
      - AGENT_NAME=gamma
      - QDRANT_COLLECTION=shared_collection
      - QDRANT_TENANT_ID=gamma
      - CURATOR_INTERVAL=3600
      - CURATOR_OFFSET=2400

  # Shared infrastructure
  qdrant:
    image: qdrant/qdrant:latest
    ports: ["6333:6333"]
    volumes:
      - qdrant_data:/qdrant/storage

  redis:
    image: redis:7-alpine
    ports: ["6379:6379"]

volumes:
  qdrant_data:
Alpha and Beta get dedicated collections for strong isolation. Gamma uses a shared collection with tenant filtering. Curator jobs are staggered 20 minutes apart so consolidation runs never overlap.
Memory isolation isn’t just good architecture. It’s how you build trustworthy multi-agent systems that scale without leaking context between agents. Start simple (single collection, payload filtering), then graduate to dedicated collections or containers as your isolation requirements grow.