Learn to build AI agents that execute tasks autonomously, make decisions, and automate workflows, with practical implementations and design patterns.
AI agents move beyond simple question answering to autonomous task execution and decision-making. They can break a complex objective into manageable steps, choose and call tools as needed, maintain context across interactions, and coordinate with other agents on larger goals. This guide shows you how to build production-ready AI agent systems.
Understanding AI Agent Architecture
AI agents are autonomous systems that perceive their environment, make decisions, and take actions to achieve specific goals. Unlike traditional rule-based automation, which follows fixed scripts, an AI agent can adapt to new situations, handle uncertainty, and draw on past experience stored in memory.
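The perceive-decide-act cycle in this definition fits in a few lines. The following is a minimal sketch, assuming a hypothetical toy environment (`CounterEnvironment`) and a hard-coded rule (`decide`) standing in for a model's judgment; the step budget plays the same role as `max_iterations` in `BaseAgent`.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class CounterEnvironment:
    """Hypothetical toy environment: one integer the agent can nudge up or down."""
    value: int = 0

    def observe(self) -> int:
        return self.value

    def apply(self, action: str) -> None:
        self.value += 1 if action == "increment" else -1


def decide(observation: int, goal: int) -> Optional[str]:
    """Rule-based stand-in for an LLM: choose an action, or None once the goal is met."""
    if observation == goal:
        return None
    return "increment" if observation < goal else "decrement"


def run_agent(env: CounterEnvironment, goal: int, max_steps: int = 100) -> List[str]:
    """Perceive, decide, act; stop at the goal or when the step budget runs out."""
    actions: List[str] = []
    for _ in range(max_steps):
        observation = env.observe()         # perceive
        action = decide(observation, goal)  # decide
        if action is None:
            break
        env.apply(action)                   # act
        actions.append(action)
    return actions


print(run_agent(CounterEnvironment(value=2), goal=5))  # ['increment', 'increment', 'increment']
```

Swapping `decide` for an LLM call and `apply` for tool calls gives the general shape of the `BaseAgent` loop.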
Core Agent Components
# agent_architecture.py
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional, Callable, Union
from enum import Enum
import asyncio
import json
from datetime import datetime
import uuid
class AgentState(Enum):
IDLE = "idle"
PLANNING = "planning"
EXECUTING = "executing"
WAITING = "waiting"
ERROR = "error"
COMPLETED = "completed"
@dataclass
class Task:
id: str
description: str
priority: int = 1
dependencies: List[str] = field(default_factory=list)
parameters: Dict[str, Any] = field(default_factory=dict)
status: str = "pending"
assigned_agent: Optional[str] = None
created_at: datetime = field(default_factory=datetime.now)
completed_at: Optional[datetime] = None
result: Optional[Any] = None
@dataclass
class AgentAction:
action_type: str
parameters: Dict[str, Any]
tool_name: Optional[str] = None
expected_outcome: Optional[str] = None
@dataclass
class AgentObservation:
observation_type: str
content: Any
timestamp: datetime = field(default_factory=datetime.now)
source: Optional[str] = None
class Memory(ABC):
"""Abstract memory interface for agents"""
@abstractmethod
async def store(self, key: str, value: Any, ttl: Optional[int] = None):
"""Store information in memory"""
pass
@abstractmethod
async def retrieve(self, key: str) -> Optional[Any]:
"""Retrieve information from memory"""
pass
@abstractmethod
async def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
"""Search memory for relevant information"""
pass
class Tool(ABC):
"""Abstract tool interface"""
@property
@abstractmethod
def name(self) -> str:
pass
@property
@abstractmethod
def description(self) -> str:
pass
@property
@abstractmethod
def parameters_schema(self) -> Dict[str, Any]:
pass
@abstractmethod
async def execute(self, **kwargs) -> Dict[str, Any]:
"""Execute the tool with given parameters"""
pass
class Planner(ABC):
"""Abstract planner interface"""
@abstractmethod
async def create_plan(self,
goal: str,
context: Dict[str, Any],
available_tools: List[Tool]) -> List[AgentAction]:
"""Create a plan to achieve the given goal"""
pass
@abstractmethod
async def replan(self,
current_plan: List[AgentAction],
execution_results: List[Dict[str, Any]],
new_context: Dict[str, Any]) -> List[AgentAction]:
"""Replan based on execution results"""
pass
class BaseAgent(ABC):
"""Base agent class with core functionality"""
def __init__(self,
agent_id: str,
name: str,
memory: Memory,
planner: Planner,
tools: Optional[List[Tool]] = None,
max_iterations: int = 10):
self.agent_id = agent_id
self.name = name
self.memory = memory
self.planner = planner
self.tools = {tool.name: tool for tool in (tools or [])}
self.max_iterations = max_iterations
# Agent state
self.state = AgentState.IDLE
self.current_task: Optional[Task] = None
self.current_plan: List[AgentAction] = []
self.execution_history: List[Dict[str, Any]] = []
# Callbacks
self.state_change_callbacks: List[Callable] = []
self.task_completion_callbacks: List[Callable] = []
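async def execute_task(self, task: Task) -> Dict[str, Any]:
"""Execute a task autonomously"""
self.current_task = task
# Start each task with a fresh history so iteration counts are per task
self.execution_history = []
self._change_state(AgentState.PLANNING)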
try:
# Create initial plan
context = await self._gather_context(task)
self.current_plan = await self.planner.create_plan(
task.description, context, list(self.tools.values())
)
# Execute plan with iterations
result = await self._execute_plan_with_feedback()
# Mark task as completed
task.status = "completed"
task.completed_at = datetime.now()
task.result = result
self._change_state(AgentState.COMPLETED)
# Notify callbacks
for callback in self.task_completion_callbacks:
await callback(self, task, result)
return {
"success": True,
"result": result,
"iterations": len(self.execution_history),
"execution_history": self.execution_history
}
except Exception as e:
self._change_state(AgentState.ERROR)
task.status = "error"
return {
"success": False,
"error": str(e),
"execution_history": self.execution_history
}
finally:
self.current_task = None
self.current_plan = []
async def _execute_plan_with_feedback(self) -> Any:
"""Execute plan with feedback and replanning"""
self._change_state(AgentState.EXECUTING)
for iteration in range(self.max_iterations):
if not self.current_plan:
break
# Execute next action
action = self.current_plan.pop(0)
execution_result = await self._execute_action(action)
# Record execution
self.execution_history.append({
"iteration": iteration,
"action": action,
"result": execution_result,
"timestamp": datetime.now()
})
# Check if task is complete
if await self._is_task_complete(execution_result):
return execution_result.get("result")
# Check if replanning is needed
if await self._should_replan(execution_result):
context = await self._gather_context(self.current_task)
context["execution_history"] = self.execution_history
self.current_plan = await self.planner.replan(
self.current_plan,
self.execution_history,
context
)
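# The plan ran out or the iteration budget was spent: fall back to the
# last result, but only if that final action actually succeeded
if self.execution_history and self.execution_history[-1]["result"].get("success"):
return self.execution_history[-1]["result"].get("result")
raise RuntimeError("Task did not complete: plan exhausted or maximum iterations reached")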
async def _execute_action(self, action: AgentAction) -> Dict[str, Any]:
"""Execute a single action"""
try:
if action.tool_name and action.tool_name in self.tools:
# Execute tool
tool = self.tools[action.tool_name]
result = await tool.execute(**action.parameters)
return {
"success": True,
"action_type": action.action_type,
"tool_used": action.tool_name,
"result": result
}
else:
# Handle non-tool actions (thinking, planning, etc.)
result = await self._handle_internal_action(action)
return {
"success": True,
"action_type": action.action_type,
"result": result
}
except Exception as e:
return {
"success": False,
"action_type": action.action_type,
"error": str(e)
}
async def _handle_internal_action(self, action: AgentAction) -> Any:
"""Handle internal agent actions (thinking, memory operations, etc.)"""
if action.action_type == "think":
# Store thought in memory
thought = action.parameters.get("thought", "")
await self.memory.store(f"thought_{datetime.now().isoformat()}", thought)
return {"thought": thought}
elif action.action_type == "remember":
# Retrieve from memory
query = action.parameters.get("query", "")
memories = await self.memory.search(query)
return {"memories": memories}
elif action.action_type == "wait":
# Wait for specified duration
duration = action.parameters.get("duration", 1)
await asyncio.sleep(duration)
return {"waited": duration}
else:
return {"message": f"Unknown internal action: {action.action_type}"}
async def _gather_context(self, task: Task) -> Dict[str, Any]:
"""Gather context for planning"""
context = {
"task": task,
"available_tools": list(self.tools.keys()),
"agent_capabilities": await self._get_capabilities(),
"relevant_memories": await self.memory.search(task.description, limit=5)
}
return context
async def _get_capabilities(self) -> List[str]:
"""Get agent capabilities"""
capabilities = ["planning", "execution", "memory", "learning"]
capabilities.extend(list(self.tools.keys()))
return capabilities
async def _is_task_complete(self, execution_result: Dict[str, Any]) -> bool:
"""Check if the task is complete"""
# Simple completion check: a successful action whose result dict contains "final_result"
result = execution_result.get("result")
if execution_result.get("success") and isinstance(result, dict) and "final_result" in result:
return True
# Check if task-specific completion criteria are met
if self.current_task and hasattr(self, '_check_task_completion'):
return await self._check_task_completion(self.current_task, execution_result)
return False
async def _should_replan(self, execution_result: Dict[str, Any]) -> bool:
"""Determine if replanning is needed"""
# Replan if action failed
if not execution_result.get("success"):
return True
# Replan if the result flags something unexpected
result = execution_result.get("result")
if isinstance(result, dict) and "unexpected" in result:
return True
# Replan if explicitly requested
if execution_result.get("replan_requested"):
return True
return False
def _change_state(self, new_state: AgentState):
"""Change agent state and notify callbacks"""
old_state = self.state
self.state = new_state
for callback in self.state_change_callbacks:
asyncio.create_task(callback(self, old_state, new_state))
def add_state_change_callback(self, callback: Callable):
"""Add callback for state changes"""
self.state_change_callbacks.append(callback)
def add_task_completion_callback(self, callback: Callable):
"""Add callback for task completion"""
self.task_completion_callbacks.append(callback)
def add_tool(self, tool: Tool):
"""Add a tool to the agent"""
self.tools[tool.name] = tool
def remove_tool(self, tool_name: str):
"""Remove a tool from the agent"""
if tool_name in self.tools:
del self.tools[tool_name]
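`_execute_plan_with_feedback` interleaves execution and replanning: pop an action, run it, record the outcome, stop on a `final_result`, and ask the planner for a new plan after a failure. The sketch below isolates that control flow without an LLM. It is self-contained and uses hypothetical pieces: plans are lists of step names, `flaky_fetch` and `summarize` are stub tools, and `replan` simply retries the failed step, where a real `Planner.replan` would consult the model.

```python
import asyncio
from typing import Any, Callable, Dict, List


async def flaky_fetch(attempts: Dict[str, int]) -> Dict[str, Any]:
    """Hypothetical tool that fails on its first call and succeeds afterwards."""
    attempts["fetch"] = attempts.get("fetch", 0) + 1
    if attempts["fetch"] == 1:
        raise ConnectionError("transient failure")
    return {"data": "page contents"}


async def summarize(attempts: Dict[str, int]) -> Dict[str, Any]:
    """Hypothetical tool whose output marks the task as finished."""
    return {"final_result": "summary of page contents"}


TOOLS: Dict[str, Callable] = {"fetch": flaky_fetch, "summarize": summarize}


def replan(remaining: List[str], failed: str) -> List[str]:
    """Stand-in for Planner.replan: retry the failed step, then continue."""
    return [failed] + remaining


async def run(plan: List[str], max_iterations: int = 10) -> Dict[str, Any]:
    history: List[Dict[str, Any]] = []
    attempts: Dict[str, int] = {}
    plan = list(plan)  # do not mutate the caller's list
    for iteration in range(max_iterations):
        if not plan:
            break
        step = plan.pop(0)
        try:
            result = {"success": True, "result": await TOOLS[step](attempts)}
        except Exception as exc:
            result = {"success": False, "error": str(exc)}
        history.append({"iteration": iteration, "step": step, **result})
        # Completion check, mirroring _is_task_complete
        if result["success"] and "final_result" in result["result"]:
            return {"success": True, "result": result["result"]["final_result"], "history": history}
        # Replanning trigger, mirroring _should_replan
        if not result["success"]:
            plan = replan(plan, step)
    return {"success": False, "history": history}


outcome = asyncio.run(run(["fetch", "summarize"]))
print(outcome["result"], len(outcome["history"]))
```

With the retry, the trace is `fetch` (fails), `fetch` (succeeds), `summarize`; with `max_iterations=1` the loop stops after the first failure and reports no result.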
Memory Systems for Agents
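Working memory in the module below tracks recency with a `deque`, where each `deque.remove` is linear in the number of items. That is negligible at a capacity of seven, but if you raise the capacity, `collections.OrderedDict` gives constant-time recency updates. A minimal synchronous sketch of that alternative (the class name is hypothetical, and TTL handling is omitted):

```python
from collections import OrderedDict
from typing import Any, Hashable, List, Optional


class LRUWorkingMemory:
    """Fixed-capacity store that evicts the least recently used key in O(1)."""

    def __init__(self, capacity: int = 7):
        self.capacity = capacity
        self._items: "OrderedDict[Hashable, Any]" = OrderedDict()

    def store(self, key: Hashable, value: Any) -> None:
        if key in self._items:
            self._items.move_to_end(key)     # overwriting refreshes recency
        self._items[key] = value
        if len(self._items) > self.capacity:
            self._items.popitem(last=False)  # evict the least recently used entry

    def retrieve(self, key: Hashable) -> Optional[Any]:
        if key not in self._items:
            return None
        self._items.move_to_end(key)         # a read counts as a use
        return self._items[key]

    def keys(self) -> List[Hashable]:
        """Keys from least to most recently used."""
        return list(self._items)


wm = LRUWorkingMemory(capacity=2)
wm.store("goal", "write report")
wm.store("draft", "v1")
wm.retrieve("goal")           # "goal" is now the most recently used key
wm.store("source", "paper")   # evicts "draft"
print(wm.keys())              # ['goal', 'source']
```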
# agent_memory.py
import asyncio
import json
import sqlite3
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import numpy as np
from collections import defaultdict, deque
import hashlib
from agent_architecture import Memory
class VectorMemory(Memory):
"""Vector-based semantic memory for agents"""
def __init__(self,
embedding_model,
db_path: str = "agent_memory.db",
max_memories: int = 10000):
self.embedding_model = embedding_model
self.db_path = db_path
self.max_memories = max_memories
# Initialize database
self._init_database()
# In-memory cache for recent memories
self.memory_cache = {}
self.cache_size = 1000
def _init_database(self):
"""Initialize SQLite database for memory storage"""
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS memories (
id TEXT PRIMARY KEY,
key TEXT,
content TEXT,
embedding BLOB,
metadata TEXT,
created_at TIMESTAMP,
accessed_at TIMESTAMP,
access_count INTEGER DEFAULT 1,
ttl TIMESTAMP
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_key ON memories(key)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_created_at ON memories(created_at)
""")
async def store(self, key: str, value: Any, ttl: Optional[int] = None):
"""Store information with semantic embedding"""
# Create memory ID
memory_id = hashlib.md5(f"{key}_{datetime.now().isoformat()}".encode()).hexdigest()
# Convert value to string for embedding
if isinstance(value, dict):
content = json.dumps(value)
else:
content = str(value)
# Generate embedding; store it as float32 because search() decodes with dtype=np.float32
embedding = np.asarray(await self.embedding_model.embed_text(content), dtype=np.float32)
# Calculate TTL
ttl_timestamp = None
if ttl:
ttl_timestamp = datetime.now() + timedelta(seconds=ttl)
# Store in database
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
INSERT OR REPLACE INTO memories
(id, key, content, embedding, metadata, created_at, accessed_at, ttl)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
memory_id,
key,
content,
embedding.tobytes(),
json.dumps({"type": type(value).__name__}),
datetime.now(),
datetime.now(),
ttl_timestamp
))
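# Update cache (bounded by cache_size; the oldest inserted entry is evicted first)
self.memory_cache[key] = {
"content": value,
"embedding": embedding,
"created_at": datetime.now(),
"expires_at": ttl_timestamp
}
if len(self.memory_cache) > self.cache_size:
self.memory_cache.pop(next(iter(self.memory_cache)))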
# Cleanup old memories if needed
await self._cleanup_memories()
async def retrieve(self, key: str) -> Optional[Any]:
"""Retrieve information by exact key"""
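# Check cache first, ignoring entries whose TTL has expired
cached = self.memory_cache.get(key)
if cached is not None:
expires_at = cached.get("expires_at")
if expires_at is None or expires_at > datetime.now():
return cached["content"]
del self.memory_cache[key]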
# Query database
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute("""
SELECT content, metadata, access_count
FROM memories
WHERE key = ? AND (ttl IS NULL OR ttl > ?)
ORDER BY created_at DESC
LIMIT 1
""", (key, datetime.now()))
row = cursor.fetchone()
if row:
content, metadata, access_count = row
# Update access count
conn.execute("""
UPDATE memories
SET accessed_at = ?, access_count = access_count + 1
WHERE key = ?
""", (datetime.now(), key))
# Try to deserialize based on metadata
try:
meta = json.loads(metadata)
if meta.get("type") == "dict":
return json.loads(content)
else:
return content
except (json.JSONDecodeError, TypeError):
return content
return None
async def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
"""Search for semantically similar memories"""
# Generate query embedding
query_embedding = await self.embedding_model.embed_text(query)
# Get all memories from database
memories = []
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute("""
SELECT id, key, content, embedding, created_at, access_count
FROM memories
WHERE ttl IS NULL OR ttl > ?
""", (datetime.now(),))
for row in cursor.fetchall():
memory_id, key, content, embedding_bytes, created_at, access_count = row
# Convert embedding back to numpy array
memory_embedding = np.frombuffer(embedding_bytes, dtype=np.float32)
# Calculate similarity
similarity = self._cosine_similarity(query_embedding, memory_embedding)
memories.append({
"id": memory_id,
"key": key,
"content": content,
"similarity": similarity,
"created_at": created_at,
"access_count": access_count
})
# Sort by similarity and return top results
memories.sort(key=lambda x: x["similarity"], reverse=True)
return memories[:limit]
def _cosine_similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
"""Calculate cosine similarity between two vectors"""
dot_product = np.dot(vec1, vec2)
norm1 = np.linalg.norm(vec1)
norm2 = np.linalg.norm(vec2)
if norm1 == 0 or norm2 == 0:
return 0.0
return dot_product / (norm1 * norm2)
async def _cleanup_memories(self):
"""Cleanup old memories to maintain size limits"""
with sqlite3.connect(self.db_path) as conn:
# Remove expired memories
conn.execute("DELETE FROM memories WHERE ttl IS NOT NULL AND ttl <= ?", (datetime.now(),))
# Check total count
cursor = conn.execute("SELECT COUNT(*) FROM memories")
total_count = cursor.fetchone()[0]
if total_count > self.max_memories:
# Remove oldest, least accessed memories
excess = total_count - self.max_memories
conn.execute("""
DELETE FROM memories
WHERE id IN (
SELECT id FROM memories
ORDER BY access_count ASC, accessed_at ASC
LIMIT ?
)
""", (excess,))
class EpisodicMemory(Memory):
"""Episodic memory for storing agent experiences"""
def __init__(self, max_episodes: int = 1000):
self.episodes = deque(maxlen=max_episodes)
self.episode_index = {} # For fast lookups
async def store(self, key: str, value: Any, ttl: Optional[int] = None):
"""Store an episode"""
episode = {
"id": key,
"content": value,
"timestamp": datetime.now(),
"ttl": datetime.now() + timedelta(seconds=ttl) if ttl else None
}
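# A full deque silently drops its oldest episode on append; drop its index entry too
if len(self.episodes) == self.episodes.maxlen:
evicted = self.episodes[0]
if self.episode_index.get(evicted["id"]) is evicted:
del self.episode_index[evicted["id"]]
self.episodes.append(episode)
self.episode_index[key] = episode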
async def retrieve(self, key: str) -> Optional[Any]:
"""Retrieve episode by ID"""
episode = self.episode_index.get(key)
if episode and (not episode["ttl"] or episode["ttl"] > datetime.now()):
return episode["content"]
return None
async def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
"""Search episodes by content similarity"""
results = []
query_lower = query.lower()
for episode in self.episodes:
if episode["ttl"] and episode["ttl"] <= datetime.now():
continue
content_str = str(episode["content"]).lower()
if query_lower in content_str:
results.append({
"id": episode["id"],
"content": episode["content"],
"timestamp": episode["timestamp"],
"relevance": content_str.count(query_lower) / max(1, len(content_str.split()))
})
# Sort by relevance and timestamp
results.sort(key=lambda x: (x["relevance"], x["timestamp"]), reverse=True)
return results[:limit]
def get_recent_episodes(self, hours: int = 24) -> List[Dict[str, Any]]:
"""Get recent episodes within specified hours"""
cutoff = datetime.now() - timedelta(hours=hours)
recent = []
for episode in self.episodes:
if episode["timestamp"] >= cutoff:
if not episode["ttl"] or episode["ttl"] > datetime.now():
recent.append(episode)
return recent
class WorkingMemory(Memory):
"""Working memory for current task context"""
def __init__(self, capacity: int = 7): # Miller's magic number
self.capacity = capacity
self.items = {}
self.access_order = deque()
async def store(self, key: str, value: Any, ttl: Optional[int] = None):
"""Store item in working memory"""
# Remove if already exists
if key in self.items:
self.access_order.remove(key)
# Add new item
self.items[key] = {
"value": value,
"timestamp": datetime.now(),
"ttl": datetime.now() + timedelta(seconds=ttl) if ttl else None
}
self.access_order.append(key)
# Evict oldest if over capacity
while len(self.items) > self.capacity:
oldest_key = self.access_order.popleft()
if oldest_key in self.items:
del self.items[oldest_key]
async def retrieve(self, key: str) -> Optional[Any]:
"""Retrieve item and update access order"""
if key not in self.items:
return None
item = self.items[key]
# Check TTL
if item["ttl"] and item["ttl"] <= datetime.now():
del self.items[key]
self.access_order.remove(key)
return None
# Update access order
self.access_order.remove(key)
self.access_order.append(key)
return item["value"]
async def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
"""Search working memory items"""
results = []
query_lower = query.lower()
for key, item in self.items.items():
if item["ttl"] and item["ttl"] <= datetime.now():
continue
content_str = str(item["value"]).lower()
if query_lower in content_str:
results.append({
"key": key,
"content": item["value"],
"timestamp": item["timestamp"]
})
return results[:limit]
def get_all_items(self) -> Dict[str, Any]:
"""Get all items in working memory"""
current_time = datetime.now()
valid_items = {}
for key, item in self.items.items():
if not item["ttl"] or item["ttl"] > current_time:
valid_items[key] = item["value"]
return valid_items
class HybridMemory(Memory):
"""Hybrid memory system combining multiple memory types"""
def __init__(self,
embedding_model,
vector_memory_config: Dict[str, Any] = None,
episodic_memory_config: Dict[str, Any] = None,
working_memory_config: Dict[str, Any] = None):
# Initialize memory components
self.vector_memory = VectorMemory(
embedding_model,
**(vector_memory_config or {})
)
self.episodic_memory = EpisodicMemory(
**(episodic_memory_config or {})
)
self.working_memory = WorkingMemory(
**(working_memory_config or {})
)
async def store(self, key: str, value: Any, ttl: Optional[int] = None):
"""Store in appropriate memory systems"""
# Always store in working memory for immediate access
await self.working_memory.store(key, value, ttl)
# Store in vector memory for semantic search
await self.vector_memory.store(key, value, ttl)
# Store episodes in episodic memory
if isinstance(value, dict) and "episode" in str(value).lower():
await self.episodic_memory.store(key, value, ttl)
async def retrieve(self, key: str) -> Optional[Any]:
"""Retrieve from memory systems in order of speed"""
# Check working memory first
result = await self.working_memory.retrieve(key)
if result is not None:
return result
# Check vector memory
result = await self.vector_memory.retrieve(key)
if result is not None:
# Promote to working memory
await self.working_memory.store(key, result)
return result
# Check episodic memory
result = await self.episodic_memory.retrieve(key)
if result is not None:
# Promote to working memory
await self.working_memory.store(key, result)
return result
return None
async def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
"""Search across all memory systems"""
# Search all memory systems
working_results = await self.working_memory.search(query, limit)
vector_results = await self.vector_memory.search(query, limit)
episodic_results = await self.episodic_memory.search(query, limit)
# Combine and deduplicate results
all_results = []
seen_keys = set()
# Prioritize working memory results
for result in working_results:
key = result.get("key", result.get("id", ""))
if key not in seen_keys:
result["source"] = "working"
all_results.append(result)
seen_keys.add(key)
# Add vector memory results
for result in vector_results:
key = result.get("key", result.get("id", ""))
if key not in seen_keys:
result["source"] = "vector"
all_results.append(result)
seen_keys.add(key)
# Add episodic memory results
for result in episodic_results:
key = result.get("key", result.get("id", ""))
if key not in seen_keys:
result["source"] = "episodic"
all_results.append(result)
seen_keys.add(key)
return all_results[:limit]
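`VectorMemory.search` decodes every stored blob with `np.frombuffer(..., dtype=np.float32)`, so the bytes written at store time must be float32 as well: a float64 array would be read back as twice as many meaningless values. It also scores memories one row at a time. The sketch below, with hypothetical helper names, shows the float32 round trip and a vectorized brute-force cosine top-k that scores all stored vectors in one matrix product. It is still O(n) per query, so an approximate nearest-neighbour index would be the usual next step for large memories.

```python
from typing import List, Tuple

import numpy as np


def to_blob(vector) -> bytes:
    """Serialize an embedding as float32 bytes, the dtype the reader assumes."""
    return np.asarray(vector, dtype=np.float32).tobytes()


def from_blob(blob: bytes) -> np.ndarray:
    return np.frombuffer(blob, dtype=np.float32)


def top_k(query, blobs: List[bytes], k: int = 2) -> List[Tuple[int, float]]:
    """Brute-force cosine similarity over every stored vector."""
    matrix = np.stack([from_blob(b) for b in blobs])  # shape (n, d)
    q = np.asarray(query, dtype=np.float32)
    norms = np.linalg.norm(matrix, axis=1) * np.linalg.norm(q)
    # Zero-norm vectors get similarity 0 instead of a division-by-zero NaN
    sims = np.divide(matrix @ q, norms,
                     out=np.zeros(len(blobs), dtype=np.float32), where=norms > 0)
    order = np.argsort(-sims)[:k]
    return [(int(i), float(sims[i])) for i in order]


stored = [to_blob(v) for v in ([1.0, 0.0], [0.0, 1.0], [0.7, 0.7], [0.0, 0.0])]
print(top_k([1.0, 0.1], stored, k=2))  # indices 0 and 2 rank highest
```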
Advanced Tool System
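Each tool below publishes a JSON Schema through `parameters_schema`, but `execute` mostly reads arguments with `kwargs.get` without checking them against that schema, so a planner that omits a required field or passes the wrong type fails deep inside the tool. A pre-dispatch check is one way to catch this early. The sketch covers only `required`, primitive `type`, `enum` and `default`, a small subset of JSON Schema; the `jsonschema` package implements the full specification. `validate_args` is a hypothetical helper, not part of the tool code.

```python
import copy
from typing import Any, Dict, List, Tuple

# Subset of JSON Schema primitive types mapped to Python types
_JSON_TYPES = {"string": str, "integer": int, "number": (int, float),
               "boolean": bool, "object": dict, "array": list}


def validate_args(schema: Dict[str, Any], args: Dict[str, Any]) -> Tuple[Dict[str, Any], List[str]]:
    """Return (args with defaults filled in, list of constraint violations)."""
    filled = dict(args)
    errors: List[str] = []
    for name in schema.get("required", []):
        if name not in filled:
            errors.append(f"missing required parameter '{name}'")
    for name, spec in schema.get("properties", {}).items():
        if name not in filled:
            if "default" in spec:
                filled[name] = copy.deepcopy(spec["default"])  # never share mutable defaults
            continue
        value, type_name = filled[name], spec.get("type")
        expected = _JSON_TYPES.get(type_name)
        # bool is a subclass of int in Python, so reject it for integer/number explicitly
        is_bool_as_number = isinstance(value, bool) and type_name in ("integer", "number")
        if expected is not None and (not isinstance(value, expected) or is_bool_as_number):
            errors.append(f"'{name}' should be of type {type_name}")
        elif "enum" in spec and value not in spec["enum"]:
            errors.append(f"'{name}' must be one of {spec['enum']}")
    return filled, errors


search_schema = {
    "type": "object",
    "properties": {
        "query": {"type": "string", "description": "Search query"},
        "num_results": {"type": "integer", "default": 5},
    },
    "required": ["query"],
}
print(validate_args(search_schema, {"query": "AI agents"}))
# ({'query': 'AI agents', 'num_results': 5}, [])
```

An agent could call this before `tool.execute(**filled)` and feed any errors back to the planner as a failed action, which would trigger a replan.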
# agent_tools.py
import aiohttp
import asyncio
import subprocess
import json
import os
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
import tempfile
from pathlib import Path
from agent_architecture import Tool
class WebSearchTool(Tool):
"""Tool for web search capabilities"""
def __init__(self, api_key: str, search_engine: str = "serper"):
self.api_key = api_key
self.search_engine = search_engine
@property
def name(self) -> str:
return "web_search"
@property
def description(self) -> str:
return "Search the web for current information on any topic"
@property
def parameters_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query"
},
"num_results": {
"type": "integer",
"description": "Number of results to return",
"default": 5
}
},
"required": ["query"]
}
async def execute(self, **kwargs) -> Dict[str, Any]:
"""Execute web search"""
query = kwargs.get("query")
num_results = kwargs.get("num_results", 5)
if self.search_engine == "serper":
return await self._search_with_serper(query, num_results)
else:
raise ValueError(f"Unsupported search engine: {self.search_engine}")
async def _search_with_serper(self, query: str, num_results: int) -> Dict[str, Any]:
"""Search using Serper API"""
url = "https://google.serper.dev/search"
headers = {
"X-API-KEY": self.api_key,
"Content-Type": "application/json"
}
payload = {
"q": query,
"num": num_results
}
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, json=payload) as response:
if response.status == 200:
data = await response.json()
results = []
for item in data.get("organic", []):
results.append({
"title": item.get("title", ""),
"snippet": item.get("snippet", ""),
"link": item.get("link", ""),
"position": item.get("position", 0)
})
return {
"success": True,
"results": results,
"query": query,
"total_results": len(results)
}
else:
return {
"success": False,
"error": f"Search failed with status {response.status}"
}
class CodeExecutionTool(Tool):
"""Tool for executing code in a local subprocess with a timeout (not a security sandbox)"""
def __init__(self, allowed_languages: List[str] = None, timeout: int = 30):
self.allowed_languages = allowed_languages or ["python", "javascript", "bash"]
self.timeout = timeout
@property
def name(self) -> str:
return "code_execution"
@property
def description(self) -> str:
return "Execute code in a local subprocess with a time limit"
@property
def parameters_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"language": {
"type": "string",
"enum": self.allowed_languages,
"description": "Programming language"
},
"code": {
"type": "string",
"description": "Code to execute"
},
"stdin": {
"type": "string",
"description": "Standard input for the program",
"default": ""
}
},
"required": ["language", "code"]
}
async def execute(self, **kwargs) -> Dict[str, Any]:
"""Execute code with safety measures"""
language = kwargs.get("language")
code = kwargs.get("code")
stdin = kwargs.get("stdin", "")
if language not in self.allowed_languages:
return {
"success": False,
"error": f"Language {language} not allowed"
}
try:
if language == "python":
return await self._execute_python(code, stdin)
elif language == "javascript":
return await self._execute_javascript(code, stdin)
elif language == "bash":
return await self._execute_bash(code, stdin)
else:
return {
"success": False,
"error": f"Execution not implemented for {language}"
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
async def _execute_python(self, code: str, stdin: str) -> Dict[str, Any]:
"""Execute Python code"""
# Create temporary file
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write(code)
temp_file = f.name
try:
# Execute with timeout
process = await asyncio.create_subprocess_exec(
'python', temp_file,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await asyncio.wait_for(
process.communicate(input=stdin.encode()),
timeout=self.timeout
)
return {
"success": True,
"stdout": stdout.decode(),
"stderr": stderr.decode(),
"exit_code": process.returncode
}
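except asyncio.TimeoutError:
# wait_for does not stop the child process; kill it so it cannot keep running
process.kill()
await process.wait()
return {
"success": False,
"error": "Code execution timed out"
}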
finally:
# Cleanup
os.unlink(temp_file)
async def _execute_javascript(self, code: str, stdin: str) -> Dict[str, Any]:
"""Execute JavaScript code using Node.js"""
with tempfile.NamedTemporaryFile(mode='w', suffix='.js', delete=False) as f:
f.write(code)
temp_file = f.name
try:
process = await asyncio.create_subprocess_exec(
'node', temp_file,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await asyncio.wait_for(
process.communicate(input=stdin.encode()),
timeout=self.timeout
)
return {
"success": True,
"stdout": stdout.decode(),
"stderr": stderr.decode(),
"exit_code": process.returncode
}
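except asyncio.TimeoutError:
# wait_for does not stop the child process; kill it so it cannot keep running
process.kill()
await process.wait()
return {
"success": False,
"error": "Code execution timed out"
}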
finally:
os.unlink(temp_file)
async def _execute_bash(self, code: str, stdin: str) -> Dict[str, Any]:
"""Execute Bash commands (with restrictions)"""
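# Security: a naive denylist that is easy to bypass; treat it as a guard rail, not a sandbox.
# Word-boundary regexes avoid false positives such as "dd" inside "add".
import re
dangerous_patterns = [r'\brm\s+-rf\b', r'\bsudo\b', r'\bchmod\b', r'\bchown\b', r'>', r'\bdd\b', r'\bmkfs\b']
for pattern in dangerous_patterns:
if re.search(pattern, code):
return {
"success": False,
"error": f"Command contains dangerous pattern: {pattern}"
}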
try:
process = await asyncio.create_subprocess_shell(
code,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await asyncio.wait_for(
process.communicate(input=stdin.encode()),
timeout=self.timeout
)
return {
"success": True,
"stdout": stdout.decode(),
"stderr": stderr.decode(),
"exit_code": process.returncode
}
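except asyncio.TimeoutError:
# wait_for does not stop the child process; kill it so it cannot keep running
process.kill()
await process.wait()
return {
"success": False,
"error": "Command execution timed out"
}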
class FileOperationTool(Tool):
"""Tool for safe file operations"""
def __init__(self, allowed_paths: List[str], max_file_size: int = 10 * 1024 * 1024):
self.allowed_paths = [Path(p).resolve() for p in allowed_paths]
self.max_file_size = max_file_size
@property
def name(self) -> str:
return "file_operations"
@property
def description(self) -> str:
return "Perform file operations like read, write, list files"
@property
def parameters_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"operation": {
"type": "string",
"enum": ["read", "write", "list", "exists", "delete"],
"description": "File operation to perform"
},
"path": {
"type": "string",
"description": "File or directory path"
},
"content": {
"type": "string",
"description": "Content to write (for write operation)"
},
"encoding": {
"type": "string",
"default": "utf-8",
"description": "File encoding"
}
},
"required": ["operation", "path"]
}
async def execute(self, **kwargs) -> Dict[str, Any]:
"""Execute file operation"""
operation = kwargs.get("operation")
path = Path(kwargs.get("path")).resolve()
content = kwargs.get("content", "")
encoding = kwargs.get("encoding", "utf-8")
# Security check: ensure path is within allowed directories
if not self._is_path_allowed(path):
return {
"success": False,
"error": f"Path {path} not in allowed directories"
}
try:
if operation == "read":
return await self._read_file(path, encoding)
elif operation == "write":
return await self._write_file(path, content, encoding)
elif operation == "list":
return await self._list_directory(path)
elif operation == "exists":
return {
"success": True,
"exists": path.exists(),
"is_file": path.is_file(),
"is_directory": path.is_dir()
}
elif operation == "delete":
return await self._delete_file(path)
else:
return {
"success": False,
"error": f"Unknown operation: {operation}"
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
def _is_path_allowed(self, path: Path) -> bool:
"""Check if path is within allowed directories"""
for allowed_path in self.allowed_paths:
try:
path.relative_to(allowed_path)
return True
except ValueError:
continue
return False
async def _read_file(self, path: Path, encoding: str) -> Dict[str, Any]:
"""Read file content"""
if not path.exists():
return {
"success": False,
"error": "File does not exist"
}
if not path.is_file():
return {
"success": False,
"error": "Path is not a file"
}
# Check file size
if path.stat().st_size > self.max_file_size:
return {
"success": False,
"error": f"File too large (max {self.max_file_size} bytes)"
}
try:
with open(path, 'r', encoding=encoding) as f:
content = f.read()
return {
"success": True,
"content": content,
"size": len(content),
"path": str(path)
}
except UnicodeDecodeError:
return {
"success": False,
"error": f"Cannot decode file with encoding {encoding}"
}
async def _write_file(self, path: Path, content: str, encoding: str) -> Dict[str, Any]:
"""Write content to file"""
# Check content size
if len(content.encode(encoding)) > self.max_file_size:
return {
"success": False,
"error": f"Content too large (max {self.max_file_size} bytes)"
}
# Create directory if it doesn't exist
path.parent.mkdir(parents=True, exist_ok=True)
try:
with open(path, 'w', encoding=encoding) as f:
f.write(content)
return {
"success": True,
"bytes_written": len(content.encode(encoding)),
"path": str(path)
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
async def _list_directory(self, path: Path) -> Dict[str, Any]:
"""List directory contents"""
if not path.exists():
return {
"success": False,
"error": "Directory does not exist"
}
if not path.is_dir():
return {
"success": False,
"error": "Path is not a directory"
}
try:
items = []
for item in path.iterdir():
items.append({
"name": item.name,
"path": str(item),
"is_file": item.is_file(),
"is_directory": item.is_dir(),
"size": item.stat().st_size if item.is_file() else None
})
return {
"success": True,
"items": items,
"count": len(items),
"path": str(path)
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
async def _delete_file(self, path: Path) -> Dict[str, Any]:
"""Delete file or directory"""
if not path.exists():
return {
"success": False,
"error": "Path does not exist"
}
try:
if path.is_file():
path.unlink()
elif path.is_dir():
# Only delete if empty
if any(path.iterdir()):
return {
"success": False,
"error": "Directory not empty"
}
path.rmdir()
return {
"success": True,
"deleted": str(path)
}
except Exception as e:
return {
"success": False,
"error": str(e)
}


class APICallTool(Tool):
    """Tool for making HTTP API calls"""

    def __init__(self, allowed_domains: List[str] = None, timeout: int = 30):
        self.allowed_domains = allowed_domains or []
        self.timeout = timeout

    @property
    def name(self) -> str:
        return "api_call"

    @property
    def description(self) -> str:
        return "Make HTTP API calls to external services"

    @property
    def parameters_schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "method": {
                    "type": "string",
                    "enum": ["GET", "POST", "PUT", "DELETE"],
                    "description": "HTTP method"
                },
                "url": {
                    "type": "string",
                    "description": "API endpoint URL"
                },
                "headers": {
                    "type": "object",
                    "description": "HTTP headers",
                    "default": {}
                },
                "data": {
                    "type": "object",
                    "description": "Request data/payload",
                    "default": {}
                },
                "params": {
                    "type": "object",
                    "description": "URL parameters",
                    "default": {}
                }
            },
            "required": ["method", "url"]
        }

    async def execute(self, **kwargs) -> Dict[str, Any]:
        """Execute API call"""
        method = kwargs.get("method", "GET")
        url = kwargs.get("url")
        headers = kwargs.get("headers", {})
        data = kwargs.get("data", {})
        params = kwargs.get("params", {})
        if not url:
            return {
                "success": False,
                "error": "Missing required parameter: url"
            }
        # Security check: validate domain (an empty allow-list permits every domain)
        if self.allowed_domains and not self._is_domain_allowed(url):
            return {
                "success": False,
                "error": "Domain not in allowed list"
            }
        try:
            async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session:
                async with session.request(
                    method=method,
                    url=url,
                    headers=headers,
                    json=data if data else None,
                    params=params
                ) as response:
                    response_data = {
                        "success": True,
                        "status_code": response.status,
                        "headers": dict(response.headers),
                        "url": str(response.url)
                    }
                    # Try to parse JSON, fall back to text.
                    # ContentTypeError: non-JSON Content-Type; ValueError: malformed JSON body.
                    try:
                        response_data["data"] = await response.json()
                    except (aiohttp.ContentTypeError, ValueError):
                        response_data["data"] = await response.text()
                    return response_data
        except asyncio.TimeoutError:
            return {
                "success": False,
                "error": "Request timed out"
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }

    def _is_domain_allowed(self, url: str) -> bool:
        """Check whether the URL's host is an allowed domain or one of its subdomains"""
        from urllib.parse import urlparse
        # .hostname drops any port and user-info (unlike .netloc) and is already lower-case
        domain = urlparse(url).hostname
        if not domain:
            return False
        for allowed_domain in self.allowed_domains:
            allowed_domain = allowed_domain.lower()
            if domain == allowed_domain or domain.endswith(f".{allowed_domain}"):
                return True
        return False
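The allow-list check is easy to get subtly wrong (ports, user-info, look-alike domains), so it is worth testing on its own. Below is a minimal standalone sketch. `is_domain_allowed` is a hypothetical free-function variant of `_is_domain_allowed`. It matches on `urlparse(...).hostname` and requires a dot before any suffix match.

```python
from typing import Iterable, Optional
from urllib.parse import urlparse


def is_domain_allowed(url: str, allowed_domains: Iterable[str]) -> bool:
    """Return True if the URL's host equals an allowed domain or is a subdomain of one."""
    # .hostname strips any port and user-info and is already lower-cased;
    # it is None for inputs such as "not a url" that have no network location.
    host: Optional[str] = urlparse(url).hostname
    if not host:
        return False
    for domain in allowed_domains:
        domain = domain.lower().strip(".")
        # Exact match, or a dot-separated suffix so "evilhttpbin.org" cannot pass as "httpbin.org"
        if host == domain or host.endswith("." + domain):
            return True
    return False


allowed = ["api.github.com", "httpbin.org"]
print(is_domain_allowed("https://api.github.com/repos", allowed))       # True
print(is_domain_allowed("https://httpbin.org:8443/get", allowed))       # True (port ignored)
print(is_domain_allowed("https://user@httpbin.org/get", allowed))       # True (user-info ignored)
print(is_domain_allowed("https://evilhttpbin.org/get", allowed))        # False
print(is_domain_allowed("https://httpbin.org.attacker.net/", allowed))  # False
```

The dot-prefixed suffix test is the important design choice. A bare `endswith("httpbin.org")` would also accept `evilhttpbin.org`.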
LLM-Based Planning System
# llm_planner.py
import json
from typing import List, Dict, Any
from openai import AsyncOpenAI
import asyncio


class LLMPlanner(Planner):
    """LLM-based planner for agent task decomposition"""

    def __init__(self,
                 model: str = "gpt-4",
                 api_key: str = None,
                 temperature: float = 0.1):
        self.client = AsyncOpenAI(api_key=api_key)
        self.model = model
        self.temperature = temperature
        # Planning prompts
        self.planning_prompt = """
You are an AI planning assistant. Your job is to break down complex goals into executable action steps.
Available tools:
{tools_description}
Current context:
{context}
Goal: {goal}
Create a detailed plan to achieve this goal. Return your plan as a JSON array where each step is an object with:
- action_type: Type of action (use_tool, think, wait, etc.)
- description: What this step accomplishes
- tool_name: Tool to use (if action_type is "use_tool")
- parameters: Parameters for the action
- expected_outcome: What you expect to happen
Guidelines:
1. Break complex tasks into simple, executable steps
2. Use available tools when appropriate
3. Include thinking/reasoning steps when needed
4. Be specific about parameters and expected outcomes
5. Consider error handling and alternative approaches
Plan:
"""
        self.replanning_prompt = """
You are an AI replanning assistant. The current plan needs to be adjusted based on execution results.
Original goal: {goal}
Current plan: {current_plan}
Execution history: {execution_history}
New context: {context}
Based on the execution results, create an updated plan. Consider:
1. What went wrong and how to fix it
2. What worked well and should continue
3. Any new information or constraints
4. Alternative approaches if the current one isn't working
Return the updated plan as a JSON array with the same format as before.
Updated plan:
"""

    async def create_plan(self,
                          goal: str,
                          context: Dict[str, Any],
                          available_tools: List[Tool]) -> List[AgentAction]:
        """Create a plan using LLM reasoning"""
        # Prepare tools description
        tools_description = self._format_tools_description(available_tools)
        # Format context
        context_str = json.dumps(context, indent=2, default=str)
        # Create planning prompt
        prompt = self.planning_prompt.format(
            goal=goal,
            context=context_str,
            tools_description=tools_description
        )
        try:
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=self.temperature,
                max_tokens=2000
            )
            plan_text = response.choices[0].message.content
            # Parse the plan
            plan_steps = self._parse_plan(plan_text)
            # Convert to AgentAction objects
            actions = []
            for step in plan_steps:
                action = AgentAction(
                    action_type=step.get("action_type", "think"),
                    parameters=step.get("parameters", {}),
                    tool_name=step.get("tool_name"),
                    expected_outcome=step.get("expected_outcome")
                )
                actions.append(action)
            return actions
        except Exception as e:
            # Fallback: create a simple plan
            fallback = [
                AgentAction(
                    action_type="think",
                    parameters={"thought": f"Failed to create detailed plan: {str(e)}. Proceeding with simple approach."},
                    expected_outcome="Understanding of the problem"
                )
            ]
            # Only add a search step if a web_search tool is actually available;
            # a use_tool action with tool_name=None could not be executed
            if any(tool.name == "web_search" for tool in available_tools):
                fallback.append(
                    AgentAction(
                        action_type="use_tool",
                        tool_name="web_search",
                        parameters={"query": goal},
                        expected_outcome="Information about the goal"
                    )
                )
            return fallback

    async def replan(self,
                     current_plan: List[AgentAction],
                     execution_results: List[Dict[str, Any]],
                     new_context: Dict[str, Any]) -> List[AgentAction]:
        """Replan based on execution results"""
        # Format current plan
        current_plan_dict = [
            {
                "action_type": action.action_type,
                "tool_name": action.tool_name,
                "parameters": action.parameters,
                "expected_outcome": action.expected_outcome
            }
            for action in current_plan
        ]
        # Format execution history (last 5 executions, numbered within that window)
        execution_summary = []
        for i, result in enumerate(execution_results[-5:]):
            execution_summary.append({
                "step": i + 1,
                "action": result.get("action"),
                "success": result.get("success", False),
                "result": result.get("result", {}),
                "error": result.get("error")
            })
        # The task may be stored in the context as a dict or as a Task object
        task_info = new_context.get("task", {})
        if isinstance(task_info, dict):
            goal = task_info.get("description", "Unknown goal")
        else:
            goal = getattr(task_info, "description", "Unknown goal")
        # Create replanning prompt (default=str handles datetimes and other non-JSON values)
        prompt = self.replanning_prompt.format(
            goal=goal,
            current_plan=json.dumps(current_plan_dict, indent=2, default=str),
            execution_history=json.dumps(execution_summary, indent=2, default=str),
            context=json.dumps(new_context, indent=2, default=str)
        )
        try:
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=self.temperature,
                max_tokens=2000
            )
            plan_text = response.choices[0].message.content
            # Parse the updated plan
            plan_steps = self._parse_plan(plan_text)
            # Convert to AgentAction objects
            actions = []
            for step in plan_steps:
                action = AgentAction(
                    action_type=step.get("action_type", "think"),
                    parameters=step.get("parameters", {}),
                    tool_name=step.get("tool_name"),
                    expected_outcome=step.get("expected_outcome")
                )
                actions.append(action)
            return actions
        except Exception:
            # Fallback: continue with current plan or create simple recovery plan
            if current_plan:
                return current_plan
            return [
                AgentAction(
                    action_type="think",
                    parameters={"thought": "Replanning failed. Need to reassess the situation."},
                    expected_outcome="Better understanding of current state"
                )
            ]

    def _format_tools_description(self, tools: List[Tool]) -> str:
        """Format tools description for LLM"""
        descriptions = []
        for tool in tools:
            desc = f"- {tool.name}: {tool.description}\n"
            desc += f"  Parameters: {json.dumps(tool.parameters_schema, indent=4)}"
            descriptions.append(desc)
        return "\n\n".join(descriptions)

    def _parse_plan(self, plan_text: str) -> List[Dict[str, Any]]:
        """Parse plan from LLM response"""
        plan_text = plan_text or ""  # message.content can be None
        try:
            # Try to extract a JSON array from the response
            start_idx = plan_text.find('[')
            end_idx = plan_text.rfind(']') + 1
            if start_idx >= 0 and end_idx > start_idx:
                parsed = json.loads(plan_text[start_idx:end_idx])
            else:
                # Fallback: try to parse the entire response
                parsed = json.loads(plan_text)
            # A single step object is wrapped in a list; only dict steps are kept
            if isinstance(parsed, dict):
                parsed = [parsed]
            if isinstance(parsed, list):
                steps = [step for step in parsed if isinstance(step, dict)]
                if steps:
                    return steps
            raise ValueError("Plan is not a list of step objects")
        except ValueError:  # also covers json.JSONDecodeError
            # Fallback: create simple plan from text
            lines = plan_text.strip().split('\n')
            plan_steps = []
            for i, line in enumerate(lines):
                if line.strip():
                    plan_steps.append({
                        "action_type": "think",
                        "description": line.strip(),
                        "parameters": {"thought": line.strip()},
                        "expected_outcome": f"Step {i+1} completed"
                    })
            return plan_steps if plan_steps else [
                {
                    "action_type": "think",
                    "description": "Plan parsing failed",
                    "parameters": {"thought": "Need to approach this problem step by step"},
                    "expected_outcome": "Better understanding"
                }
            ]


class HierarchicalPlanner(Planner):
    """Hierarchical planner that breaks down complex goals into sub-goals"""

    def __init__(self, llm_planner: LLMPlanner, max_depth: int = 3):
        self.llm_planner = llm_planner
        # Reserved for deeper recursion; create_plan currently expands one level of sub-goals
        self.max_depth = max_depth

    async def create_plan(self,
                          goal: str,
                          context: Dict[str, Any],
                          available_tools: List[Tool]) -> List[AgentAction]:
        """Create hierarchical plan"""
        # Start with high-level decomposition
        high_level_plan = await self._decompose_goal(goal, context, available_tools)
        # Expand each high-level step into detailed actions
        detailed_actions = []
        for high_level_action in high_level_plan:
            if high_level_action.action_type == "sub_goal":
                # Expand the sub-goal into concrete actions with the flat LLM planner
                sub_goal = high_level_action.parameters.get("sub_goal", "")
                sub_actions = await self.llm_planner.create_plan(
                    sub_goal, context, available_tools
                )
                detailed_actions.extend(sub_actions)
            else:
                detailed_actions.append(high_level_action)
        return detailed_actions

    async def _decompose_goal(self,
                              goal: str,
                              context: Dict[str, Any],
                              available_tools: List[Tool]) -> List[AgentAction]:
        """Decompose goal into sub-goals"""
        decomposition_prompt = f"""
Break down this complex goal into 3-5 high-level sub-goals:
Goal: {goal}
Context: {json.dumps(context, default=str)}
Return a JSON array where each sub-goal is an object with:
- action_type: "sub_goal"
- description: Brief description of the sub-goal
- parameters: {{"sub_goal": "detailed sub-goal description"}}
- expected_outcome: What completing this sub-goal achieves
Sub-goals should be:
1. Logically ordered
2. Manageable in scope
3. Clear and specific
4. Necessary for the overall goal
Sub-goals:
"""
        try:
            response = await self.llm_planner.client.chat.completions.create(
                model=self.llm_planner.model,
                messages=[{"role": "user", "content": decomposition_prompt}],
                temperature=self.llm_planner.temperature,
                max_tokens=1000
            )
            decomposition_text = response.choices[0].message.content
            sub_goals_data = self.llm_planner._parse_plan(decomposition_text)
            # Convert to AgentAction objects
            actions = []
            for sub_goal_data in sub_goals_data:
                parameters = dict(sub_goal_data.get("parameters") or {})
                # If the model omitted "sub_goal", fall back to the step description
                parameters.setdefault("sub_goal", sub_goal_data.get("description", goal))
                action = AgentAction(
                    action_type="sub_goal",
                    parameters=parameters,
                    expected_outcome=sub_goal_data.get("expected_outcome")
                )
                actions.append(action)
            return actions
        except Exception:
            # Fallback: treat as single goal
            return [
                AgentAction(
                    action_type="sub_goal",
                    parameters={"sub_goal": goal},
                    expected_outcome="Goal completion"
                )
            ]

    async def replan(self,
                     current_plan: List[AgentAction],
                     execution_results: List[Dict[str, Any]],
                     new_context: Dict[str, Any]) -> List[AgentAction]:
        """Delegate replanning to LLM planner"""
        return await self.llm_planner.replan(current_plan, execution_results, new_context)
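Both planners trust whatever steps the model returns. A cheap safeguard is to check each `use_tool` step against the registered tools before executing anything. The sketch below makes some assumptions. `PlanStep` is a hypothetical stand-in for `AgentAction` with the same core fields. The schema dicts follow the `parameters_schema` shape used by the tools above. Only the `required` list is checked.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class PlanStep:
    """Hypothetical stand-in for AgentAction with the same core fields."""
    action_type: str
    parameters: Dict[str, Any] = field(default_factory=dict)
    tool_name: Optional[str] = None


def validate_plan(steps: List[PlanStep], tool_schemas: Dict[str, Dict[str, Any]]) -> List[str]:
    """Return a list of problems; an empty list means every tool step looks executable."""
    problems: List[str] = []
    for index, step in enumerate(steps, start=1):
        if step.action_type != "use_tool":
            continue  # think/wait steps need no tool
        if not step.tool_name:
            problems.append(f"step {index}: use_tool step has no tool_name")
            continue
        schema = tool_schemas.get(step.tool_name)
        if schema is None:
            problems.append(f"step {index}: unknown tool '{step.tool_name}'")
            continue
        missing = [name for name in schema.get("required", []) if name not in step.parameters]
        if missing:
            problems.append(f"step {index}: {step.tool_name} missing {', '.join(missing)}")
    return problems


schemas = {"api_call": {"type": "object", "required": ["method", "url"]}}
plan = [
    PlanStep("think", {"thought": "Look up the repository"}),
    PlanStep("use_tool", {"method": "GET"}, tool_name="api_call"),
    PlanStep("use_tool", {"query": "x"}, tool_name="web_search"),
    PlanStep("use_tool", {}),
]
for problem in validate_plan(plan, schemas):
    print(problem)
```

One option is to pass a non-empty problem list back to `replan` as part of `new_context`, so the model can repair the plan before any tool runs.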
Complete AI Agent Implementation
# complete_agent.py
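import asyncio
import json
import logging
from datetime import datetime
from typing import Dict, List, Any, Optional
# BaseAgent, Task, HybridMemory, LLMPlanner and the tool classes come from the earlier modules in this guide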


class IntelligentAgent(BaseAgent):
    """Complete intelligent agent with advanced capabilities"""

    def __init__(self,
                 agent_id: str,
                 name: str,
                 embedding_model,
                 llm_client,
                 allowed_file_paths: List[str] = None,
                 web_search_api_key: str = None,
                 **kwargs):
        # Initialize memory system
        memory = HybridMemory(
            embedding_model=embedding_model,
            vector_memory_config={"db_path": f"agent_{agent_id}_memory.db"},
            episodic_memory_config={"max_episodes": 1000},
            working_memory_config={"capacity": 7}
        )
        # Initialize planner
        planner = LLMPlanner(api_key=llm_client.api_key)
        # Initialize tools
        tools = []
        if web_search_api_key:
            tools.append(WebSearchTool(web_search_api_key))
        tools.append(CodeExecutionTool())
        if allowed_file_paths:
            tools.append(FileOperationTool(allowed_file_paths))
        tools.append(APICallTool(allowed_domains=["api.github.com", "httpbin.org"]))
        super().__init__(
            agent_id=agent_id,
            name=name,
            memory=memory,
            planner=planner,
            tools=tools,
            **kwargs
        )
        self.llm_client = llm_client
        self.logger = logging.getLogger(f"agent.{agent_id}")
        # Add callbacks
        self.add_state_change_callback(self._log_state_change)
        self.add_task_completion_callback(self._log_task_completion)

    async def _log_state_change(self, agent, old_state, new_state):
        """Log state changes"""
        self.logger.info(f"Agent {agent.name} state changed: {old_state.value} -> {new_state.value}")

    async def _log_task_completion(self, agent, task, result):
        """Log task completion"""
        self.logger.info(f"Agent {agent.name} completed task {task.id}: {task.description}")
        # Store task result in memory
        await agent.memory.store(
            f"task_result_{task.id}",
            {
                "task": task,
                "result": result,
                "completion_time": datetime.now()
            }
        )

    async def chat(self, message: str) -> str:
        """Chat interface for the agent"""
        # Create a simple task for the chat message
        task = Task(
            id=f"chat_{datetime.now().isoformat()}",
            description=f"Respond to: {message}",
            priority=1
        )
        # Execute the task
        result = await self.execute_task(task)
        if result["success"]:
            return str(result["result"])
        else:
            return f"I encountered an error: {result.get('error', 'Unknown error')}"

    async def solve_problem(self, problem_description: str) -> Dict[str, Any]:
        """Solve a complex problem autonomously"""
        # Create task
        task = Task(
            id=f"problem_{datetime.now().isoformat()}",
            description=problem_description,
            priority=1
        )
        # Store problem context in memory
        await self.memory.store(
            f"problem_context_{task.id}",
            {
                "problem": problem_description,
                "start_time": datetime.now(),
                "agent_id": self.agent_id
            }
        )
        # Execute task
        result = await self.execute_task(task)
        return result

    async def learn_from_feedback(self, task_id: str, feedback: str, rating: int):
        """Learn from user feedback"""
        # Store feedback in memory
        await self.memory.store(
            f"feedback_{task_id}",
            {
                "task_id": task_id,
                "feedback": feedback,
                "rating": rating,
                "timestamp": datetime.now()
            }
        )
        # If rating is low, analyze what went wrong
        if rating < 3:
            await self._analyze_failure(task_id, feedback)

    async def _analyze_failure(self, task_id: str, feedback: str):
        """Analyze failure and store learnings"""
        # Retrieve task execution history
        task_history = []
        for execution in self.execution_history:
            if execution.get("task_id") == task_id:
                task_history.append(execution)
        # Use LLM to analyze what went wrong
        analysis_prompt = f"""
Analyze this task failure and provide insights:
Task ID: {task_id}
User Feedback: {feedback}
Execution History: {json.dumps(task_history, default=str)}
What went wrong and how can I improve next time?
Provide specific, actionable insights.
"""
        try:
            response = await self.llm_client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": analysis_prompt}],
                temperature=0.1
            )
            analysis = response.choices[0].message.content
            # Store analysis as learning
            await self.memory.store(
                f"learning_{task_id}",
                {
                    "task_id": task_id,
                    "failure_analysis": analysis,
                    "timestamp": datetime.now(),
                    "category": "failure_analysis"
                }
            )
        except Exception as e:
            self.logger.error(f"Failed to analyze failure: {e}")

    async def get_capabilities(self) -> Dict[str, Any]:
        """Get agent capabilities and status"""
        return {
            "agent_id": self.agent_id,
            "name": self.name,
            "state": self.state.value,
            "tools": list(self.tools.keys()),
            "memory_systems": ["vector", "episodic", "working"],
            "planning_capability": "llm",  # __init__ uses the flat LLMPlanner
            "learning_capability": True,
            "current_task": self.current_task.description if self.current_task else None,
            "execution_history_length": len(self.execution_history)
        }


# Example usage and testing
# The mock dependencies are defined at module level so that other modules
# (such as the multi-agent example) can import and reuse them.
class MockEmbeddingModel:
    async def embed_text(self, text: str):
        import numpy as np
        # Mock embedding: a random 384-dimensional vector
        return np.random.rand(384).astype(np.float32)


class MockLLMClient:
    api_key = "mock_key"

    class ChatCompletions:
        async def create(self, **kwargs):
            class MockResponse:
                class Choice:
                    class Message:
                        content = '{"action_type": "think", "description": "I need to analyze this problem", "parameters": {"thought": "This is a complex task requiring careful consideration"}, "expected_outcome": "Better understanding of the problem"}'
                    message = Message()
                choices = [Choice()]
            return MockResponse()

    # Mimics the client.chat.completions.create(...) call shape
    chat = type('Chat', (), {'completions': ChatCompletions()})()


async def main():
    # Note: LLMPlanner creates its own AsyncOpenAI client from llm_client.api_key,
    # so planning calls still go to the real API; with the mock key they fail and
    # the planner's fallback plan is used.
    # Create agent
    agent = IntelligentAgent(
        agent_id="agent_001",
        name="Problem Solver",
        embedding_model=MockEmbeddingModel(),
        llm_client=MockLLMClient(),
        allowed_file_paths=["/tmp"],
        web_search_api_key="mock_key"
    )
    # Test capabilities
    capabilities = await agent.get_capabilities()
    print(f"Agent capabilities: {capabilities}")
    # Test problem solving
    result = await agent.solve_problem(
        "Write a Python script that calculates the fibonacci sequence up to n terms"
    )
    print(f"Problem solving result: {result}")
    # Test chat interface
    response = await agent.chat("Hello, can you help me with programming?")
    print(f"Chat response: {response}")
    # Test learning (a rating of 3 or higher does not trigger failure analysis)
    await agent.learn_from_feedback("task_123", "Good solution but could be more efficient", 4)


if __name__ == "__main__":
    asyncio.run(main())
Multi-Agent Systems and Coordination
Agent Communication and Coordination
# multi_agent_system.py
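import asyncio
import json
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Dict, List, Any, Optional, Set

from complete_agent import IntelligentAgent  # assumes the agent code above is saved as complete_agent.py


class MessageType(Enum):
    REQUEST = "request"
    RESPONSE = "response"
    BROADCAST = "broadcast"
    COORDINATION = "coordination"
    STATUS_UPDATE = "status_update"


@dataclass
class Message:
    id: str
    sender_id: str
    receiver_id: Optional[str]  # None for broadcast
    message_type: MessageType
    content: Dict[str, Any]
    # default_factory gives each message its own creation time; a plain
    # datetime.now() default would be evaluated only once, at class definition
    timestamp: datetime = field(default_factory=datetime.now)
    conversation_id: Optional[str] = None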


class MessageBus:
    """Central message bus for agent communication"""

    def __init__(self):
        self.subscribers: Dict[str, List[asyncio.Queue]] = {}
        self.message_history: List[Message] = []
        self.active_conversations: Dict[str, List[str]] = {}

    async def publish(self, message: Message):
        """Publish message to relevant subscribers"""
        self.message_history.append(message)
        # Send to specific receiver
        if message.receiver_id and message.receiver_id in self.subscribers:
            for queue in self.subscribers[message.receiver_id]:
                await queue.put(message)
        # Send to all subscribers for broadcasts
        elif message.message_type == MessageType.BROADCAST:
            for agent_id, queues in self.subscribers.items():
                if agent_id != message.sender_id:  # Don't send to sender
                    for queue in queues:
                        await queue.put(message)

    def subscribe(self, agent_id: str) -> asyncio.Queue:
        """Subscribe agent to message bus"""
        if agent_id not in self.subscribers:
            self.subscribers[agent_id] = []
        queue = asyncio.Queue()
        self.subscribers[agent_id].append(queue)
        return queue

    def unsubscribe(self, agent_id: str, queue: asyncio.Queue):
        """Unsubscribe agent queue"""
        if agent_id in self.subscribers:
            if queue in self.subscribers[agent_id]:
                self.subscribers[agent_id].remove(queue)

    def get_conversation_history(self, conversation_id: str) -> List[Message]:
        """Get messages for a conversation"""
        return [
            msg for msg in self.message_history
            if msg.conversation_id == conversation_id
        ]


class MultiAgentCoordinator:
    """Coordinates multiple agents working together"""

    def __init__(self, message_bus: MessageBus):
        self.message_bus = message_bus
        self.agents: Dict[str, IntelligentAgent] = {}
        self.active_collaborations: Dict[str, Dict[str, Any]] = {}
        # Keep references to listener tasks so they are not garbage-collected mid-run
        self.listener_tasks: Dict[str, asyncio.Task] = {}

    def register_agent(self, agent: IntelligentAgent):
        """Register agent with coordinator (must be called while an event loop is running)"""
        self.agents[agent.agent_id] = agent
        # Subscribe agent to message bus
        queue = self.message_bus.subscribe(agent.agent_id)
        # Start message handling for agent
        self.listener_tasks[agent.agent_id] = asyncio.create_task(
            self._handle_agent_messages(agent, queue)
        )

    async def _handle_agent_messages(self, agent: IntelligentAgent, queue: asyncio.Queue):
        """Handle incoming messages for an agent"""
        while True:
            try:
                message = await queue.get()
                if message.message_type == MessageType.REQUEST:
                    await self._handle_request(agent, message)
                elif message.message_type == MessageType.COORDINATION:
                    await self._handle_coordination(agent, message)
                elif message.message_type in (MessageType.STATUS_UPDATE, MessageType.BROADCAST):
                    # broadcast_status() delivers status updates as BROADCAST messages
                    await self._handle_status_update(agent, message)
            except Exception as e:
                agent.logger.error(f"Error handling message: {e}")

    async def _handle_request(self, agent: IntelligentAgent, message: Message):
        """Handle request message"""
        request_content = message.content
        if request_content.get("type") == "task_help":
            # Another agent is requesting help with a task
            task_description = request_content.get("task_description")
            # Check if this agent can help
            can_help = await self._can_agent_help(agent, task_description)
            response = Message(
                id=str(uuid.uuid4()),
                sender_id=agent.agent_id,
                receiver_id=message.sender_id,
                message_type=MessageType.RESPONSE,
                content={
                    "type": "task_help_response",
                    "can_help": can_help,
                    "capabilities": await agent.get_capabilities(),
                    "original_request_id": message.id
                },
                conversation_id=message.conversation_id
            )
            await self.message_bus.publish(response)

    async def _handle_coordination(self, agent: IntelligentAgent, message: Message):
        """Handle coordination message"""
        coordination_content = message.content
        if coordination_content.get("type") == "collaboration_invite":
            # Invitation to collaborate on a task
            collaboration_id = coordination_content.get("collaboration_id")
            task_description = coordination_content.get("task_description")
            # Decide whether to join collaboration
            should_join = await self._should_join_collaboration(
                agent, task_description, collaboration_id
            )
            if should_join:
                # Join collaboration: create the record on first join, then append,
                # so later joiners do not overwrite earlier participants
                collaboration = self.active_collaborations.setdefault(collaboration_id, {
                    "task_description": task_description,
                    "participants": list(coordination_content.get("participants", [])),
                    "coordinator": coordination_content.get("coordinator"),
                    "status": "active"
                })
                if agent.agent_id not in collaboration["participants"]:
                    collaboration["participants"].append(agent.agent_id)
                # Send confirmation
                response = Message(
                    id=str(uuid.uuid4()),
                    sender_id=agent.agent_id,
                    receiver_id=message.sender_id,
                    message_type=MessageType.RESPONSE,
                    content={
                        "type": "collaboration_join",
                        "collaboration_id": collaboration_id,
                        "agent_capabilities": await agent.get_capabilities()
                    },
                    conversation_id=message.conversation_id
                )
                await self.message_bus.publish(response)

    async def _handle_status_update(self, agent: IntelligentAgent, message: Message):
        """Handle status update message"""
        status_content = message.content
        # Log status update
        agent.logger.info(f"Received status update from {message.sender_id}: {status_content}")
        # Store in agent memory
        await agent.memory.store(
            f"status_update_{message.sender_id}_{message.timestamp.isoformat()}",
            status_content
        )

    async def _can_agent_help(self, agent: IntelligentAgent, task_description: str) -> bool:
        """Determine if agent can help with a task"""
        # Simple keyword-based capability matching
        capabilities = await agent.get_capabilities()
        task_lower = (task_description or "").lower()
        # Check tool capabilities
        if "code" in task_lower and "code_execution" in capabilities["tools"]:
            return True
        if "search" in task_lower and "web_search" in capabilities["tools"]:
            return True
        if "file" in task_lower and "file_operations" in capabilities["tools"]:
            return True
        # No tool matched, but every agent can contribute thinking/planning,
        # so this simple policy always accepts
        return True

    async def _should_join_collaboration(self,
                                         agent: IntelligentAgent,
                                         task_description: str,
                                         collaboration_id: str) -> bool:
        """Determine if agent should join collaboration"""
        # Check if agent is already in too many collaborations
        agent_collaborations = [
            collab for collab in self.active_collaborations.values()
            if agent.agent_id in collab.get("participants", [])
        ]
        if len(agent_collaborations) >= 3:  # Max 3 simultaneous collaborations
            return False
        # Check if agent can contribute
        can_help = await self._can_agent_help(agent, task_description)
        return can_help

    async def create_collaboration(self,
                                   task_description: str,
                                   coordinator_agent_id: str,
                                   required_capabilities: List[str] = None) -> str:
        """Create a new collaboration"""
        collaboration_id = str(uuid.uuid4())
        # Find suitable agents
        suitable_agents = []
        for agent_id, agent in self.agents.items():
            if agent_id == coordinator_agent_id:
                continue
            capabilities = await agent.get_capabilities()
            # Check if agent has required capabilities
            if required_capabilities:
                has_required = any(
                    capability in capabilities["tools"]
                    for capability in required_capabilities
                )
                if has_required:
                    suitable_agents.append(agent_id)
            else:
                # Check general helpfulness
                if await self._can_agent_help(agent, task_description):
                    suitable_agents.append(agent_id)
        # Send collaboration invites
        for agent_id in suitable_agents[:5]:  # Limit to 5 agents
            invite_message = Message(
                id=str(uuid.uuid4()),
                sender_id=coordinator_agent_id,
                receiver_id=agent_id,
                message_type=MessageType.COORDINATION,
                content={
                    "type": "collaboration_invite",
                    "collaboration_id": collaboration_id,
                    "task_description": task_description,
                    "coordinator": coordinator_agent_id,
                    "participants": [coordinator_agent_id]
                },
                conversation_id=collaboration_id
            )
            await self.message_bus.publish(invite_message)
        return collaboration_id

    async def broadcast_status(self, agent_id: str, status: Dict[str, Any]):
        """Broadcast agent status to all other agents"""
        message = Message(
            id=str(uuid.uuid4()),
            sender_id=agent_id,
            receiver_id=None,  # Broadcast
            message_type=MessageType.BROADCAST,
            content={
                "type": "status_broadcast",
                "agent_id": agent_id,
                "status": status,
                "timestamp": datetime.now().isoformat()
            }
        )
        await self.message_bus.publish(message)

    def get_system_status(self) -> Dict[str, Any]:
        """Get overall system status"""
        return {
            "total_agents": len(self.agents),
            "active_collaborations": len(self.active_collaborations),
            "message_history_length": len(self.message_bus.message_history),
            "agents": {
                agent_id: {
                    "name": agent.name,
                    "state": agent.state.value,
                    "current_task": agent.current_task.description if agent.current_task else None
                }
                for agent_id, agent in self.agents.items()
            }
        }


# Example usage
async def multi_agent_example():
    """Example of multi-agent system in action"""
    # Mock dependencies from complete_agent.py (they must be defined at module level there)
    from complete_agent import MockEmbeddingModel, MockLLMClient
    # Initialize system
    message_bus = MessageBus()
    coordinator = MultiAgentCoordinator(message_bus)
    # Create agents (using mock dependencies)
    agent1 = IntelligentAgent("agent_001", "Researcher", MockEmbeddingModel(), MockLLMClient())
    agent2 = IntelligentAgent("agent_002", "Coder", MockEmbeddingModel(), MockLLMClient())
    agent3 = IntelligentAgent("agent_003", "Analyst", MockEmbeddingModel(), MockLLMClient())
    # Register agents (starts one listener task per agent)
    coordinator.register_agent(agent1)
    coordinator.register_agent(agent2)
    coordinator.register_agent(agent3)
    # Create collaboration
    collaboration_id = await coordinator.create_collaboration(
        "Research and implement a machine learning solution for text classification",
        "agent_001",
        ["web_search", "code_execution"]
    )
    print(f"Created collaboration: {collaboration_id}")
    # Broadcast status update
    await coordinator.broadcast_status("agent_001", {
        "current_activity": "researching text classification algorithms",
        "progress": 0.3
    })
    # Give the listener tasks a chance to process the queued messages
    await asyncio.sleep(0.1)
    # Get system status
    status = coordinator.get_system_status()
    print(f"System status: {status}")


if __name__ == "__main__":
    asyncio.run(multi_agent_example())
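The `MessageBus` above is fire-and-forget. A RESPONSE carries `original_request_id`, but nothing lets the requesting agent await that reply. The following is a minimal sketch of one way to correlate replies. `RequestReplyBus` and `helper_agent` are hypothetical names, not part of the code above. Each request registers an `asyncio` future keyed by its id, and `asyncio.wait_for` bounds the wait so a silent agent cannot block the requester.

```python
import asyncio
import uuid
from typing import Any, Dict


class RequestReplyBus:
    """Hypothetical minimal bus: one inbox queue per agent plus futures for pending requests."""

    def __init__(self) -> None:
        self.inboxes: Dict[str, asyncio.Queue] = {}
        self.pending: Dict[str, asyncio.Future] = {}

    def subscribe(self, agent_id: str) -> asyncio.Queue:
        return self.inboxes.setdefault(agent_id, asyncio.Queue())

    async def request(self, sender: str, receiver: str, content: Dict[str, Any],
                      timeout: float = 1.0) -> Dict[str, Any]:
        request_id = str(uuid.uuid4())
        future = asyncio.get_running_loop().create_future()
        self.pending[request_id] = future
        await self.inboxes[receiver].put({"id": request_id, "sender": sender, "content": content})
        try:
            # wait_for raises asyncio.TimeoutError if no reply arrives in time
            return await asyncio.wait_for(future, timeout)
        finally:
            self.pending.pop(request_id, None)

    def reply(self, original_request_id: str, content: Dict[str, Any]) -> None:
        future = self.pending.get(original_request_id)
        if future is not None and not future.done():
            future.set_result(content)


async def helper_agent(bus: RequestReplyBus, agent_id: str, inbox: asyncio.Queue) -> None:
    # Answers every request with a naive keyword check, echoing the request id
    while True:
        message = await inbox.get()
        description = message["content"].get("task_description", "")
        bus.reply(message["id"], {"can_help": "code" in description.lower(), "from": agent_id})


async def demo() -> Dict[str, Any]:
    bus = RequestReplyBus()
    inbox = bus.subscribe("coder")  # subscribe before the first request is sent
    worker = asyncio.create_task(helper_agent(bus, "coder", inbox))
    answer = await bus.request("researcher", "coder", {"task_description": "Write code for a classifier"})
    worker.cancel()
    return answer


print(asyncio.run(demo()))  # {'can_help': True, 'from': 'coder'}
```

The `finally` block removes the pending entry whether the reply arrives or the wait times out, so abandoned requests do not accumulate.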
Best Practices and Production Deployment
Error Handling and Recovery
- Graceful Degradation: Implement fallback strategies when tools fail
- Circuit Breakers: Prevent cascade failures in multi-agent systems
- Retry Logic: Implement exponential backoff for transient failures
- State Recovery: Persist agent state for recovery from crashes
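The retry and circuit-breaker items above can be combined around any async tool call. Below is a minimal sketch, assuming hypothetical names (`CircuitBreaker`, `CircuitOpenError`, `retry_with_backoff`), thresholds chosen for illustration, and "full jitter" backoff: a random delay up to `base_delay * 2**attempt`, capped at `max_delay`.

```python
import asyncio
import random
import time
from typing import Awaitable, Callable, Optional, Tuple, Type, TypeVar

T = TypeVar("T")


class CircuitOpenError(RuntimeError):
    """Raised while the breaker is open and calls are short-circuited."""


class CircuitBreaker:
    """Opens after `failure_threshold` consecutive failures; allows a trial call after `reset_timeout` s."""

    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 30.0) -> None:
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.consecutive_failures = 0
        self.opened_at: Optional[float] = None

    async def call(self, func: Callable[[], Awaitable[T]]) -> T:
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_timeout:
                raise CircuitOpenError("circuit open; skipping call")
            # Half-open: let one trial call through. The failure count is not
            # reset, so a single further failure re-opens the circuit.
            self.opened_at = None
        try:
            result = await func()
        except Exception:
            self.consecutive_failures += 1
            if self.consecutive_failures >= self.failure_threshold:
                self.opened_at = time.monotonic()
            raise
        self.consecutive_failures = 0
        return result


async def retry_with_backoff(func: Callable[[], Awaitable[T]],
                             retries: int = 3,
                             base_delay: float = 0.5,
                             max_delay: float = 8.0,
                             retry_on: Tuple[Type[BaseException], ...] = (Exception,)) -> T:
    """Retry func on transient errors with exponential backoff and full jitter."""
    for attempt in range(retries + 1):
        try:
            return await func()
        except CircuitOpenError:
            raise  # the breaker has already decided the dependency is down
        except retry_on:
            if attempt == retries:
                raise
            await asyncio.sleep(random.uniform(0, min(max_delay, base_delay * 2 ** attempt)))
    raise RuntimeError("retries must be >= 0")


async def demo() -> str:
    attempts = {"count": 0}

    async def flaky_tool() -> str:
        # Fails twice with a transient error, then succeeds
        attempts["count"] += 1
        if attempts["count"] < 3:
            raise ConnectionError("transient failure")
        return "ok"

    breaker = CircuitBreaker(failure_threshold=5)
    return await retry_with_backoff(lambda: breaker.call(flaky_tool), base_delay=0.01)


print(asyncio.run(demo()))  # ok
```

Wrapping the breaker inside the retry loop means that once the breaker opens, retries stop immediately. Without that, the agent would keep calling a service that is already known to be down.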
Performance Optimization
- Async Operations: Use async/await for concurrent operations
- Memory Management: Implement memory cleanup and size limits
- Tool Caching: Cache tool results when appropriate
- Plan Optimization: Optimize plans based on execution history
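Tool caching is only safe for idempotent reads such as GET requests or searches, never for side-effecting calls like file writes or POSTs. Here is a minimal sketch of a time-to-live cache. `ToolResultCache` is a hypothetical name. Its key is the tool name plus the parameters serialised with `sort_keys=True`, so the key does not depend on argument order. The injectable `clock` exists only so expiry can be tested.

```python
import json
import time
from typing import Any, Callable, Dict, Optional, Tuple


class ToolResultCache:
    """Hypothetical TTL cache keyed by tool name plus canonicalised parameters."""

    def __init__(self, ttl_seconds: float = 300.0, max_entries: int = 1000,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.ttl_seconds = ttl_seconds
        self.max_entries = max_entries
        self.clock = clock
        self._entries: Dict[str, Tuple[float, Any]] = {}

    @staticmethod
    def make_key(tool_name: str, params: Dict[str, Any]) -> str:
        # sort_keys makes {"a": 1, "b": 2} and {"b": 2, "a": 1} share one key
        return tool_name + ":" + json.dumps(params, sort_keys=True, default=str)

    def get(self, tool_name: str, params: Dict[str, Any]) -> Optional[Any]:
        """Return the cached value, or None on a miss or expiry (so None results are not distinguishable)."""
        key = self.make_key(tool_name, params)
        entry = self._entries.get(key)
        if entry is None:
            return None
        stored_at, value = entry
        if self.clock() - stored_at > self.ttl_seconds:
            del self._entries[key]  # expired
            return None
        return value

    def put(self, tool_name: str, params: Dict[str, Any], value: Any) -> None:
        key = self.make_key(tool_name, params)
        self._entries.pop(key, None)  # re-inserting moves the key to the newest position
        if len(self._entries) >= self.max_entries:
            # Evict the oldest insertion (dicts preserve insertion order)
            self._entries.pop(next(iter(self._entries)))
        self._entries[key] = (self.clock(), value)


now = [0.0]
cache = ToolResultCache(ttl_seconds=60, clock=lambda: now[0])
cache.put("api_call", {"url": "https://httpbin.org/get", "method": "GET"}, {"status_code": 200})
print(cache.get("api_call", {"method": "GET", "url": "https://httpbin.org/get"}))  # {'status_code': 200}
now[0] = 61.0
print(cache.get("api_call", {"method": "GET", "url": "https://httpbin.org/get"}))  # None
```

Eviction here is simply oldest-first. An LRU policy (for example via `collections.OrderedDict.move_to_end` on each hit) would be a natural refinement.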
Security Considerations
- Tool Sandboxing: Run tools in isolated environments
- Input Validation: Validate all tool parameters
- Access Control: Implement proper authorization for tool usage
- Audit Logging: Log all agent actions for security review
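Input validation can reuse the `parameters_schema` that each tool already declares. The sketch below is deliberately minimal. `validate_tool_params` is a hypothetical helper. It checks only a flat subset of JSON Schema (`required`, `type`, `enum`, and unexpected keys). A full validator such as the `jsonschema` package is likely a better fit in production.

```python
from typing import Any, Dict, List

# Python types accepted for each JSON-schema "type" keyword used in the tool schemas
_JSON_TYPES = {
    "string": str,
    "object": dict,
    "array": list,
    "boolean": bool,
    "integer": int,
    "number": (int, float),
}


def validate_tool_params(params: Dict[str, Any], schema: Dict[str, Any]) -> List[str]:
    """Check params against a flat JSON-schema subset: required, type, enum, unknown keys."""
    errors: List[str] = []
    properties: Dict[str, Any] = schema.get("properties", {})
    for name in schema.get("required", []):
        if name not in params:
            errors.append(f"missing required parameter '{name}'")
    for name, value in params.items():
        spec = properties.get(name)
        if spec is None:
            errors.append(f"unexpected parameter '{name}'")
            continue
        expected = _JSON_TYPES.get(spec.get("type"))
        # bool is a subclass of int in Python, so reject it explicitly for numeric types
        is_bool_as_number = isinstance(value, bool) and spec.get("type") in ("integer", "number")
        if expected is not None and (not isinstance(value, expected) or is_bool_as_number):
            errors.append(f"'{name}' should be of type {spec['type']}")
        elif "enum" in spec and value not in spec["enum"]:
            errors.append(f"'{name}' must be one of {spec['enum']}")
    return errors


# A trimmed copy of the api_call tool's schema
api_call_schema = {
    "type": "object",
    "properties": {
        "method": {"type": "string", "enum": ["GET", "POST", "PUT", "DELETE"]},
        "url": {"type": "string"},
        "headers": {"type": "object"},
    },
    "required": ["method", "url"],
}

for error in validate_tool_params({"method": "PATCH", "headers": "x", "verbose": True}, api_call_schema):
    print(error)
```

Rejecting unexpected keys is a deliberate choice for agent tools. A model that invents a parameter is more likely hallucinating than using an undocumented feature.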
Conclusion
AI agents represent a paradigm shift in how we build intelligent systems, moving from reactive applications to proactive, autonomous systems capable of complex reasoning and task execution. The patterns and implementations covered in this guide provide a foundation for building production-ready agent systems.
Key success factors:
- Modular Architecture: Design for extensibility and maintainability
- Robust Planning: Implement sophisticated planning and replanning capabilities
- Effective Memory: Use appropriate memory systems for different use cases
- Tool Integration: Provide agents with powerful, safe tools
- Multi-Agent Coordination: Enable agents to work together effectively
As AI agent technology continues to evolve, staying current with new techniques, tools, and best practices will be essential for building systems that can truly augment human capabilities and automate complex workflows.