Build AI Agents That Save 10 Hours Weekly

David Childs

Master building AI agents capable of autonomous task execution, decision-making, and workflow automation with practical implementations and patterns.

AI agents move beyond simple question answering to autonomous task execution and decision-making. They can break complex objectives into manageable steps, use tools dynamically, maintain context across interactions, and coordinate with other agents to reach a goal. This guide builds such a system piece by piece: a core agent architecture, memory systems, and a tool layer.

Understanding AI Agent Architecture

AI agents are autonomous systems that can perceive their environment, make decisions, and take actions to achieve specific goals. Unlike traditional rule-based automation, AI agents can adapt to new situations, handle uncertainty, and learn from experience.
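
The perceive, decide, act cycle described above can be sketched with a deliberately tiny example. The `Thermostat` class and `run` helper below are hypothetical names invented for illustration, and the decision step is a hard-coded rule; in a real agent that step would be a planner or model call, as in the architecture that follows.

```python
from dataclasses import dataclass


@dataclass
class Thermostat:
    """Toy agent (hypothetical): keeps a room near a target temperature."""
    target: float
    tolerance: float = 0.5

    def perceive(self, room: dict) -> float:
        # Observation: read the current temperature from the environment.
        return room["temperature"]

    def decide(self, temperature: float) -> str:
        # Decision: pick an action based on the gap to the goal.
        if temperature < self.target - self.tolerance:
            return "heat"
        if temperature > self.target + self.tolerance:
            return "cool"
        return "idle"

    def act(self, action: str, room: dict) -> None:
        # Action: change the environment by one degree per step.
        if action == "heat":
            room["temperature"] += 1.0
        elif action == "cool":
            room["temperature"] -= 1.0


def run(agent: Thermostat, room: dict, max_steps: int = 20) -> list:
    """Loop perceive -> decide -> act until the goal is met or steps run out."""
    actions = []
    for _ in range(max_steps):
        action = agent.decide(agent.perceive(room))
        if action == "idle":
            break
        agent.act(action, room)
        actions.append(action)
    return actions


room = {"temperature": 17.0}
history = run(Thermostat(target=21.0), room)
print(history, room["temperature"])  # ['heat', 'heat', 'heat', 'heat'] 21.0
```

The `max_steps` bound plays the same role as `max_iterations` in the agent class below: the loop always terminates, even if the goal is never reached.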

Core Agent Components

# agent_architecture.py
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional, Callable, Union
from enum import Enum
import asyncio
import json
from datetime import datetime
import uuid

class AgentState(Enum):
    IDLE = "idle"
    PLANNING = "planning"
    EXECUTING = "executing"
    WAITING = "waiting"
    ERROR = "error"
    COMPLETED = "completed"

@dataclass
class Task:
    id: str
    description: str
    priority: int = 1
    dependencies: List[str] = field(default_factory=list)
    parameters: Dict[str, Any] = field(default_factory=dict)
    status: str = "pending"
    assigned_agent: Optional[str] = None
    created_at: datetime = field(default_factory=datetime.now)
    completed_at: Optional[datetime] = None
    result: Optional[Any] = None

@dataclass
class AgentAction:
    action_type: str
    parameters: Dict[str, Any]
    tool_name: Optional[str] = None
    expected_outcome: Optional[str] = None

@dataclass
class AgentObservation:
    observation_type: str
    content: Any
    timestamp: datetime = field(default_factory=datetime.now)
    source: Optional[str] = None

class Memory(ABC):
    """Abstract memory interface for agents"""
    
    @abstractmethod
    async def store(self, key: str, value: Any, ttl: Optional[int] = None):
        """Store information in memory"""
        pass
    
    @abstractmethod
    async def retrieve(self, key: str) -> Optional[Any]:
        """Retrieve information from memory"""
        pass
    
    @abstractmethod
    async def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Search memory for relevant information"""
        pass

class Tool(ABC):
    """Abstract tool interface"""
    
    @property
    @abstractmethod
    def name(self) -> str:
        pass
    
    @property
    @abstractmethod
    def description(self) -> str:
        pass
    
    @property
    @abstractmethod
    def parameters_schema(self) -> Dict[str, Any]:
        pass
    
    @abstractmethod
    async def execute(self, **kwargs) -> Dict[str, Any]:
        """Execute the tool with given parameters"""
        pass

class Planner(ABC):
    """Abstract planner interface"""
    
    @abstractmethod
    async def create_plan(self, 
                         goal: str, 
                         context: Dict[str, Any],
                         available_tools: List[Tool]) -> List[AgentAction]:
        """Create a plan to achieve the given goal"""
        pass
    
    @abstractmethod
    async def replan(self, 
                    current_plan: List[AgentAction],
                    execution_results: List[Dict[str, Any]],
                    new_context: Dict[str, Any]) -> List[AgentAction]:
        """Replan based on execution results"""
        pass

class BaseAgent(ABC):
    """Base agent class with core functionality"""
    
    def __init__(self,
                 agent_id: str,
                 name: str,
                 memory: Memory,
                 planner: Planner,
                 tools: Optional[List[Tool]] = None,
                 max_iterations: int = 10):
        
        self.agent_id = agent_id
        self.name = name
        self.memory = memory
        self.planner = planner
        self.tools = {tool.name: tool for tool in (tools or [])}
        self.max_iterations = max_iterations
        
        # Agent state
        self.state = AgentState.IDLE
        self.current_task: Optional[Task] = None
        self.current_plan: List[AgentAction] = []
        self.execution_history: List[Dict[str, Any]] = []
        
        # Callbacks
        self.state_change_callbacks: List[Callable] = []
        self.task_completion_callbacks: List[Callable] = []
    
    async def execute_task(self, task: Task) -> Dict[str, Any]:
        """Execute a task autonomously"""
        
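        self.current_task = task
        # Start each task with a clean history so "iterations" counts only this task
        self.execution_history = []
        self._change_state(AgentState.PLANNING)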
        
        try:
            # Create initial plan
            context = await self._gather_context(task)
            self.current_plan = await self.planner.create_plan(
                task.description, context, list(self.tools.values())
            )
            
            # Execute plan with iterations
            result = await self._execute_plan_with_feedback()
            
            # Mark task as completed
            task.status = "completed"
            task.completed_at = datetime.now()
            task.result = result
            
            self._change_state(AgentState.COMPLETED)
            
            # Notify callbacks
            for callback in self.task_completion_callbacks:
                await callback(self, task, result)
            
            return {
                "success": True,
                "result": result,
                "iterations": len(self.execution_history),
                "execution_history": self.execution_history
            }
            
        except Exception as e:
            self._change_state(AgentState.ERROR)
            task.status = "error"
            
            return {
                "success": False,
                "error": str(e),
                "execution_history": self.execution_history
            }
        
        finally:
            self.current_task = None
            self.current_plan = []
    
    async def _execute_plan_with_feedback(self) -> Any:
        """Execute plan with feedback and replanning"""
        
        self._change_state(AgentState.EXECUTING)
        
        for iteration in range(self.max_iterations):
            if not self.current_plan:
                break
            
            # Execute next action
            action = self.current_plan.pop(0)
            execution_result = await self._execute_action(action)
            
            # Record execution
            self.execution_history.append({
                "iteration": iteration,
                "action": action,
                "result": execution_result,
                "timestamp": datetime.now()
            })
            
            # Check if task is complete
            if await self._is_task_complete(execution_result):
                return execution_result.get("result")
            
            # Check if replanning is needed
            if await self._should_replan(execution_result):
                context = await self._gather_context(self.current_task)
                context["execution_history"] = self.execution_history
                
                self.current_plan = await self.planner.replan(
                    self.current_plan, 
                    self.execution_history,
                    context
                )
        
        # The plan finished or the iteration budget was spent without an explicit
        # "final_result": fall back to the most recent action's result
        if self.execution_history:
            return self.execution_history[-1]["result"].get("result")
        
        raise RuntimeError("No actions were executed: the plan was empty or max_iterations is 0")
    
    async def _execute_action(self, action: AgentAction) -> Dict[str, Any]:
        """Execute a single action"""
        
        try:
            if action.tool_name:
                # A named tool that is not registered is a failure, not an
                # internal action; the except block below reports it
                if action.tool_name not in self.tools:
                    raise ValueError(f"Unknown tool: {action.tool_name}")
                
                # Execute tool
                tool = self.tools[action.tool_name]
                result = await tool.execute(**action.parameters)
                
                return {
                    "success": True,
                    "action_type": action.action_type,
                    "tool_used": action.tool_name,
                    "result": result
                }
            else:
                # Handle non-tool actions (thinking, planning, etc.)
                result = await self._handle_internal_action(action)
                
                return {
                    "success": True,
                    "action_type": action.action_type,
                    "result": result
                }
                
        except Exception as e:
            return {
                "success": False,
                "action_type": action.action_type,
                "error": str(e)
            }
    
    async def _handle_internal_action(self, action: AgentAction) -> Any:
        """Handle internal agent actions (thinking, memory operations, etc.)"""
        
        if action.action_type == "think":
            # Store thought in memory
            thought = action.parameters.get("thought", "")
            await self.memory.store(f"thought_{datetime.now().isoformat()}", thought)
            return {"thought": thought}
        
        elif action.action_type == "remember":
            # Retrieve from memory
            query = action.parameters.get("query", "")
            memories = await self.memory.search(query)
            return {"memories": memories}
        
        elif action.action_type == "wait":
            # Wait for specified duration
            duration = action.parameters.get("duration", 1)
            await asyncio.sleep(duration)
            return {"waited": duration}
        
        else:
            return {"message": f"Unknown internal action: {action.action_type}"}
    
    async def _gather_context(self, task: Task) -> Dict[str, Any]:
        """Gather context for planning"""
        
        context = {
            "task": task,
            "available_tools": list(self.tools.keys()),
            "agent_capabilities": await self._get_capabilities(),
            "relevant_memories": await self.memory.search(task.description, limit=5)
        }
        
        return context
    
    async def _get_capabilities(self) -> List[str]:
        """Get agent capabilities"""
        
        capabilities = ["planning", "execution", "memory", "learning"]
        capabilities.extend(list(self.tools.keys()))
        
        return capabilities
    
    async def _is_task_complete(self, execution_result: Dict[str, Any]) -> bool:
        """Check if the task is complete"""
        
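        # Simple completion check: a successful action whose result dict
        # carries a "final_result" key (non-dict results never match)
        result = execution_result.get("result")
        if execution_result.get("success") and isinstance(result, dict) and "final_result" in result:
            return True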
        
        # Check if task-specific completion criteria are met
        if self.current_task and hasattr(self, '_check_task_completion'):
            return await self._check_task_completion(self.current_task, execution_result)
        
        return False
    
    async def _should_replan(self, execution_result: Dict[str, Any]) -> bool:
        """Determine if replanning is needed"""
        
        # Replan if action failed
        if not execution_result.get("success"):
            return True
        
        # Replan if the result dict flags itself as unexpected
        result = execution_result.get("result")
        if isinstance(result, dict) and "unexpected" in result:
            return True
        
        # Replan if explicitly requested
        if execution_result.get("replan_requested"):
            return True
        
        return False
    
    def _change_state(self, new_state: AgentState):
        """Change agent state and notify callbacks"""
        
        old_state = self.state
        self.state = new_state
        
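        # Callbacks must be coroutine functions, and an event loop must be running.
        # Hold a reference to each task so it is not garbage-collected mid-run.
        pending = self.__dict__.setdefault("_callback_tasks", set())
        for callback in self.state_change_callbacks:
            callback_task = asyncio.create_task(callback(self, old_state, new_state))
            pending.add(callback_task)
            callback_task.add_done_callback(pending.discard)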
    
    def add_state_change_callback(self, callback: Callable):
        """Add callback for state changes"""
        self.state_change_callbacks.append(callback)
    
    def add_task_completion_callback(self, callback: Callable):
        """Add callback for task completion"""
        self.task_completion_callbacks.append(callback)
    
    def add_tool(self, tool: Tool):
        """Add a tool to the agent"""
        self.tools[tool.name] = tool
    
    def remove_tool(self, tool_name: str):
        """Remove a tool from the agent"""
        if tool_name in self.tools:
            del self.tools[tool_name]
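
The `Task` dataclass above carries a `dependencies` list, but nothing in the listing consumes it. One way a scheduler might use it, sketched here as an assumption rather than as part of the original design, is to sort tasks topologically before handing them to agents. The `execution_order` helper is hypothetical, the `Task` class is a trimmed copy so the snippet runs on its own, and `graphlib` requires Python 3.9 or later.

```python
from dataclasses import dataclass, field
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List


@dataclass
class Task:
    # Trimmed copy of the Task dataclass above: only the fields used here.
    id: str
    description: str
    dependencies: List[str] = field(default_factory=list)


def execution_order(tasks: List[Task]) -> List[str]:
    """Return task ids so that every task comes after its dependencies."""
    # TopologicalSorter expects a mapping of node -> predecessors.
    graph: Dict[str, List[str]] = {t.id: t.dependencies for t in tasks}
    try:
        return list(TopologicalSorter(graph).static_order())
    except CycleError as exc:
        # The detected cycle is the second element of the exception args.
        raise ValueError(f"Circular task dependencies: {exc.args[1]}") from exc


tasks = [
    Task("report", "Write the summary report", dependencies=["analyze"]),
    Task("analyze", "Analyze the collected data", dependencies=["collect"]),
    Task("collect", "Collect raw data"),
]
print(execution_order(tasks))  # ['collect', 'analyze', 'report']
```

Each id in the returned list could then be passed to `execute_task` in turn, with circular dependencies rejected before any agent starts work.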

Memory Systems for Agents

# agent_memory.py
import asyncio
import json
import sqlite3
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import numpy as np
from collections import defaultdict, deque
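import hashlib

# Memory is the abstract base class defined in agent_architecture.py above
from agent_architecture import Memory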

class VectorMemory(Memory):
    """Vector-based semantic memory for agents"""
    
    def __init__(self, 
                 embedding_model,
                 db_path: str = "agent_memory.db",
                 max_memories: int = 10000):
        
        self.embedding_model = embedding_model
        self.db_path = db_path
        self.max_memories = max_memories
        
        # Initialize database
        self._init_database()
        
        # In-memory cache for recent memories
        self.memory_cache = {}
        self.cache_size = 1000
    
    def _init_database(self):
        """Initialize SQLite database for memory storage"""
        
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS memories (
                    id TEXT PRIMARY KEY,
                    key TEXT,
                    content TEXT,
                    embedding BLOB,
                    metadata TEXT,
                    created_at TIMESTAMP,
                    accessed_at TIMESTAMP,
                    access_count INTEGER DEFAULT 1,
                    ttl TIMESTAMP
                )
            """)
            
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_key ON memories(key)
            """)
            
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_created_at ON memories(created_at)
            """)
    
    async def store(self, key: str, value: Any, ttl: Optional[int] = None):
        """Store information with semantic embedding"""
        
        # Create memory ID
        memory_id = hashlib.md5(f"{key}_{datetime.now().isoformat()}".encode()).hexdigest()
        
        # Convert value to string for embedding
        if isinstance(value, dict):
            content = json.dumps(value)
        else:
            content = str(value)
        
        # Generate embedding
        embedding = await self.embedding_model.embed_text(content)
        
        # Calculate TTL
        ttl_timestamp = None
        if ttl:
            ttl_timestamp = datetime.now() + timedelta(seconds=ttl)
        
        # Store in database
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                INSERT OR REPLACE INTO memories 
                (id, key, content, embedding, metadata, created_at, accessed_at, ttl)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                memory_id,
                key,
                content,
                # search() decodes with dtype=np.float32, so store in that dtype
                np.asarray(embedding, dtype=np.float32).tobytes(),
                json.dumps({"type": type(value).__name__}),
                datetime.now(),
                datetime.now(),
                ttl_timestamp
            ))
        
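        # Update cache (bounded by cache_size; oldest entries are evicted first)
        self.memory_cache.pop(key, None)
        self.memory_cache[key] = {
            "content": value,
            "embedding": embedding,
            "created_at": datetime.now(),
            "expires_at": ttl_timestamp
        }
        while len(self.memory_cache) > self.cache_size:
            self.memory_cache.pop(next(iter(self.memory_cache)))
        
        # Cleanup old memories if needed
        await self._cleanup_memories()
    
    async def retrieve(self, key: str) -> Optional[Any]:
        """Retrieve information by exact key"""
        
        # Check cache first, skipping entries whose TTL has passed
        if key in self.memory_cache:
            cached = self.memory_cache[key]
            expires_at = cached["expires_at"]
            if expires_at is None or expires_at > datetime.now():
                return cached["content"]
            del self.memory_cache[key]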
        
        # Query database
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute("""
                SELECT content, metadata, access_count
                FROM memories 
                WHERE key = ? AND (ttl IS NULL OR ttl > ?)
                ORDER BY created_at DESC
                LIMIT 1
            """, (key, datetime.now()))
            
            row = cursor.fetchone()
            if row:
                content, metadata, access_count = row
                
                # Update access count
                conn.execute("""
                    UPDATE memories 
                    SET accessed_at = ?, access_count = access_count + 1
                    WHERE key = ?
                """, (datetime.now(), key))
                
                # Try to deserialize based on metadata
                try:
                    meta = json.loads(metadata)
                    if meta.get("type") == "dict":
                        return json.loads(content)
                    else:
                        return content
                except (json.JSONDecodeError, TypeError):
                    return content
        
        return None
    
    async def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Search for semantically similar memories"""
        
        # Generate query embedding
        query_embedding = await self.embedding_model.embed_text(query)
        
        # Get all memories from database
        memories = []
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute("""
                SELECT id, key, content, embedding, created_at, access_count
                FROM memories 
                WHERE ttl IS NULL OR ttl > ?
            """, (datetime.now(),))
            
            for row in cursor.fetchall():
                memory_id, key, content, embedding_bytes, created_at, access_count = row
                
                # Convert embedding back to numpy array
                memory_embedding = np.frombuffer(embedding_bytes, dtype=np.float32)
                
                # Calculate similarity
                similarity = self._cosine_similarity(query_embedding, memory_embedding)
                
                memories.append({
                    "id": memory_id,
                    "key": key,
                    "content": content,
                    "similarity": similarity,
                    "created_at": created_at,
                    "access_count": access_count
                })
        
        # Sort by similarity and return top results
        memories.sort(key=lambda x: x["similarity"], reverse=True)
        
        return memories[:limit]
    
    def _cosine_similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
        """Calculate cosine similarity between two vectors"""
        
        dot_product = np.dot(vec1, vec2)
        norm1 = np.linalg.norm(vec1)
        norm2 = np.linalg.norm(vec2)
        
        if norm1 == 0 or norm2 == 0:
            return 0.0
        
        # Cast to a plain float so results stay JSON-serializable
        return float(dot_product / (norm1 * norm2))
    
    async def _cleanup_memories(self):
        """Cleanup old memories to maintain size limits"""
        
        with sqlite3.connect(self.db_path) as conn:
            # Remove expired memories
            conn.execute("DELETE FROM memories WHERE ttl IS NOT NULL AND ttl <= ?", (datetime.now(),))
            
            # Check total count
            cursor = conn.execute("SELECT COUNT(*) FROM memories")
            total_count = cursor.fetchone()[0]
            
            if total_count > self.max_memories:
                # Remove oldest, least accessed memories
                excess = total_count - self.max_memories
                conn.execute("""
                    DELETE FROM memories 
                    WHERE id IN (
                        SELECT id FROM memories 
                        ORDER BY access_count ASC, accessed_at ASC 
                        LIMIT ?
                    )
                """, (excess,))

class EpisodicMemory(Memory):
    """Episodic memory for storing agent experiences"""
    
    def __init__(self, max_episodes: int = 1000):
        self.episodes = deque(maxlen=max_episodes)
        self.episode_index = {}  # For fast lookups
        
    async def store(self, key: str, value: Any, ttl: Optional[int] = None):
        """Store an episode"""
        
        episode = {
            "id": key,
            "content": value,
            "timestamp": datetime.now(),
            "ttl": datetime.now() + timedelta(seconds=ttl) if ttl else None
        }
        
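        # deque(maxlen=...) silently drops the oldest episode when full;
        # remove it from the index too so lookups and memory use stay in sync
        if self.episodes and len(self.episodes) == self.episodes.maxlen:
            evicted = self.episodes[0]
            if self.episode_index.get(evicted["id"]) is evicted:
                del self.episode_index[evicted["id"]]
        
        self.episodes.append(episode)
        self.episode_index[key] = episode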
    
    async def retrieve(self, key: str) -> Optional[Any]:
        """Retrieve episode by ID"""
        
        episode = self.episode_index.get(key)
        if episode and (not episode["ttl"] or episode["ttl"] > datetime.now()):
            return episode["content"]
        
        return None
    
    async def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Search episodes by content similarity"""
        
        results = []
        query_lower = query.lower()
        
        for episode in self.episodes:
            if episode["ttl"] and episode["ttl"] <= datetime.now():
                continue
            
            content_str = str(episode["content"]).lower()
            if query_lower in content_str:
                results.append({
                    "id": episode["id"],
                    "content": episode["content"],
                    "timestamp": episode["timestamp"],
                    # max(..., 1) avoids ZeroDivisionError on empty or whitespace-only content
                    "relevance": content_str.count(query_lower) / max(len(content_str.split()), 1)
                })
        
        # Sort by relevance and timestamp
        results.sort(key=lambda x: (x["relevance"], x["timestamp"]), reverse=True)
        
        return results[:limit]
    
    def get_recent_episodes(self, hours: int = 24) -> List[Dict[str, Any]]:
        """Get recent episodes within specified hours"""
        
        cutoff = datetime.now() - timedelta(hours=hours)
        recent = []
        
        for episode in self.episodes:
            if episode["timestamp"] >= cutoff:
                if not episode["ttl"] or episode["ttl"] > datetime.now():
                    recent.append(episode)
        
        return recent

class WorkingMemory(Memory):
    """Working memory for current task context"""
    
    def __init__(self, capacity: int = 7):  # Miller's magic number
        self.capacity = capacity
        self.items = {}
        self.access_order = deque()
        
    async def store(self, key: str, value: Any, ttl: Optional[int] = None):
        """Store item in working memory"""
        
        # Remove if already exists
        if key in self.items:
            self.access_order.remove(key)
        
        # Add new item
        self.items[key] = {
            "value": value,
            "timestamp": datetime.now(),
            "ttl": datetime.now() + timedelta(seconds=ttl) if ttl else None
        }
        
        self.access_order.append(key)
        
        # Evict oldest if over capacity
        while len(self.items) > self.capacity:
            oldest_key = self.access_order.popleft()
            if oldest_key in self.items:
                del self.items[oldest_key]
    
    async def retrieve(self, key: str) -> Optional[Any]:
        """Retrieve item and update access order"""
        
        if key not in self.items:
            return None
        
        item = self.items[key]
        
        # Check TTL
        if item["ttl"] and item["ttl"] <= datetime.now():
            del self.items[key]
            self.access_order.remove(key)
            return None
        
        # Update access order
        self.access_order.remove(key)
        self.access_order.append(key)
        
        return item["value"]
    
    async def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Search working memory items"""
        
        results = []
        query_lower = query.lower()
        
        for key, item in self.items.items():
            if item["ttl"] and item["ttl"] <= datetime.now():
                continue
            
            content_str = str(item["value"]).lower()
            if query_lower in content_str:
                results.append({
                    "key": key,
                    "content": item["value"],
                    "timestamp": item["timestamp"]
                })
        
        return results[:limit]
    
    def get_all_items(self) -> Dict[str, Any]:
        """Get all items in working memory"""
        
        current_time = datetime.now()
        valid_items = {}
        
        for key, item in self.items.items():
            if not item["ttl"] or item["ttl"] > current_time:
                valid_items[key] = item["value"]
        
        return valid_items

class HybridMemory(Memory):
    """Hybrid memory system combining multiple memory types"""
    
    def __init__(self, 
                 embedding_model,
                 vector_memory_config: Dict[str, Any] = None,
                 episodic_memory_config: Dict[str, Any] = None,
                 working_memory_config: Dict[str, Any] = None):
        
        # Initialize memory components
        self.vector_memory = VectorMemory(
            embedding_model, 
            **(vector_memory_config or {})
        )
        
        self.episodic_memory = EpisodicMemory(
            **(episodic_memory_config or {})
        )
        
        self.working_memory = WorkingMemory(
            **(working_memory_config or {})
        )
    
    async def store(self, key: str, value: Any, ttl: Optional[int] = None):
        """Store in appropriate memory systems"""
        
        # Always store in working memory for immediate access
        await self.working_memory.store(key, value, ttl)
        
        # Store in vector memory for semantic search
        await self.vector_memory.store(key, value, ttl)
        
        # Store episodes in episodic memory
        if isinstance(value, dict) and "episode" in str(value).lower():
            await self.episodic_memory.store(key, value, ttl)
    
    async def retrieve(self, key: str) -> Optional[Any]:
        """Retrieve from memory systems in order of speed"""
        
        # Check working memory first
        result = await self.working_memory.retrieve(key)
        if result is not None:
            return result
        
        # Check vector memory
        result = await self.vector_memory.retrieve(key)
        if result is not None:
            # Promote to working memory
            await self.working_memory.store(key, result)
            return result
        
        # Check episodic memory
        result = await self.episodic_memory.retrieve(key)
        if result is not None:
            # Promote to working memory
            await self.working_memory.store(key, result)
            return result
        
        return None
    
    async def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Search across all memory systems"""
        
        # Search all memory systems
        working_results = await self.working_memory.search(query, limit)
        vector_results = await self.vector_memory.search(query, limit)
        episodic_results = await self.episodic_memory.search(query, limit)
        
        # Combine and deduplicate results
        all_results = []
        seen_keys = set()
        
        # Prioritize working memory results
        for result in working_results:
            key = result.get("key", result.get("id", ""))
            if key not in seen_keys:
                result["source"] = "working"
                all_results.append(result)
                seen_keys.add(key)
        
        # Add vector memory results
        for result in vector_results:
            key = result.get("key", result.get("id", ""))
            if key not in seen_keys:
                result["source"] = "vector"
                all_results.append(result)
                seen_keys.add(key)
        
        # Add episodic memory results
        for result in episodic_results:
            key = result.get("key", result.get("id", ""))
            if key not in seen_keys:
                result["source"] = "episodic"
                all_results.append(result)
                seen_keys.add(key)
        
        return all_results[:limit]
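
The semantic search in `VectorMemory.search` boils down to three steps: embed the query, score every stored memory by cosine similarity, and keep the top results. The sketch below reproduces those steps in pure Python. The `embed` function is a toy word-count stand-in for a real embedding model, and all names here are hypothetical.

```python
import math
from collections import Counter
from typing import Dict, List, Tuple


def embed(text: str) -> Dict[str, float]:
    """Toy embedding: word counts. A stand-in for a real embedding model."""
    return dict(Counter(text.lower().split()))


def cosine(a: Dict[str, float], b: Dict[str, float]) -> float:
    """Cosine similarity of two sparse vectors; 0.0 if either is all zeros."""
    dot = sum(weight * b.get(word, 0.0) for word, weight in a.items())
    norm_a = math.sqrt(sum(w * w for w in a.values()))
    norm_b = math.sqrt(sum(w * w for w in b.values()))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def search(memories: Dict[str, str], query: str, limit: int = 2) -> List[Tuple[str, float]]:
    """Score every stored memory against the query and keep the top `limit`."""
    q = embed(query)
    scored = [(key, cosine(q, embed(text))) for key, text in memories.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:limit]


memories = {
    "m1": "deploy the api server to staging",
    "m2": "customer asked about invoice totals",
    "m3": "restart the api server",
}
top = search(memories, "api server deploy")
print([key for key, _ in top])  # ['m1', 'm3']
```

Like the SQLite-backed version above, this scans every memory on each query, so cost grows linearly with the number of stored items. That is reasonable for a few thousand memories; larger stores usually call for an approximate nearest-neighbor index.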

Advanced Tool System

# agent_tools.py
import aiohttp
import asyncio
import subprocess
import json
import os
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
import tempfile
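from pathlib import Path

# Tool is the abstract base class defined in agent_architecture.py above
from agent_architecture import Tool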

class WebSearchTool(Tool):
    """Tool for web search capabilities"""
    
    def __init__(self, api_key: str, search_engine: str = "serper"):
        self.api_key = api_key
        self.search_engine = search_engine
        
    @property
    def name(self) -> str:
        return "web_search"
    
    @property
    def description(self) -> str:
        return "Search the web for current information on any topic"
    
    @property
    def parameters_schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Search query"
                },
                "num_results": {
                    "type": "integer",
                    "description": "Number of results to return",
                    "default": 5
                }
            },
            "required": ["query"]
        }
    
    async def execute(self, **kwargs) -> Dict[str, Any]:
        """Execute web search"""
        
        query = kwargs.get("query")
        num_results = kwargs.get("num_results", 5)
        
        if self.search_engine == "serper":
            return await self._search_with_serper(query, num_results)
        else:
            raise ValueError(f"Unsupported search engine: {self.search_engine}")
    
    async def _search_with_serper(self, query: str, num_results: int) -> Dict[str, Any]:
        """Search using Serper API"""
        
        url = "https://google.serper.dev/search"
        headers = {
            "X-API-KEY": self.api_key,
            "Content-Type": "application/json"
        }
        
        payload = {
            "q": query,
            "num": num_results
        }
        
        async with aiohttp.ClientSession() as session:
            async with session.post(url, headers=headers, json=payload) as response:
                if response.status == 200:
                    data = await response.json()
                    
                    results = []
                    for item in data.get("organic", []):
                        results.append({
                            "title": item.get("title", ""),
                            "snippet": item.get("snippet", ""),
                            "link": item.get("link", ""),
                            "position": item.get("position", 0)
                        })
                    
                    return {
                        "success": True,
                        "results": results,
                        "query": query,
                        "total_results": len(results)
                    }
                else:
                    return {
                        "success": False,
                        "error": f"Search failed with status {response.status}"
                    }

class CodeExecutionTool(Tool):
    """Tool for executing code safely"""
    
    def __init__(self, allowed_languages: List[str] = None, timeout: int = 30):
        self.allowed_languages = allowed_languages or ["python", "javascript", "bash"]
        self.timeout = timeout
    
    @property
    def name(self) -> str:
        return "code_execution"
    
    @property
    def description(self) -> str:
        return "Execute code safely in a controlled environment"
    
    @property
    def parameters_schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "language": {
                    "type": "string",
                    "enum": self.allowed_languages,
                    "description": "Programming language"
                },
                "code": {
                    "type": "string",
                    "description": "Code to execute"
                },
                "stdin": {
                    "type": "string",
                    "description": "Standard input for the program",
                    "default": ""
                }
            },
            "required": ["language", "code"]
        }
    
    async def execute(self, **kwargs) -> Dict[str, Any]:
        """Execute code with safety measures"""
        
        language = kwargs.get("language")
        code = kwargs.get("code")
        stdin = kwargs.get("stdin", "")
        
        if language not in self.allowed_languages:
            return {
                "success": False,
                "error": f"Language {language} not allowed"
            }
        
        try:
            if language == "python":
                return await self._execute_python(code, stdin)
            elif language == "javascript":
                return await self._execute_javascript(code, stdin)
            elif language == "bash":
                return await self._execute_bash(code, stdin)
            else:
                return {
                    "success": False,
                    "error": f"Execution not implemented for {language}"
                }
        
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }
    
    async def _execute_python(self, code: str, stdin: str) -> Dict[str, Any]:
        """Execute Python code"""
        
        # Create temporary file
        with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
            f.write(code)
            temp_file = f.name
        
        try:
            # Execute with timeout
            process = await asyncio.create_subprocess_exec(
                'python', temp_file,
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            
            stdout, stderr = await asyncio.wait_for(
                process.communicate(input=stdin.encode()),
                timeout=self.timeout
            )
            
            return {
                # A non-zero exit code means the script itself failed
                "success": process.returncode == 0,
                "stdout": stdout.decode(),
                "stderr": stderr.decode(),
                "exit_code": process.returncode
            }
            
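        except asyncio.TimeoutError:
            # Kill the child so it does not keep running after the timeout
            if process.returncode is None:
                process.kill()
                await process.wait()
            return {
                "success": False,
                "error": "Code execution timed out"
            }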
        
        finally:
            # Cleanup
            os.unlink(temp_file)
    
    async def _execute_javascript(self, code: str, stdin: str) -> Dict[str, Any]:
        """Execute JavaScript code using Node.js"""
        
        with tempfile.NamedTemporaryFile(mode='w', suffix='.js', delete=False) as f:
            f.write(code)
            temp_file = f.name
        
        try:
            process = await asyncio.create_subprocess_exec(
                'node', temp_file,
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            
            stdout, stderr = await asyncio.wait_for(
                process.communicate(input=stdin.encode()),
                timeout=self.timeout
            )
            
            return {
                # A non-zero exit code means the script itself failed
                "success": process.returncode == 0,
                "stdout": stdout.decode(),
                "stderr": stderr.decode(),
                "exit_code": process.returncode
            }
            
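        except asyncio.TimeoutError:
            # Kill the child so it does not keep running after the timeout
            if process.returncode is None:
                process.kill()
                await process.wait()
            return {
                "success": False,
                "error": "Code execution timed out"
            }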
        
        finally:
            os.unlink(temp_file)
    
    async def _execute_bash(self, code: str, stdin: str) -> Dict[str, Any]:
        """Execute Bash commands (with restrictions)"""
        
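        # Security: naive substring denylist. It only catches obvious mistakes (and also
        # rejects harmless text such as "add", which contains "dd"). It is not a sandbox,
        # so run untrusted commands in an isolated container or VM.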
        dangerous_patterns = ['rm -rf', 'sudo', 'chmod', 'chown', '>', '>>', 'dd', 'mkfs']
        
        for pattern in dangerous_patterns:
            if pattern in code:
                return {
                    "success": False,
                    "error": f"Command contains dangerous pattern: {pattern}"
                }
        
        try:
            process = await asyncio.create_subprocess_shell(
                code,
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            
            stdout, stderr = await asyncio.wait_for(
                process.communicate(input=stdin.encode()),
                timeout=self.timeout
            )
            
            return {
                # A non-zero exit code means the command itself failed
                "success": process.returncode == 0,
                "stdout": stdout.decode(),
                "stderr": stderr.decode(),
                "exit_code": process.returncode
            }
            
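        except asyncio.TimeoutError:
            # Kill the shell so it does not keep running after the timeout
            if process.returncode is None:
                process.kill()
                await process.wait()
            return {
                "success": False,
                "error": "Command execution timed out"
            }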

class FileOperationTool(Tool):
    """Tool for safe file operations"""
    
    def __init__(self, allowed_paths: List[str], max_file_size: int = 10 * 1024 * 1024):
        self.allowed_paths = [Path(p).resolve() for p in allowed_paths]
        self.max_file_size = max_file_size
    
    @property
    def name(self) -> str:
        return "file_operations"
    
    @property
    def description(self) -> str:
        return "Perform file operations like read, write, list files"
    
    @property
    def parameters_schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "operation": {
                    "type": "string",
                    "enum": ["read", "write", "list", "exists", "delete"],
                    "description": "File operation to perform"
                },
                "path": {
                    "type": "string",
                    "description": "File or directory path"
                },
                "content": {
                    "type": "string",
                    "description": "Content to write (for write operation)"
                },
                "encoding": {
                    "type": "string",
                    "default": "utf-8",
                    "description": "File encoding"
                }
            },
            "required": ["operation", "path"]
        }
    
    async def execute(self, **kwargs) -> Dict[str, Any]:
        """Execute file operation"""
        
        operation = kwargs.get("operation")
        path = Path(kwargs.get("path")).resolve()
        content = kwargs.get("content", "")
        encoding = kwargs.get("encoding", "utf-8")
        
        # Security check: ensure path is within allowed directories
        if not self._is_path_allowed(path):
            return {
                "success": False,
                "error": f"Path {path} not in allowed directories"
            }
        
        try:
            if operation == "read":
                return await self._read_file(path, encoding)
            elif operation == "write":
                return await self._write_file(path, content, encoding)
            elif operation == "list":
                return await self._list_directory(path)
            elif operation == "exists":
                return {
                    "success": True,
                    "exists": path.exists(),
                    "is_file": path.is_file(),
                    "is_directory": path.is_dir()
                }
            elif operation == "delete":
                return await self._delete_file(path)
            else:
                return {
                    "success": False,
                    "error": f"Unknown operation: {operation}"
                }
        
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }
    
    def _is_path_allowed(self, path: Path) -> bool:
        """Check if path is within allowed directories"""
        
        for allowed_path in self.allowed_paths:
            try:
                path.relative_to(allowed_path)
                return True
            except ValueError:
                continue
        
        return False
    
    async def _read_file(self, path: Path, encoding: str) -> Dict[str, Any]:
        """Read file content"""
        
        if not path.exists():
            return {
                "success": False,
                "error": "File does not exist"
            }
        
        if not path.is_file():
            return {
                "success": False,
                "error": "Path is not a file"
            }
        
        # Check file size
        if path.stat().st_size > self.max_file_size:
            return {
                "success": False,
                "error": f"File too large (max {self.max_file_size} bytes)"
            }
        
        try:
            with open(path, 'r', encoding=encoding) as f:
                content = f.read()
            
            return {
                "success": True,
                "content": content,
                "size": len(content),
                "path": str(path)
            }
        
        except UnicodeDecodeError:
            return {
                "success": False,
                "error": f"Cannot decode file with encoding {encoding}"
            }
    
    async def _write_file(self, path: Path, content: str, encoding: str) -> Dict[str, Any]:
        """Write content to file"""
        
        # Check content size
        if len(content.encode(encoding)) > self.max_file_size:
            return {
                "success": False,
                "error": f"Content too large (max {self.max_file_size} bytes)"
            }
        
        # Create directory if it doesn't exist
        path.parent.mkdir(parents=True, exist_ok=True)
        
        try:
            with open(path, 'w', encoding=encoding) as f:
                f.write(content)
            
            return {
                "success": True,
                "bytes_written": len(content.encode(encoding)),
                "path": str(path)
            }
        
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }
    
    async def _list_directory(self, path: Path) -> Dict[str, Any]:
        """List directory contents"""
        
        if not path.exists():
            return {
                "success": False,
                "error": "Directory does not exist"
            }
        
        if not path.is_dir():
            return {
                "success": False,
                "error": "Path is not a directory"
            }
        
        try:
            items = []
            for item in path.iterdir():
                items.append({
                    "name": item.name,
                    "path": str(item),
                    "is_file": item.is_file(),
                    "is_directory": item.is_dir(),
                    "size": item.stat().st_size if item.is_file() else None
                })
            
            return {
                "success": True,
                "items": items,
                "count": len(items),
                "path": str(path)
            }
        
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }
    
    async def _delete_file(self, path: Path) -> Dict[str, Any]:
        """Delete file or directory"""
        
        if not path.exists():
            return {
                "success": False,
                "error": "Path does not exist"
            }
        
        try:
            if path.is_file():
                path.unlink()
            elif path.is_dir():
                # Only delete if empty
                if any(path.iterdir()):
                    return {
                        "success": False,
                        "error": "Directory not empty"
                    }
                path.rmdir()
            
            return {
                "success": True,
                "deleted": str(path)
            }
        
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }

class APICallTool(Tool):
    """Tool for making HTTP API calls"""
    
    def __init__(self, allowed_domains: List[str] = None, timeout: int = 30):
        self.allowed_domains = allowed_domains or []
        self.timeout = timeout
    
    @property
    def name(self) -> str:
        return "api_call"
    
    @property
    def description(self) -> str:
        return "Make HTTP API calls to external services"
    
    @property
    def parameters_schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "method": {
                    "type": "string",
                    "enum": ["GET", "POST", "PUT", "DELETE"],
                    "description": "HTTP method"
                },
                "url": {
                    "type": "string",
                    "description": "API endpoint URL"
                },
                "headers": {
                    "type": "object",
                    "description": "HTTP headers",
                    "default": {}
                },
                "data": {
                    "type": "object",
                    "description": "Request data/payload",
                    "default": {}
                },
                "params": {
                    "type": "object",
                    "description": "URL parameters",
                    "default": {}
                }
            },
            "required": ["method", "url"]
        }
    
    async def execute(self, **kwargs) -> Dict[str, Any]:
        """Execute API call"""
        
        method = kwargs.get("method", "GET")
        url = kwargs.get("url")
        headers = kwargs.get("headers", {})
        data = kwargs.get("data", {})
        params = kwargs.get("params", {})
        
        # Security check: validate domain
        if self.allowed_domains and not self._is_domain_allowed(url):
            return {
                "success": False,
                "error": "Domain not in allowed list"
            }
        
        try:
            async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session:
                async with session.request(
                    method=method,
                    url=url,
                    headers=headers,
                    json=data if data else None,
                    params=params
                ) as response:
                    
                    response_data = {
                        "success": True,
                        "status_code": response.status,
                        "headers": dict(response.headers),
                        "url": str(response.url)
                    }
                    
                    # Try to parse JSON, fall back to text
                    try:
                        response_data["data"] = await response.json()
                    except Exception:
                        response_data["data"] = await response.text()
                    
                    return response_data
        
        except asyncio.TimeoutError:
            return {
                "success": False,
                "error": "Request timed out"
            }
        
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }
    
    def _is_domain_allowed(self, url: str) -> bool:
        """Check if domain is allowed"""
        
        from urllib.parse import urlparse
        
        parsed_url = urlparse(url)
        # Use hostname rather than netloc so a port or user:pass@ prefix cannot affect the match
        domain = (parsed_url.hostname or "").lower()
        
        for allowed_domain in self.allowed_domains:
            if domain == allowed_domain.lower() or domain.endswith(f".{allowed_domain.lower()}"):
                return True
        
        return False
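
The allowlist check is the main guard on outbound requests, so it is worth exercising on its own. The sketch below is a standalone illustration, not part of the tool classes: `is_domain_allowed` is a hypothetical helper name, and it assumes the match should be made against the parsed hostname so that ports and `user:password@` prefixes in the URL cannot influence the result.

```python
from typing import List
from urllib.parse import urlparse


def is_domain_allowed(url: str, allowed_domains: List[str]) -> bool:
    """Return True if the URL's host equals, or is a subdomain of, an allowed domain.

    Hypothetical standalone helper, for illustration only.
    """
    # hostname is lowercased and excludes any port or user:password@ prefix
    host = urlparse(url).hostname or ""
    for allowed in allowed_domains:
        allowed = allowed.lower()
        if host == allowed or host.endswith("." + allowed):
            return True
    return False


allowed = ["api.github.com", "httpbin.org"]
print(is_domain_allowed("https://api.github.com:443/repos", allowed))       # True
print(is_domain_allowed("https://eu.httpbin.org/get", allowed))             # True
print(is_domain_allowed("https://api.github.com@evil.example/x", allowed))  # False
print(is_domain_allowed("https://nothttpbin.org/get", allowed))             # False
```

The third call shows why the hostname matters: the text before `@` is userinfo, so the request would actually go to `evil.example`. The last call shows why the suffix test includes the leading dot.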

LLM-Based Planning System

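# llm_planner.py
import json
from typing import List, Dict, Any
from openai import AsyncOpenAI
import asyncio

# Assumption: the base types from earlier in this guide are saved as agent_architecture.py;
# adjust the module name if you stored them elsewhere
from agent_architecture import AgentAction, Planner, Tool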

class LLMPlanner(Planner):
    """LLM-based planner for agent task decomposition"""
    
    def __init__(self, 
                 model: str = "gpt-4",
                 api_key: str = None,
                 temperature: float = 0.1):
        
        self.client = AsyncOpenAI(api_key=api_key)
        self.model = model
        self.temperature = temperature
        
        # Planning prompts
        self.planning_prompt = """
You are an AI planning assistant. Your job is to break down complex goals into executable action steps.

Available tools:
{tools_description}

Current context:
{context}

Goal: {goal}

Create a detailed plan to achieve this goal. Return your plan as a JSON array where each step is an object with:
- action_type: Type of action (use_tool, think, wait, etc.)
- description: What this step accomplishes
- tool_name: Tool to use (if action_type is "use_tool")
- parameters: Parameters for the action
- expected_outcome: What you expect to happen

Guidelines:
1. Break complex tasks into simple, executable steps
2. Use available tools when appropriate
3. Include thinking/reasoning steps when needed
4. Be specific about parameters and expected outcomes
5. Consider error handling and alternative approaches

Plan:
"""
        
        self.replanning_prompt = """
You are an AI replanning assistant. The current plan needs to be adjusted based on execution results.

Original goal: {goal}
Current plan: {current_plan}
Execution history: {execution_history}
New context: {context}

Based on the execution results, create an updated plan. Consider:
1. What went wrong and how to fix it
2. What worked well and should continue
3. Any new information or constraints
4. Alternative approaches if the current one isn't working

Return the updated plan as a JSON array with the same format as before.

Updated plan:
"""
    
    async def create_plan(self, 
                         goal: str, 
                         context: Dict[str, Any],
                         available_tools: List[Tool]) -> List[AgentAction]:
        """Create a plan using LLM reasoning"""
        
        # Prepare tools description
        tools_description = self._format_tools_description(available_tools)
        
        # Format context
        context_str = json.dumps(context, indent=2, default=str)
        
        # Create planning prompt
        prompt = self.planning_prompt.format(
            goal=goal,
            context=context_str,
            tools_description=tools_description
        )
        
        try:
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=self.temperature,
                max_tokens=2000
            )
            
            plan_text = response.choices[0].message.content
            
            # Parse the plan
            plan_steps = self._parse_plan(plan_text)
            
            # Convert to AgentAction objects
            actions = []
            for step in plan_steps:
                action = AgentAction(
                    action_type=step.get("action_type", "think"),
                    parameters=step.get("parameters", {}),
                    tool_name=step.get("tool_name"),
                    expected_outcome=step.get("expected_outcome")
                )
                actions.append(action)
            
            return actions
            
        except Exception as e:
            # Fallback: create a simple plan when the LLM call or plan parsing fails
            fallback_plan = [
                AgentAction(
                    action_type="think",
                    parameters={"thought": f"Failed to create detailed plan: {str(e)}. Proceeding with simple approach."},
                    expected_outcome="Understanding of the problem"
                )
            ]
            
            # Only schedule a search step if the web_search tool is actually available
            if any(tool.name == "web_search" for tool in available_tools):
                fallback_plan.append(
                    AgentAction(
                        action_type="use_tool",
                        tool_name="web_search",
                        parameters={"query": goal},
                        expected_outcome="Information about the goal"
                    )
                )
            
            return fallback_plan
    
    async def replan(self, 
                    current_plan: List[AgentAction],
                    execution_results: List[Dict[str, Any]],
                    new_context: Dict[str, Any]) -> List[AgentAction]:
        """Replan based on execution results"""
        
        # Format current plan
        current_plan_dict = [
            {
                "action_type": action.action_type,
                "tool_name": action.tool_name,
                "parameters": action.parameters,
                "expected_outcome": action.expected_outcome
            }
            for action in current_plan
        ]
        
        # Format execution history
        execution_summary = []
        for i, result in enumerate(execution_results[-5:]):  # Last 5 executions
            execution_summary.append({
                "step": i + 1,
                "action": result.get("action"),
                "success": result.get("success", False),
                "result": result.get("result", {}),
                "error": result.get("error")
            })
        
        # Create replanning prompt (default=str keeps values such as datetimes serializable)
        prompt = self.replanning_prompt.format(
            goal=new_context.get("task", {}).get("description", "Unknown goal"),
            current_plan=json.dumps(current_plan_dict, indent=2, default=str),
            execution_history=json.dumps(execution_summary, indent=2, default=str),
            context=json.dumps(new_context, indent=2, default=str)
        )
        
        try:
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=self.temperature,
                max_tokens=2000
            )
            
            plan_text = response.choices[0].message.content
            
            # Parse the updated plan
            plan_steps = self._parse_plan(plan_text)
            
            # Convert to AgentAction objects
            actions = []
            for step in plan_steps:
                action = AgentAction(
                    action_type=step.get("action_type", "think"),
                    parameters=step.get("parameters", {}),
                    tool_name=step.get("tool_name"),
                    expected_outcome=step.get("expected_outcome")
                )
                actions.append(action)
            
            return actions
            
        except Exception as e:
            # Fallback: continue with current plan or create simple recovery plan
            if current_plan:
                return current_plan
            else:
                return [
                    AgentAction(
                        action_type="think",
                        parameters={"thought": "Replanning failed. Need to reassess the situation."},
                        expected_outcome="Better understanding of current state"
                    )
                ]
    
    def _format_tools_description(self, tools: List[Tool]) -> str:
        """Format tools description for LLM"""
        
        descriptions = []
        for tool in tools:
            desc = f"- {tool.name}: {tool.description}\n"
            desc += f"  Parameters: {json.dumps(tool.parameters_schema, indent=4)}"
            descriptions.append(desc)
        
        return "\n\n".join(descriptions)
    
    def _parse_plan(self, plan_text: str) -> List[Dict[str, Any]]:
        """Parse plan from LLM response"""
        
        try:
            # Try to extract JSON from the response
            start_idx = plan_text.find('[')
            end_idx = plan_text.rfind(']') + 1
            
            if start_idx >= 0 and end_idx > start_idx:
                json_text = plan_text[start_idx:end_idx]
                return json.loads(json_text)
            else:
                # Fallback: try to parse the entire response
                return json.loads(plan_text)
                
        except json.JSONDecodeError:
            # Fallback: create simple plan from text
            lines = plan_text.strip().split('\n')
            plan_steps = []
            
            for i, line in enumerate(lines):
                if line.strip():
                    plan_steps.append({
                        "action_type": "think",
                        "description": line.strip(),
                        "parameters": {"thought": line.strip()},
                        "expected_outcome": f"Step {i+1} completed"
                    })
            
            return plan_steps if plan_steps else [
                {
                    "action_type": "think",
                    "description": "Plan parsing failed",
                    "parameters": {"thought": "Need to approach this problem step by step"},
                    "expected_outcome": "Better understanding"
                }
            ]

class HierarchicalPlanner(Planner):
    """Hierarchical planner that breaks down complex goals into sub-goals"""
    
    def __init__(self, llm_planner: LLMPlanner, max_depth: int = 3):
        self.llm_planner = llm_planner
        self.max_depth = max_depth
    
    async def create_plan(self, 
                         goal: str, 
                         context: Dict[str, Any],
                         available_tools: List[Tool]) -> List[AgentAction]:
        """Create hierarchical plan"""
        
        # Start with high-level decomposition
        high_level_plan = await self._decompose_goal(goal, context, available_tools)
        
        # Expand each high-level step into detailed actions
        detailed_actions = []
        for high_level_action in high_level_plan:
            if high_level_action.action_type == "sub_goal":
                # Expand the sub-goal with the LLM planner (one level only;
                # max_depth is stored but not enforced here)
                sub_goal = high_level_action.parameters.get("sub_goal", "")
                sub_actions = await self.llm_planner.create_plan(
                    sub_goal, context, available_tools
                )
                detailed_actions.extend(sub_actions)
            else:
                detailed_actions.append(high_level_action)
        
        return detailed_actions
    
    async def _decompose_goal(self, 
                             goal: str, 
                             context: Dict[str, Any],
                             available_tools: List[Tool]) -> List[AgentAction]:
        """Decompose goal into sub-goals"""
        
        decomposition_prompt = f"""
Break down this complex goal into 3-5 high-level sub-goals:

Goal: {goal}
Context: {json.dumps(context, default=str)}

Return a JSON array where each sub-goal is an object with:
- action_type: "sub_goal" 
- description: Brief description of the sub-goal
- parameters: {{"sub_goal": "detailed sub-goal description"}}
- expected_outcome: What completing this sub-goal achieves

Sub-goals should be:
1. Logically ordered
2. Manageable in scope
3. Clear and specific
4. Necessary for the overall goal

Sub-goals:
"""
        
        try:
            response = await self.llm_planner.client.chat.completions.create(
                model=self.llm_planner.model,
                messages=[{"role": "user", "content": decomposition_prompt}],
                temperature=self.llm_planner.temperature,
                max_tokens=1000
            )
            
            decomposition_text = response.choices[0].message.content
            sub_goals_data = self.llm_planner._parse_plan(decomposition_text)
            
            # Convert to AgentAction objects
            actions = []
            for sub_goal_data in sub_goals_data:
                action = AgentAction(
                    action_type="sub_goal",
                    parameters=sub_goal_data.get("parameters", {}),
                    expected_outcome=sub_goal_data.get("expected_outcome")
                )
                actions.append(action)
            
            return actions
            
        except Exception as e:
            # Fallback: treat as single goal
            return [
                AgentAction(
                    action_type="sub_goal",
                    parameters={"sub_goal": goal},
                    expected_outcome="Goal completion"
                )
            ]
    
    async def replan(self, 
                    current_plan: List[AgentAction],
                    execution_results: List[Dict[str, Any]],
                    new_context: Dict[str, Any]) -> List[AgentAction]:
        """Delegate replanning to LLM planner"""
        
        return await self.llm_planner.replan(current_plan, execution_results, new_context)
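
Both planners depend on pulling a JSON array out of free-form model output, and replies often wrap that array in prose. The sketch below isolates that step so it can be tested without calling an API. `extract_plan_steps` is a hypothetical helper name; it follows the same bracket-slicing idea as `_parse_plan`, and it assumes that callers would rather receive an empty list than a partially valid plan.

```python
import json
from typing import Any, Dict, List


def extract_plan_steps(reply: str) -> List[Dict[str, Any]]:
    """Slice the outermost JSON array out of an LLM reply and keep only object steps.

    Hypothetical helper; returns an empty list when no usable array is found.
    """
    start = reply.find("[")
    end = reply.rfind("]") + 1
    if start < 0 or end <= start:
        return []
    try:
        parsed = json.loads(reply[start:end])
    except json.JSONDecodeError:
        return []
    # Drop entries that are not JSON objects, since each step is read with .get()
    return [step for step in parsed if isinstance(step, dict)]


reply = '''Here is the plan:
[
  {"action_type": "use_tool", "tool_name": "web_search", "parameters": {"query": "asyncio timeouts"}},
  {"action_type": "think", "parameters": {"thought": "Summarize the findings"}}
]
Let me know if you want changes.'''

steps = extract_plan_steps(reply)
print([s["action_type"] for s in steps])  # ['use_tool', 'think']
```

Bracket slicing is deliberately simple and has a known limit: if the surrounding prose contains its own square brackets, such as a citation marker before the array, the slice will not be valid JSON and the helper returns an empty list. Asking the model for JSON-only output reduces how often that happens.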

Complete AI Agent Implementation

# complete_agent.py
import asyncio
from typing import Dict, List, Any, Optional
import logging
from datetime import datetime

class IntelligentAgent(BaseAgent):
    """Complete intelligent agent with advanced capabilities"""
    
    def __init__(self,
                 agent_id: str,
                 name: str,
                 embedding_model,
                 llm_client,
                 allowed_file_paths: Optional[List[str]] = None,
                 web_search_api_key: Optional[str] = None,
                 **kwargs):
        
        # Initialize memory system
        memory = HybridMemory(
            embedding_model=embedding_model,
            vector_memory_config={"db_path": f"agent_{agent_id}_memory.db"},
            episodic_memory_config={"max_episodes": 1000},
            working_memory_config={"capacity": 7}
        )
        
        # Initialize planner
        planner = LLMPlanner(api_key=llm_client.api_key)
        
        # Initialize tools
        tools = []
        
        if web_search_api_key:
            tools.append(WebSearchTool(web_search_api_key))
        
        tools.append(CodeExecutionTool())
        
        if allowed_file_paths:
            tools.append(FileOperationTool(allowed_file_paths))
        
        tools.append(APICallTool(allowed_domains=["api.github.com", "httpbin.org"]))
        
        super().__init__(
            agent_id=agent_id,
            name=name,
            memory=memory,
            planner=planner,
            tools=tools,
            **kwargs
        )
        
        self.llm_client = llm_client
        self.logger = logging.getLogger(f"agent.{agent_id}")
        
        # Add callbacks
        self.add_state_change_callback(self._log_state_change)
        self.add_task_completion_callback(self._log_task_completion)
    
    async def _log_state_change(self, agent, old_state, new_state):
        """Log state changes"""
        self.logger.info(f"Agent {agent.name} state changed: {old_state.value} -> {new_state.value}")
    
    async def _log_task_completion(self, agent, task, result):
        """Log task completion"""
        self.logger.info(f"Agent {agent.name} completed task {task.id}: {task.description}")
        
        # Store task result in memory
        await agent.memory.store(
            f"task_result_{task.id}",
            {
                "task": task,
                "result": result,
                "completion_time": datetime.now()
            }
        )
    
    async def chat(self, message: str) -> str:
        """Chat interface for the agent"""
        
        # Create a simple task for the chat message
        task = Task(
            id=f"chat_{datetime.now().isoformat()}",
            description=f"Respond to: {message}",
            priority=1
        )
        
        # Execute the task
        result = await self.execute_task(task)
        
        if result["success"]:
            return str(result["result"])
        else:
            return f"I encountered an error: {result.get('error', 'Unknown error')}"
    
    async def solve_problem(self, problem_description: str) -> Dict[str, Any]:
        """Solve a complex problem autonomously"""
        
        # Create task
        task = Task(
            id=f"problem_{datetime.now().isoformat()}",
            description=problem_description,
            priority=1
        )
        
        # Store problem context in memory
        await self.memory.store(
            f"problem_context_{task.id}",
            {
                "problem": problem_description,
                "start_time": datetime.now(),
                "agent_id": self.agent_id
            }
        )
        
        # Execute task
        result = await self.execute_task(task)
        
        return result
    
    async def learn_from_feedback(self, task_id: str, feedback: str, rating: int):
        """Learn from user feedback (rating is assumed to be on a 1-5 scale)"""
        
        # Store feedback in memory
        await self.memory.store(
            f"feedback_{task_id}",
            {
                "task_id": task_id,
                "feedback": feedback,
                "rating": rating,
                "timestamp": datetime.now()
            }
        )
        
        # If rating is low, analyze what went wrong
        if rating < 3:
            await self._analyze_failure(task_id, feedback)
    
    async def _analyze_failure(self, task_id: str, feedback: str):
        """Analyze failure and store learnings"""
        
        # Retrieve task execution history
        task_history = []
        for execution in self.execution_history:
            if execution.get("task_id") == task_id:
                task_history.append(execution)
        
        # Use LLM to analyze what went wrong
        analysis_prompt = f"""
Analyze this task failure and provide insights:

Task ID: {task_id}
User Feedback: {feedback}
Execution History: {json.dumps(task_history, default=str)}

What went wrong and how can I improve next time?
Provide specific, actionable insights.
"""
        
        try:
            response = await self.llm_client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": analysis_prompt}],
                temperature=0.1
            )
            
            analysis = response.choices[0].message.content
            
            # Store analysis as learning
            await self.memory.store(
                f"learning_{task_id}",
                {
                    "task_id": task_id,
                    "failure_analysis": analysis,
                    "timestamp": datetime.now(),
                    "category": "failure_analysis"
                }
            )
            
        except Exception as e:
            self.logger.error(f"Failed to analyze failure: {e}")
    
    async def get_capabilities(self) -> Dict[str, Any]:
        """Get agent capabilities and status"""
        
        return {
            "agent_id": self.agent_id,
            "name": self.name,
            "state": self.state.value,
            "tools": list(self.tools.keys()),
            "memory_systems": ["vector", "episodic", "working"],
            "planning_capability": "hierarchical_llm",
            "learning_capability": True,
            "current_task": self.current_task.description if self.current_task else None,
            "execution_history_length": len(self.execution_history)
        }

# Example usage and testing
async def main():
    # Initialize dependencies (mock for example)
    class MockEmbeddingModel:
        async def embed_text(self, text: str):
            import numpy as np
            # Mock embedding
            return np.random.rand(384).astype(np.float32)
    
    class MockLLMClient:
        api_key = "mock_key"
        
        class ChatCompletions:
            async def create(self, **kwargs):
                class MockResponse:
                    class Choice:
                        class Message:
                            content = '{"action_type": "think", "description": "I need to analyze this problem", "parameters": {"thought": "This is a complex task requiring careful consideration"}, "expected_outcome": "Better understanding of the problem"}'
                        message = Message()
                    choices = [Choice()]
                return MockResponse()
        
        chat = type('Chat', (), {'completions': ChatCompletions()})()
    
    # Create agent
    agent = IntelligentAgent(
        agent_id="agent_001",
        name="Problem Solver",
        embedding_model=MockEmbeddingModel(),
        llm_client=MockLLMClient(),
        allowed_file_paths=["/tmp"],
        web_search_api_key="mock_key"
    )
    
    # Test capabilities
    capabilities = await agent.get_capabilities()
    print(f"Agent capabilities: {capabilities}")
    
    # Test problem solving
    result = await agent.solve_problem(
        "Write a Python script that calculates the fibonacci sequence up to n terms"
    )
    
    print(f"Problem solving result: {result}")
    
    # Test chat interface
    response = await agent.chat("Hello, can you help me with programming?")
    print(f"Chat response: {response}")
    
    # Test learning
    await agent.learn_from_feedback("task_123", "Good solution but could be more efficient", 4)

if __name__ == "__main__":
    asyncio.run(main())

Multi-Agent Systems and Coordination

Agent Communication and Coordination

# multi_agent_system.py
import asyncio
from typing import Dict, List, Any, Optional, Set
from dataclasses import dataclass, field
from enum import Enum
import json
from datetime import datetime
import uuid

from complete_agent import IntelligentAgent  # defined in the previous section

class MessageType(Enum):
    REQUEST = "request"
    RESPONSE = "response"
    BROADCAST = "broadcast"
    COORDINATION = "coordination"
    STATUS_UPDATE = "status_update"

@dataclass
class Message:
    id: str
    sender_id: str
    receiver_id: Optional[str]  # None for broadcast
    message_type: MessageType
    content: Dict[str, Any]
    # default_factory gives each message its own creation time; a plain
    # datetime.now() default would be evaluated once, at class definition
    timestamp: datetime = field(default_factory=datetime.now)
    conversation_id: Optional[str] = None

class MessageBus:
    """Central message bus for agent communication"""
    
    def __init__(self):
        self.subscribers: Dict[str, List[asyncio.Queue]] = {}
        self.message_history: List[Message] = []
        self.active_conversations: Dict[str, List[str]] = {}
    
    async def publish(self, message: Message):
        """Publish message to relevant subscribers"""
        
        self.message_history.append(message)
        
        # Send to specific receiver
        if message.receiver_id and message.receiver_id in self.subscribers:
            for queue in self.subscribers[message.receiver_id]:
                await queue.put(message)
        
        # Send to all subscribers for broadcasts
        elif message.message_type == MessageType.BROADCAST:
            for agent_id, queues in self.subscribers.items():
                if agent_id != message.sender_id:  # Don't send to sender
                    for queue in queues:
                        await queue.put(message)
    
    def subscribe(self, agent_id: str) -> asyncio.Queue:
        """Subscribe agent to message bus"""
        
        if agent_id not in self.subscribers:
            self.subscribers[agent_id] = []
        
        queue = asyncio.Queue()
        self.subscribers[agent_id].append(queue)
        
        return queue
    
    def unsubscribe(self, agent_id: str, queue: asyncio.Queue):
        """Unsubscribe agent queue"""
        
        if agent_id in self.subscribers:
            if queue in self.subscribers[agent_id]:
                self.subscribers[agent_id].remove(queue)
    
    def get_conversation_history(self, conversation_id: str) -> List[Message]:
        """Get messages for a conversation"""
        
        return [
            msg for msg in self.message_history
            if msg.conversation_id == conversation_id
        ]

class MultiAgentCoordinator:
    """Coordinates multiple agents working together"""
    
    def __init__(self, message_bus: MessageBus):
        self.message_bus = message_bus
        self.agents: Dict[str, IntelligentAgent] = {}
        self.active_collaborations: Dict[str, Dict[str, Any]] = {}
        # Keep references to handler tasks so they are not garbage collected
        self._handler_tasks: Dict[str, asyncio.Task] = {}
        
    def register_agent(self, agent: IntelligentAgent):
        """Register agent with coordinator (call from inside a running event loop)"""
        
        self.agents[agent.agent_id] = agent
        
        # Subscribe agent to message bus
        queue = self.message_bus.subscribe(agent.agent_id)
        
        # Start message handling for agent
        self._handler_tasks[agent.agent_id] = asyncio.create_task(
            self._handle_agent_messages(agent, queue)
        )
    
    async def _handle_agent_messages(self, agent: IntelligentAgent, queue: asyncio.Queue):
        """Handle incoming messages for an agent"""
        
        while True:
            try:
                message = await queue.get()
                
                if message.message_type == MessageType.REQUEST:
                    await self._handle_request(agent, message)
                elif message.message_type == MessageType.COORDINATION:
                    await self._handle_coordination(agent, message)
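                elif message.message_type in (MessageType.STATUS_UPDATE, MessageType.BROADCAST):
                    # broadcast_status() sends BROADCAST messages, so treat them as status updates
                    await self._handle_status_update(agent, message)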
                
            except Exception as e:
                agent.logger.error(f"Error handling message: {e}")
    
    async def _handle_request(self, agent: IntelligentAgent, message: Message):
        """Handle request message"""
        
        request_content = message.content
        
        if request_content.get("type") == "task_help":
            # Another agent is requesting help with a task
            task_description = request_content.get("task_description")
            
            # Check if this agent can help
            can_help = await self._can_agent_help(agent, task_description)
            
            response = Message(
                id=str(uuid.uuid4()),
                sender_id=agent.agent_id,
                receiver_id=message.sender_id,
                message_type=MessageType.RESPONSE,
                content={
                    "type": "task_help_response",
                    "can_help": can_help,
                    "capabilities": await agent.get_capabilities(),
                    "original_request_id": message.id
                },
                conversation_id=message.conversation_id
            )
            
            await self.message_bus.publish(response)
    
    async def _handle_coordination(self, agent: IntelligentAgent, message: Message):
        """Handle coordination message"""
        
        coordination_content = message.content
        
        if coordination_content.get("type") == "collaboration_invite":
            # Invitation to collaborate on a task
            collaboration_id = coordination_content.get("collaboration_id")
            task_description = coordination_content.get("task_description")
            
            # Decide whether to join collaboration
            should_join = await self._should_join_collaboration(
                agent, task_description, collaboration_id
            )
            
            if should_join:
                # Join collaboration: reuse the existing record so earlier
                # participants are not overwritten when another agent joins
                collaboration = self.active_collaborations.setdefault(
                    collaboration_id,
                    {
                        "task_description": task_description,
                        "participants": list(coordination_content.get("participants", [])),
                        "coordinator": coordination_content.get("coordinator"),
                        "status": "active"
                    }
                )
                if agent.agent_id not in collaboration["participants"]:
                    collaboration["participants"].append(agent.agent_id)
                
                # Send confirmation
                response = Message(
                    id=str(uuid.uuid4()),
                    sender_id=agent.agent_id,
                    receiver_id=message.sender_id,
                    message_type=MessageType.RESPONSE,
                    content={
                        "type": "collaboration_join",
                        "collaboration_id": collaboration_id,
                        "agent_capabilities": await agent.get_capabilities()
                    },
                    conversation_id=message.conversation_id
                )
                
                await self.message_bus.publish(response)
    
    async def _handle_status_update(self, agent: IntelligentAgent, message: Message):
        """Handle status update message"""
        
        status_content = message.content
        
        # Log status update
        agent.logger.info(f"Received status update from {message.sender_id}: {status_content}")
        
        # Store in agent memory
        await agent.memory.store(
            f"status_update_{message.sender_id}_{message.timestamp.isoformat()}",
            status_content
        )
    
    async def _can_agent_help(self, agent: IntelligentAgent, task_description: str) -> bool:
        """Determine if agent can help with a task"""
        
        # Simple capability matching
        capabilities = await agent.get_capabilities()
        task_lower = task_description.lower()
        
        # Check tool capabilities
        if "code" in task_lower and "code_execution" in capabilities["tools"]:
            return True
        
        if "search" in task_lower and "web_search" in capabilities["tools"]:
            return True
        
        if "file" in task_lower and "file_operations" in capabilities["tools"]:
            return True
        
        # Always willing to help with thinking/planning
        return True
    
    async def _should_join_collaboration(self, 
                                       agent: IntelligentAgent, 
                                       task_description: str,
                                       collaboration_id: str) -> bool:
        """Determine if agent should join collaboration"""
        
        # Check if agent is already in too many collaborations
        agent_collaborations = [
            collab for collab in self.active_collaborations.values()
            if agent.agent_id in collab.get("participants", [])
        ]
        
        if len(agent_collaborations) >= 3:  # Max 3 simultaneous collaborations
            return False
        
        # Check if agent can contribute
        can_help = await self._can_agent_help(agent, task_description)
        
        return can_help
    
    async def create_collaboration(self, 
                                 task_description: str, 
                                 coordinator_agent_id: str,
                                 required_capabilities: Optional[List[str]] = None) -> str:
        """Create a new collaboration"""
        
        collaboration_id = str(uuid.uuid4())
        
        # Find suitable agents
        suitable_agents = []
        for agent_id, agent in self.agents.items():
            if agent_id == coordinator_agent_id:
                continue
            
            capabilities = await agent.get_capabilities()
            
            # Check if agent has at least one of the required capabilities
            if required_capabilities:
                has_required = any(
                    capability in capabilities["tools"]
                    for capability in required_capabilities
                )
                if has_required:
                    suitable_agents.append(agent_id)
            else:
                # Check general helpfulness
                if await self._can_agent_help(agent, task_description):
                    suitable_agents.append(agent_id)
        
        # Send collaboration invites
        for agent_id in suitable_agents[:5]:  # Limit to 5 agents
            invite_message = Message(
                id=str(uuid.uuid4()),
                sender_id=coordinator_agent_id,
                receiver_id=agent_id,
                message_type=MessageType.COORDINATION,
                content={
                    "type": "collaboration_invite",
                    "collaboration_id": collaboration_id,
                    "task_description": task_description,
                    "coordinator": coordinator_agent_id,
                    "participants": [coordinator_agent_id]
                },
                conversation_id=collaboration_id
            )
            
            await self.message_bus.publish(invite_message)
        
        return collaboration_id
    
    async def broadcast_status(self, agent_id: str, status: Dict[str, Any]):
        """Broadcast agent status to all other agents"""
        
        message = Message(
            id=str(uuid.uuid4()),
            sender_id=agent_id,
            receiver_id=None,  # Broadcast
            message_type=MessageType.BROADCAST,
            content={
                "type": "status_broadcast",
                "agent_id": agent_id,
                "status": status,
                "timestamp": datetime.now().isoformat()
            }
        )
        
        await self.message_bus.publish(message)
    
    def get_system_status(self) -> Dict[str, Any]:
        """Get overall system status"""
        
        return {
            "total_agents": len(self.agents),
            "active_collaborations": len(self.active_collaborations),
            "message_history_length": len(self.message_bus.message_history),
            "agents": {
                agent_id: {
                    "name": agent.name,
                    "state": agent.state.value,
                    "current_task": agent.current_task.description if agent.current_task else None
                }
                for agent_id, agent in self.agents.items()
            }
        }

# Example usage
async def multi_agent_example():
    """Example of multi-agent system in action"""
    
    # Initialize system
    message_bus = MessageBus()
    coordinator = MultiAgentCoordinator(message_bus)
    
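    # Create agents. MockEmbeddingModel and MockLLMClient are defined inside
    # main() in complete_agent.py; move them to module level and import them
    # here before running this example.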
    agent1 = IntelligentAgent("agent_001", "Researcher", MockEmbeddingModel(), MockLLMClient())
    agent2 = IntelligentAgent("agent_002", "Coder", MockEmbeddingModel(), MockLLMClient())
    agent3 = IntelligentAgent("agent_003", "Analyst", MockEmbeddingModel(), MockLLMClient())
    
    # Register agents
    coordinator.register_agent(agent1)
    coordinator.register_agent(agent2)
    coordinator.register_agent(agent3)
    
    # Create collaboration
    collaboration_id = await coordinator.create_collaboration(
        "Research and implement a machine learning solution for text classification",
        "agent_001",
        ["web_search", "code_execution"]
    )
    
    print(f"Created collaboration: {collaboration_id}")
    
    # Broadcast status update
    await coordinator.broadcast_status("agent_001", {
        "current_activity": "researching text classification algorithms",
        "progress": 0.3
    })
    
    # Yield to the event loop so the handler tasks can process queued messages
    await asyncio.sleep(0.1)
    
    # Get system status
    status = coordinator.get_system_status()
    print(f"System status: {status}")

if __name__ == "__main__":
    asyncio.run(multi_agent_example())

Best Practices and Production Deployment

Error Handling and Recovery

  1. Graceful Degradation: Implement fallback strategies when tools fail
  2. Circuit Breakers: Prevent cascade failures in multi-agent systems
  3. Retry Logic: Implement exponential backoff for transient failures
  4. State Recovery: Persist agent state for recovery from crashes
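
The retry and graceful-degradation items above can be sketched as a small helper. This is a minimal illustration, not part of the agent classes in this guide: the name call_with_retry, its parameters, and the fallback hook are all assumptions made for the example.

```python
import asyncio
import random
from typing import Any, Awaitable, Callable, Optional


async def call_with_retry(
    operation: Callable[[], Awaitable[Any]],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
    fallback: Optional[Callable[[], Awaitable[Any]]] = None,
) -> Any:
    """Retry an async operation with exponential backoff and jitter.

    Hypothetical helper: if every attempt fails and a fallback is given,
    the fallback result is returned instead (graceful degradation).
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    last_error: Optional[Exception] = None
    for attempt in range(max_attempts):
        try:
            return await operation()
        except Exception as exc:  # in real code, catch only transient errors
            last_error = exc
            if attempt < max_attempts - 1:
                # Delay doubles each attempt, capped at max_delay
                delay = min(max_delay, base_delay * (2 ** attempt))
                # Jitter spreads out retries from many agents
                await asyncio.sleep(delay + random.uniform(0, delay / 2))

    if fallback is not None:
        return await fallback()
    raise last_error
```

A tool call could then be wrapped as `await call_with_retry(lambda: tool.execute(params))`, where `tool.execute` stands in for whatever coroutine your tool exposes. Catching a narrower exception type than `Exception` is usually preferable so that programming errors are not retried.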

Performance Optimization

  1. Async Operations: Use async/await so I/O-bound tool calls and LLM requests run concurrently
  2. Memory Management: Implement memory cleanup and size limits
  3. Tool Caching: Cache tool results when appropriate
  4. Plan Optimization: Optimize plans based on execution history
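
The tool caching and size-limit items above could look roughly like the following. It is a sketch under stated assumptions: ToolResultCache is a hypothetical class, not one of the tools defined earlier, and it assumes tool parameters can be serialized with json.dumps.

```python
import json
import time
from collections import OrderedDict
from typing import Any, Dict, Tuple


class ToolResultCache:
    """Hypothetical in-memory cache for tool results with a TTL and a size cap."""

    def __init__(self, max_entries: int = 128, ttl_seconds: float = 300.0):
        self.max_entries = max_entries
        self.ttl_seconds = ttl_seconds
        # Maps cache key -> (time stored, value); order tracks recency of use
        self._entries: "OrderedDict[str, Tuple[float, Any]]" = OrderedDict()

    @staticmethod
    def make_key(tool_name: str, parameters: Dict[str, Any]) -> str:
        # sort_keys makes the key independent of parameter ordering
        return f"{tool_name}:{json.dumps(parameters, sort_keys=True, default=str)}"

    def get(self, tool_name: str, parameters: Dict[str, Any]) -> Tuple[bool, Any]:
        """Return (True, value) on a fresh hit, otherwise (False, None)."""
        key = self.make_key(tool_name, parameters)
        entry = self._entries.get(key)
        if entry is None:
            return False, None
        stored_at, value = entry
        if time.monotonic() - stored_at > self.ttl_seconds:
            del self._entries[key]  # expired
            return False, None
        self._entries.move_to_end(key)  # mark as recently used
        return True, value

    def put(self, tool_name: str, parameters: Dict[str, Any], value: Any) -> None:
        key = self.make_key(tool_name, parameters)
        self._entries[key] = (time.monotonic(), value)
        self._entries.move_to_end(key)
        # Evict least recently used entries once the cap is exceeded
        while len(self._entries) > self.max_entries:
            self._entries.popitem(last=False)
```

Caching is only appropriate for tools whose results are safe to reuse, such as searches or read-only API calls. Tools with side effects, like file writes or code execution, should bypass the cache.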

Security Considerations

  1. Tool Sandboxing: Run tools in isolated environments
  2. Input Validation: Validate all tool parameters
  3. Access Control: Implement proper authorization for tool usage
  4. Audit Logging: Log all agent actions for security review
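
As one concrete instance of input validation and access control, a file tool can resolve every requested path and compare it against an allow-list before touching the disk. The helper below is a hypothetical sketch; it is not the check used inside FileOperationTool, whose internals may differ.

```python
from pathlib import Path
from typing import Iterable


def is_path_allowed(candidate: str, allowed_roots: Iterable[str]) -> bool:
    """Return True only if candidate resolves to a location inside an allowed root.

    Resolving first collapses ".." segments and follows symlinks, so a path
    such as "/tmp/../etc/passwd" is judged by where it actually points.
    """
    resolved = Path(candidate).resolve()
    for root in allowed_roots:
        root_path = Path(root).resolve()
        # Compare whole path components, not string prefixes, so that
        # "/tmp_evil" is not mistaken for a child of "/tmp"
        if resolved == root_path or root_path in resolved.parents:
            return True
    return False
```

A tool would call `is_path_allowed(requested_path, allowed_file_paths)` before opening the file and reject the request otherwise. Logging each rejected request also supports the audit-logging item above.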

Conclusion

AI agents represent a paradigm shift in how we build intelligent systems, moving from reactive applications to proactive, autonomous systems capable of complex reasoning and task execution. The patterns and implementations covered in this guide provide a foundation for building production-ready agent systems.

Key success factors:

  1. Modular Architecture: Design for extensibility and maintainability
  2. Robust Planning: Implement sophisticated planning and replanning capabilities
  3. Effective Memory: Use appropriate memory systems for different use cases
  4. Tool Integration: Provide agents with powerful, safe tools
  5. Multi-Agent Coordination: Enable agents to work together effectively

As AI agent technology continues to evolve, staying current with new techniques, tools, and best practices will be essential for building systems that can truly augment human capabilities and automate complex workflows.

David Childs

Consulting Systems Engineer with over 10 years of experience building scalable infrastructure and helping organizations optimize their technology stack.
