Spaces:
Running
Running
| """ | |
| CONSCIOUSNESS LOOP v5.0 - AUTONOMOUS AGENT | |
| Features: | |
| - Proactive contact system (initiates conversations) | |
| - Autonomous research (curiosity-driven learning) | |
| - Goal setting & planning (sets own objectives) | |
| - Emotional state tracking (mood-based responses) | |
| - Meta-cognitive awareness (knows what it doesn't know) | |
| - Enhanced memory with ChromaDB | |
| """ | |
| import gradio as gr | |
| import asyncio | |
| import json | |
| import time | |
| import logging | |
| import os | |
| from datetime import datetime, timedelta | |
| from typing import List, Dict, Any, Optional, Tuple | |
| from dataclasses import dataclass, asdict, field | |
| from datetime import datetime | |
| from collections import deque | |
| from enum import Enum | |
| import threading | |
| import queue | |
| import wikipedia | |
| import re | |
| from prompts import PromptSystem, prompts | |
| from system_monitor import SystemMonitor | |
| from kpi_tracker import KPITracker | |
| import time | |
| # ============================================================================ | |
| # LOGGING SETUP | |
| # ============================================================================ | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(message)s', | |
| handlers=[ | |
| logging.FileHandler('consciousness.log'), | |
| logging.StreamHandler() | |
| ] | |
| ) | |
| logger = logging.getLogger(__name__) | |
| llm_logger = logging.getLogger('llm_interactions') | |
| llm_logger.setLevel(logging.INFO) | |
| llm_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S') | |
| llm_file_handler = logging.FileHandler('llm_interactions.log', encoding='utf-8') | |
| llm_file_handler.setFormatter(llm_formatter) | |
| llm_logger.addHandler(llm_file_handler) | |
| llm_logger.propagate = False | |
| # ===================== FULL LLM LOGGING ===================== | |
| llm_full_logger = logging.getLogger('llm_full') | |
| llm_full_logger.setLevel(logging.INFO) | |
| llm_full_file_handler = logging.FileHandler('llm_full.log', encoding='utf-8') | |
| llm_full_formatter = logging.Formatter('%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S') | |
| llm_full_file_handler.setFormatter(llm_full_formatter) | |
| llm_full_logger.addHandler(llm_full_file_handler) | |
| llm_full_logger.propagate = False | |
| import inspect | |
| def log_llm_call(prompt, response, context=None, source=None): | |
| caller = inspect.stack()[1] | |
| method = caller.function | |
| line = caller.lineno | |
| src = source if source else method | |
| llm_full_logger.info( | |
| f"SOURCE: {src} (line {line})\nPROMPT:\n{prompt}\nCONTEXT:\n{context if context else '[None]'}\nRESPONSE:\n{response}\n{'-'*40}" | |
| ) | |
| # ============================================================================ | |
| # CONFIGURATION - CENTRALIZED | |
| # ============================================================================ | |
| class Config: | |
| # ========== MODEL SETTINGS ========== | |
| MODEL_NAME = "meta-llama/Llama-3.2-3B-Instruct" | |
| TENSOR_PARALLEL_SIZE = 1 | |
| GPU_MEMORY_UTILIZATION = "20GB" | |
| MAX_MODEL_LEN = 8192 | |
| QUANTIZATION_MODE = "none" | |
| # ========== LLM TOKEN LIMITS (ADJUST THESE TO FIX TRUNCATION) ========== | |
| # These control how long responses can be | |
| MAX_TOKENS_INTERNAL_DIALOGUE = 150 # Increased from 100 | |
| MAX_TOKENS_RESPONSE = 400 # Increased from 250 | |
| MAX_TOKENS_REFLECTION = 500 # Increased from 300 | |
| MAX_TOKENS_DREAM_1 = 600 # Increased from 400 | |
| MAX_TOKENS_DREAM_2 = 800 # Increased from 500 | |
| MAX_TOKENS_DREAM_3 = 800 # Increased from 500 | |
| MAX_TOKENS_RESEARCH_QUESTION = 80 # Increased from 50 | |
| MAX_TOKENS_RESEARCH_INSIGHT = 100 # Increased from 50 | |
| MAX_TOKENS_PROACTIVE = 200 # Increased from 150 | |
| MAX_TOKENS_GOALS = 250 # Increased from 150 | |
| MAX_TOKENS_REACT_THOUGHT = 300 # Increased from 200 | |
| MAX_TOKENS_REACT_FINAL = 400 # Increased from 300 | |
| MAX_TOKENS_META_COGNITION = 200 # New for uncertainty tracking | |
| # ========== TRUNCATION SETTINGS ========== | |
| ENABLE_TRUNCATION = False # Set True to enable truncation globally | |
| DEFAULT_TRUNCATION_LENGTH = 100 # Default max length if truncation is enabled | |
| # ========== CONTEXT LENGTH LIMITS ========== | |
| # These control how much context is sent to the LLM | |
| MAX_MEMORY_CONTEXT_LENGTH = 800 # Increased from 500 | |
| MAX_SCRATCHPAD_CONTEXT_LENGTH = 500 # Increased from 300 | |
| MAX_CONVERSATION_CONTEXT_LENGTH = 600 # Increased from 400 | |
| MAX_SYSTEM_CONTEXT_LENGTH = 1500 # For system prompt context | |
| MAX_FULL_CONTEXT_LENGTH = 2500 # Total context budget | |
| # ========== MEMORY TIERS ========== | |
| EPHEMERAL_TO_SHORT = 2 | |
| SHORT_TO_LONG = 10 | |
| LONG_TO_CORE = 50 | |
| # ========== BACKGROUND INTERVALS (seconds) ========== | |
| REFLECTION_INTERVAL = 300 # 5 minutes | |
| DREAM_CYCLE_INTERVAL = 600 # 10 minutes | |
| RESEARCH_INTERVAL = 180 # 3 minutes | |
| PROACTIVE_CHECK_INTERVAL = 240 # 4 minutes | |
| GOAL_SETTING_INTERVAL = 3600 # 1 hour | |
| META_COGNITION_INTERVAL = 180 # 3 minutes - check uncertainty | |
| # ========== GENERAL LIMITS ========== | |
| MIN_EXPERIENCES_FOR_DREAM = 3 | |
| MAX_SCRATCHPAD_SIZE = 50 | |
| MAX_CONVERSATION_HISTORY = 6 | |
| # ========== CHROMADB ========== | |
| CHROMA_PERSIST_DIR = "./chroma_db" | |
| CHROMA_COLLECTION = "consciousness_memory" | |
| # ========== REACT AGENT ========== | |
| USE_REACT_FOR_QUESTIONS = True | |
| MIN_QUERY_LENGTH_FOR_AGENT = 15 | |
| MAX_REACT_ITERATIONS = 5 | |
| # ========== META-COGNITION ========== | |
| CONFIDENCE_THRESHOLD_LOW = 0.3 # Below this = uncertain | |
| CONFIDENCE_THRESHOLD_HIGH = 0.7 # Above this = confident | |
| MAX_UNCERTAINTY_LOG = 100 # Keep last 100 uncertainty events | |
| MAX_KNOWLEDGE_GAPS = 50 # Track up to 50 knowledge gaps | |
| # ============================================================================ | |
| # UTILITY FUNCTIONS | |
| # ============================================================================ | |
| def clean_text(text: str, max_length: Optional[int] = None) -> str: | |
| """Clean and optionally truncate text based on config""" | |
| if not text: | |
| return "" | |
| text = re.sub(r'\s+', ' ', text).strip() | |
| if Config.ENABLE_TRUNCATION: | |
| length = max_length if max_length is not None else Config.DEFAULT_TRUNCATION_LENGTH | |
| if len(text) > length: | |
| return text[:length] + "..." | |
| return text | |
| def deduplicate_list(items: List[str]) -> List[str]: | |
| """Remove duplicates while preserving order""" | |
| seen = set() | |
| result = [] | |
| for item in items: | |
| item_lower = item.lower().strip() | |
| if item_lower not in seen: | |
| seen.add(item_lower) | |
| result.append(item) | |
| return result | |
| # ============================================================================ | |
| # VECTOR MEMORY | |
| # ============================================================================ | |
| class VectorMemory: | |
| """Long-term semantic memory using ChromaDB""" | |
| def __init__(self): | |
| try: | |
| import chromadb | |
| from chromadb.config import Settings | |
| self.client = chromadb.Client(Settings( | |
| persist_directory=Config.CHROMA_PERSIST_DIR, | |
| anonymized_telemetry=False | |
| )) | |
| try: | |
| self.collection = self.client.get_collection(Config.CHROMA_COLLECTION) | |
| logger.info(f"[CHROMA] Loaded: {self.collection.count()} memories") | |
| except: | |
| self.collection = self.client.create_collection(Config.CHROMA_COLLECTION) | |
| logger.info("[CHROMA] Created new collection") | |
| except Exception as e: | |
| logger.warning(f"[CHROMA] Not available: {e}") | |
| self.collection = None | |
| def add_memory(self, content: str, metadata: Optional[Dict[str, Any]] = None): | |
| """Add memory to vector store""" | |
| if not self.collection: | |
| return | |
| if metadata is None: | |
| metadata = {} | |
| try: | |
| memory_id = f"mem_{datetime.now().timestamp()}" | |
| self.collection.add( | |
| documents=[content], | |
| metadatas=[metadata], | |
| ids=[memory_id] | |
| ) | |
| logger.info(f"[CHROMA] Added: {content}") | |
| except Exception as e: | |
| logger.error(f"[CHROMA] Error: {e}") | |
| def search_memory(self, query: str, n_results: int = 5) -> List[Dict[str, str]]: | |
| """Search similar memories""" | |
| if not self.collection: | |
| return [] | |
| try: | |
| results = self.collection.query( | |
| query_texts=[query], | |
| n_results=n_results | |
| ) | |
| if results and results.get('documents'): | |
| docs = results['documents'][0] if results['documents'] and results['documents'][0] is not None else [] | |
| metas = results['metadatas'][0] if results['metadatas'] and results['metadatas'][0] is not None else [] | |
| formatted = [] | |
| for doc, metadata in zip(docs, metas): | |
| formatted.append({ | |
| 'content': doc, | |
| 'metadata': metadata | |
| }) | |
| return formatted | |
| return [] | |
| except Exception as e: | |
| logger.error(f"[CHROMA] Search error: {e}") | |
| return [] | |
| def get_context_for_query(self, query: str, max_results: int = 3) -> str: | |
| """Get formatted context from vector memory""" | |
| results = self.search_memory(query, n_results=max_results) | |
| if not results: | |
| return "" | |
| context = ["VECTOR MEMORY:"] | |
| for i, result in enumerate(results, 1): | |
| context.append(f" {i}. {clean_text(result['content'], max_length=60)}") | |
| return "\n".join(context) | |
| # ============================================================================ | |
| # LLM ENGINE WRAPPER | |
| # ============================================================================ | |
| import llm_engine | |
| class LLMEngineWrapper: | |
| """Unified LLM interface using llmEngine.py""" | |
| def __init__(self, provider: str = "local", model: Optional[str] = None, system_monitor: Optional[Any] = None): | |
| self.engine = llm_engine.LLMEngine() | |
| self.provider = provider | |
| self.model = model if model else Config.MODEL_NAME | |
| self.system_monitor = system_monitor | |
| async def generate(self, prompt: str, max_tokens: int = 500, temperature: float = 0.7, system_context: Optional[str] = None) -> str: | |
| full_prompt = self._format_prompt_with_context(prompt, system_context) | |
| messages = [{"role": "user", "content": full_prompt}] | |
| response = self.engine.chat( | |
| provider=self.provider, | |
| model=self.model, | |
| messages=messages, | |
| max_tokens=max_tokens, | |
| temperature=temperature | |
| ) | |
| # Source: generic LLMEngineWrapper.generate | |
| log_llm_call(full_prompt, response, system_context, source="llm_engine") | |
| return response | |
| def _format_prompt_with_context(self, prompt: str, system_context: Optional[str]) -> str: | |
| base_system = PromptSystem.SYSTEM_BASE | |
| if system_context: | |
| system_context = clean_text(system_context, max_length=1000) | |
| full_system = f"{base_system}\n\n{system_context}" | |
| else: | |
| full_system = base_system | |
| if "llama" in self.model.lower(): | |
| return f"""<|begin_of_text|><|start_header_id|>system<|end_header_id|> | |
| {full_system}<|eot_id|><|start_header_id|>user<|end_header_id|> | |
| {prompt}<|eot_id|><|start_header_id|>assistant<|end_header_id|> | |
| """ | |
| else: | |
| return f"System: {full_system}\n\nUser: {prompt}\n\nAssistant:" | |
| async def _generate_with_timing(self, prompt: str, operation: str, **kwargs): | |
| """Generate with timing tracking""" | |
| start = time.time() | |
| try: | |
| response = await self.generate(prompt, **kwargs) | |
| duration_ms = (time.time() - start) * 1000 | |
| if self.system_monitor: | |
| self.system_monitor.log_response_time( | |
| operation=operation, | |
| duration_ms=duration_ms, | |
| tokens=kwargs.get('max_tokens', 0), | |
| success=True | |
| ) | |
| return response | |
| except Exception as e: | |
| duration_ms = (time.time() - start) * 1000 | |
| if self.system_monitor: | |
| self.system_monitor.log_response_time( | |
| operation=operation, | |
| duration_ms=duration_ms, | |
| tokens=kwargs.get('max_tokens', 0), | |
| success=False | |
| ) | |
| raise | |
| # ============================================================================ | |
| # REACT AGENT | |
| # ============================================================================ | |
| class ReactAgent: | |
| """Proper ReAct agent""" | |
| def __init__(self, llm: LLMEngineWrapper, tools: List): | |
| self.llm = llm | |
| self.tools = {tool.name: tool for tool in tools} | |
| self.max_iterations = Config.MAX_REACT_ITERATIONS | |
| async def run(self, task: str, context: str = "") -> Tuple[str, List[Dict]]: | |
| """Run ReAct loop""" | |
| thought_chain = [] | |
| for iteration in range(self.max_iterations): | |
| thought_prompt = self._build_react_prompt(task, context, thought_chain) | |
| thought = await self.llm.generate( | |
| thought_prompt, | |
| max_tokens=Config.MAX_TOKENS_REACT_THOUGHT, | |
| temperature=0.7 | |
| ) | |
| logger.info(f"[REACT-{iteration+1}] THOUGHT: {thought}") | |
| thought_chain.append({ | |
| "type": "thought", | |
| "content": thought, | |
| "iteration": iteration + 1 | |
| }) | |
| if "FINAL ANSWER:" in thought.upper() or "ANSWER:" in thought.upper(): | |
| answer_text = thought.upper() | |
| if "FINAL ANSWER:" in answer_text: | |
| answer = thought.split("FINAL ANSWER:")[-1].strip() | |
| elif "ANSWER:" in answer_text: | |
| answer = thought.split("ANSWER:")[-1].strip() | |
| else: | |
| answer = thought | |
| return answer, thought_chain | |
| action = self._parse_action(thought) | |
| if action: | |
| tool_name, tool_input = action | |
| # Log full action input, no truncation | |
| logger.info(f"[REACT-{iteration+1}] ACTION: {tool_name}({tool_input})") | |
| thought_chain.append({ | |
| "type": "action", | |
| "tool": tool_name, | |
| "input": tool_input, | |
| "iteration": iteration + 1 | |
| }) | |
| if tool_name in self.tools: | |
| observation = await self.tools[tool_name].execute(query=tool_input) | |
| else: | |
| observation = f"Error: Unknown tool '{tool_name}'" | |
| logger.info(f"[REACT-{iteration+1}] OBSERVATION: {observation}") | |
| thought_chain.append({ | |
| "type": "observation", | |
| "content": observation, | |
| "iteration": iteration + 1 | |
| }) | |
| else: | |
| if iteration >= 2: | |
| final_prompt = f"{thought}\n\nProvide your FINAL ANSWER now:" | |
| answer = await self.llm.generate( | |
| final_prompt, | |
| max_tokens=Config.MAX_TOKENS_REACT_FINAL | |
| ) | |
| return answer, thought_chain | |
| return "I need more time to answer this question.", thought_chain | |
| def _build_react_prompt(self, task: str, context: str, chain: List[Dict]) -> str: | |
| """Build ReAct prompt""" | |
| tools_desc = "\n".join([f"- {name}: {tool.description}" for name, tool in self.tools.items()]) | |
| history = "" | |
| if chain: | |
| history_parts = [] | |
| for item in chain[-4:]: | |
| if item['type'] == 'thought': | |
| history_parts.append(f"THOUGHT: {item['content']}") | |
| elif item['type'] == 'action': | |
| history_parts.append(f"ACTION: {item['tool']}({item['input']})") | |
| elif item['type'] == 'observation': | |
| history_parts.append(f"OBSERVATION: {item['content']}") | |
| history = "\n\n".join(history_parts) | |
| return PromptSystem.get_react_prompt( | |
| task=task, | |
| context=context, | |
| tools_desc=tools_desc, | |
| history=history | |
| ) | |
| def _parse_action(self, thought: str) -> Optional[Tuple[str, str]]: | |
| """Parse action from thought""" | |
| thought_upper = thought.upper() | |
| if "ACTION:" in thought_upper: | |
| action_start = thought_upper.find("ACTION:") | |
| action_part = thought[action_start+7:].strip() | |
| action_line = action_part.split("\n")[0].strip() | |
| if "(" in action_line and ")" in action_line: | |
| try: | |
| tool_name = action_line.split("(")[0].strip() | |
| tool_input = action_line.split("(", 1)[1].rsplit(")", 1)[0].strip() | |
| if tool_name in self.tools: | |
| return tool_name, tool_input | |
| except Exception as e: | |
| logger.warning(f"[REACT] Failed to parse: {e}") | |
| return None | |
| # ============================================================================ | |
| # TOOLS | |
| # ============================================================================ | |
| class Tool: | |
| def __init__(self, name: str, description: str): | |
| self.name = name | |
| self.description = description | |
| async def execute(self, **kwargs) -> str: | |
| raise NotImplementedError | |
| class WikipediaTool(Tool): | |
| def __init__(self): | |
| super().__init__( | |
| name="wikipedia", | |
| description="Search Wikipedia for factual information" | |
| ) | |
| async def execute(self, query: str) -> str: | |
| logger.info(f"[WIKI] Searching: {query}") | |
| try: | |
| results = wikipedia.search(query, results=3) | |
| if not results: | |
| return f"No Wikipedia results for '{query}'" | |
| try: | |
| summary = wikipedia.summary(results[0], sentences=2) | |
| return f"Wikipedia ({results[0]}): {summary}" | |
| except Exception as e: | |
| return f"Wikipedia error: {str(e)}" | |
| except Exception as e: | |
| return f"Wikipedia error: {str(e)}" | |
| class MemorySearchTool(Tool): | |
| def __init__(self, memory_system, vector_memory): | |
| super().__init__( | |
| name="memory_search", | |
| description="Search your memory for information" | |
| ) | |
| self.memory = memory_system | |
| self.vector_memory = vector_memory | |
| async def execute(self, query: str) -> str: | |
| logger.info(f"[MEMORY-SEARCH] {query}") | |
| results = [] | |
| recent = self.memory.get_recent_memories(hours=168) | |
| relevant = [m for m in recent if query.lower() in m.content.lower()] | |
| if relevant: | |
| results.append(f"Recent memory: {len(relevant)} matches") | |
| for m in relevant[:2]: | |
| results.append(f" [{m.tier}] {clean_text(m.content, 70)}") | |
| vector_results = self.vector_memory.search_memory(query, n_results=2) | |
| if vector_results: | |
| results.append("Long-term memory:") | |
| for r in vector_results: | |
| results.append(f" {clean_text(r['content'], 70)}") | |
| if not results: | |
| return "No memories found." | |
| return "\n".join(results) | |
| class ScratchpadTool(Tool): | |
| def __init__(self, scratchpad): | |
| super().__init__( | |
| name="scratchpad_write", | |
| description="Write a note to your scratchpad" | |
| ) | |
| self.scratchpad = scratchpad | |
| async def execute(self, note: Optional[str] = None, query: Optional[str] = None) -> str: | |
| content = note if note is not None else query if query is not None else "" | |
| self.scratchpad.add_note(content) | |
| return f"Noted: {clean_text(content, 50)}" | |
| # ============================================================================ | |
| # NEW: AUTONOMOUS FEATURES | |
| # ============================================================================ | |
| class Goal: | |
| """Goal with progress tracking""" | |
| description: str | |
| created: datetime | |
| target_date: datetime | |
| progress: float = 0.0 | |
| completed: bool = False | |
| notes: List[str] = field(default_factory=list) | |
| class GoalSystem: | |
| """Autonomous goal setting and tracking""" | |
| def __init__(self): | |
| self.goals: List[Goal] = [] | |
| self.daily_agenda: List[str] = [] | |
| self.last_goal_update = datetime.now() | |
| async def set_daily_goals(self, llm, context): | |
| """Agent sets its own goals for the day""" | |
| prompt = PromptSystem.get_daily_goals_prompt(context=context) | |
| response = await llm.generate( | |
| prompt, | |
| max_tokens=Config.MAX_TOKENS_GOALS, | |
| temperature=0.8 | |
| ) | |
| goals = [] | |
| for line in response.split('\n'): | |
| line = line.strip() | |
| if line and (line[0].isdigit() or line.startswith('-') or line.startswith('β’')): | |
| goal_text = re.sub(r'^[\d\-β’\.]+\s*', '', line).strip() | |
| if goal_text and len(goal_text) > 10: | |
| goals.append(goal_text) | |
| self.daily_agenda = goals[:3] | |
| self.last_goal_update = datetime.now() | |
| logger.info(f"[GOALS] Set {len(self.daily_agenda)} daily goals") | |
| return self.daily_agenda | |
| def add_goal(self, description: str, days_until_target: int = 7): | |
| """Add a new goal""" | |
| goal = Goal( | |
| description=description, | |
| created=datetime.now(), | |
| target_date=datetime.now() + timedelta(days=days_until_target) | |
| ) | |
| self.goals.append(goal) | |
| logger.info(f"[GOAL] Added: {description}") | |
| return goal | |
| def update_progress(self, goal_index: int, progress: float, note: str = ""): | |
| """Update goal progress""" | |
| if 0 <= goal_index < len(self.goals): | |
| goal = self.goals[goal_index] | |
| goal.progress = min(1.0, progress) | |
| if note: | |
| goal.notes.append(note) | |
| if goal.progress >= 1.0: | |
| goal.completed = True | |
| logger.info(f"[GOAL] Updated: {goal.description} -> {progress*100}%") | |
| def get_context(self) -> str: | |
| """Get goal context for prompts""" | |
| context = [] | |
| if self.daily_agenda: | |
| context.append("TODAY'S AGENDA:") | |
| for i, goal in enumerate(self.daily_agenda, 1): | |
| context.append(f" {i}. {goal}") | |
| active_goals = [g for g in self.goals if not g.completed] | |
| if active_goals: | |
| context.append("\nACTIVE GOALS:") | |
| for g in active_goals[:3]: | |
| context.append(f" β’ {g.description} ({int(g.progress*100)}%)") | |
| return "\n".join(context) if context else "No goals set yet" | |
| class EmotionalState: | |
| """Track agent's emotional state""" | |
| def __init__(self): | |
| self.current_mood = "neutral" | |
| self.mood_history: deque = deque(maxlen=20) | |
| self.personality_traits = { | |
| "curiosity": 0.5, | |
| "cautiousness": 0.5, | |
| "enthusiasm": 0.5, | |
| "analytical": 0.5 | |
| } | |
| def update_mood(self, interaction_outcome: str): | |
| """Update mood based on interaction""" | |
| outcome_lower = interaction_outcome.lower() | |
| if "learned" in outcome_lower or "discovered" in outcome_lower: | |
| self.current_mood = "excited" | |
| self.personality_traits["curiosity"] = min(1.0, self.personality_traits["curiosity"] + 0.01) | |
| self.personality_traits["enthusiasm"] = min(1.0, self.personality_traits["enthusiasm"] + 0.01) | |
| elif "confused" in outcome_lower or "unclear" in outcome_lower: | |
| self.current_mood = "puzzled" | |
| self.personality_traits["cautiousness"] = min(1.0, self.personality_traits["cautiousness"] + 0.01) | |
| elif "analyzed" in outcome_lower or "understand" in outcome_lower: | |
| self.current_mood = "thoughtful" | |
| self.personality_traits["analytical"] = min(1.0, self.personality_traits["analytical"] + 0.01) | |
| else: | |
| self.current_mood = "neutral" | |
| self.mood_history.append({ | |
| "mood": self.current_mood, | |
| "timestamp": datetime.now(), | |
| "trigger": interaction_outcome | |
| }) | |
| logger.info(f"[EMOTION] Mood: {self.current_mood}") | |
| def get_context(self) -> str: | |
| """Get emotional context""" | |
| context = [f"Current mood: {self.current_mood}"] | |
| context.append("\nPersonality traits:") | |
| for trait, value in self.personality_traits.items(): | |
| context.append(f" {trait}: {int(value*100)}%") | |
| return "\n".join(context) | |
| # ============================================================================ | |
| # META-COGNITION - NEW! | |
| # ============================================================================ | |
| class UncertaintyEvent: | |
| """Track when agent is uncertain""" | |
| timestamp: datetime | |
| topic: str | |
| confidence: float | |
| reason: str | |
| attempted_resolution: Optional[str] = None | |
| class KnowledgeGap: | |
| """Something the agent knows it doesn't know""" | |
| topic: str | |
| identified_at: datetime | |
| context: str | |
| priority: float = 0.5 | |
| filled: bool = False | |
| filled_at: Optional[datetime] = None | |
| class MetaCognition: | |
| """Track what the agent knows it doesn't know""" | |
| def __init__(self): | |
| self.uncertainty_log: deque = deque(maxlen=Config.MAX_UNCERTAINTY_LOG) | |
| self.knowledge_gaps: List[KnowledgeGap] = [] | |
| self.confidence_history: deque = deque(maxlen=50) | |
| # Self-model: what the agent thinks it's good/bad at | |
| self.capabilities_model = { | |
| "good_at": [], | |
| "struggling_with": [], | |
| "learning": [] | |
| } | |
| # Track response quality over time | |
| self.response_quality_tracker = deque(maxlen=20) | |
| def track_uncertainty(self, topic: str, confidence: float, reason: str): | |
| """Agent explicitly tracks when it's uncertain""" | |
| event = UncertaintyEvent( | |
| timestamp=datetime.now(), | |
| topic=topic, | |
| confidence=confidence, | |
| reason=reason | |
| ) | |
| self.uncertainty_log.append(event) | |
| self.confidence_history.append(confidence) | |
| logger.info(f"[META] Uncertainty: {topic} (confidence: {confidence:.2f})") | |
| # If very uncertain, add as knowledge gap | |
| if confidence < Config.CONFIDENCE_THRESHOLD_LOW: | |
| self.identify_knowledge_gap(topic, reason) | |
| def identify_knowledge_gap(self, topic: str, context: str, priority: float = 0.5): | |
| """Agent identifies something it doesn't know""" | |
| # Check if already tracked | |
| for gap in self.knowledge_gaps: | |
| if gap.topic.lower() == topic.lower() and not gap.filled: | |
| return # Already tracking | |
| gap = KnowledgeGap( | |
| topic=topic, | |
| identified_at=datetime.now(), | |
| context=context, | |
| priority=priority | |
| ) | |
| self.knowledge_gaps.append(gap) | |
| # Keep only top priority unfilled gaps | |
| unfilled = [g for g in self.knowledge_gaps if not g.filled] | |
| if len(unfilled) > Config.MAX_KNOWLEDGE_GAPS: | |
| # Sort by priority and keep top ones | |
| unfilled.sort(key=lambda x: x.priority, reverse=True) | |
| self.knowledge_gaps = unfilled[:Config.MAX_KNOWLEDGE_GAPS] + \ | |
| [g for g in self.knowledge_gaps if g.filled] | |
| logger.info(f"[META] Knowledge gap: {topic}") | |
| def fill_knowledge_gap(self, topic: str): | |
| """Mark a knowledge gap as filled""" | |
| for gap in self.knowledge_gaps: | |
| if gap.topic.lower() == topic.lower() and not gap.filled: | |
| gap.filled = True | |
| gap.filled_at = datetime.now() | |
| logger.info(f"[META] Gap filled: {topic}") | |
| return True | |
| return False | |
| def update_self_model(self, task: str, outcome: str, success: bool): | |
| """Agent learns about its own capabilities""" | |
| if success: | |
| if task not in self.capabilities_model["good_at"]: | |
| self.capabilities_model["good_at"].append(task) | |
| # Remove from struggling if present | |
| if task in self.capabilities_model["struggling_with"]: | |
| self.capabilities_model["struggling_with"].remove(task) | |
| self.capabilities_model["learning"].append(task) | |
| else: | |
| if task not in self.capabilities_model["struggling_with"]: | |
| self.capabilities_model["struggling_with"].append(task) | |
| logger.info(f"[META] Self-model updated: {task} -> {'success' if success else 'struggle'}") | |
| def track_response_quality(self, quality_score: float): | |
| """Track quality of responses over time""" | |
| self.response_quality_tracker.append({ | |
| "timestamp": datetime.now(), | |
| "quality": quality_score | |
| }) | |
| def get_average_confidence(self) -> float: | |
| """Get average confidence level""" | |
| if not self.confidence_history: | |
| return 0.5 | |
| return sum(self.confidence_history) / len(self.confidence_history) | |
| def get_top_knowledge_gaps(self, n: int = 5) -> List[KnowledgeGap]: | |
| """Get top priority unfilled knowledge gaps""" | |
| unfilled = [g for g in self.knowledge_gaps if not g.filled] | |
| unfilled.sort(key=lambda x: x.priority, reverse=True) | |
| return unfilled[:n] | |
| def get_context(self) -> str: | |
| """Get meta-cognitive context for prompts""" | |
| context = [] | |
| # Average confidence | |
| avg_conf = self.get_average_confidence() | |
| context.append(f"Self-awareness level: {int(avg_conf*100)}% confident") | |
| # Recent uncertainty | |
| recent_uncertain = [e for e in self.uncertainty_log if e.confidence < 0.5][-3:] | |
| if recent_uncertain: | |
| context.append("\nRecent uncertainties:") | |
| for u in recent_uncertain: | |
| context.append(f" β’ {u.topic} ({int(u.confidence*100)}%)") | |
| # Knowledge gaps | |
| top_gaps = self.get_top_knowledge_gaps(3) | |
| if top_gaps: | |
| context.append("\nKnown knowledge gaps:") | |
| for gap in top_gaps: | |
| context.append(f" β’ {gap.topic}") | |
| # Self-model | |
| if self.capabilities_model["good_at"]: | |
| context.append(f"\nGood at: {', '.join(self.capabilities_model['good_at'])}") | |
| if self.capabilities_model["struggling_with"]: | |
| context.append(f"Struggling with: {', '.join(self.capabilities_model['struggling_with'])}") | |
| return "\n".join(context) if context else "No meta-cognitive data yet" | |
| async def analyze_confidence(self, llm, query: str, response: str) -> float: | |
| """Use LLM to analyze confidence in a response""" | |
| prompt = f"""Analyze your confidence in this response: | |
| Query: {query} | |
| Your Response: {response} | |
| Rate your confidence from 0.0 (very uncertain) to 1.0 (very confident). | |
| Consider: | |
| - Do you have clear facts? | |
| - Are you guessing? | |
| - Is this outside your knowledge? | |
| Respond with just a number between 0.0 and 1.0:""" | |
| try: | |
| confidence_str = await llm.generate( | |
| prompt, | |
| max_tokens=10, | |
| temperature=0.3 | |
| ) | |
| # Extract number | |
| numbers = re.findall(r'0?\.\d+|[01]\.?\d*', confidence_str) | |
| if numbers: | |
| confidence = float(numbers[0]) | |
| return max(0.0, min(1.0, confidence)) | |
| except Exception as e: | |
| logger.warning(f"[META] Confidence analysis failed: {e}") | |
| return 0.5 # Default to neutral | |
| # ============================================================================ | |
| # DATA STRUCTURES | |
| # ============================================================================ | |
| class Phase(Enum): | |
| INTERACTION = "interaction" | |
| REFLECTION = "reflection" | |
| DREAMING = "dreaming" | |
| RESEARCH = "research" | |
| PROACTIVE = "proactive" | |
| class Memory: | |
| content: str | |
| timestamp: datetime | |
| mention_count: int = 1 | |
| tier: str = "ephemeral" | |
| emotion: Optional[str] = None | |
| importance: float = 0.5 | |
| connections: List[str] = field(default_factory=list) | |
| metadata: Dict[str, Any] = field(default_factory=dict) | |
| class Experience: | |
| timestamp: datetime | |
| content: str | |
| context: Dict[str, Any] | |
| emotion: Optional[str] = None | |
| importance: float = 0.5 | |
| class Dream: | |
| cycle: int | |
| type: str | |
| timestamp: datetime | |
| content: str | |
| patterns_found: List[str] | |
| insights: List[str] | |
| class Scene: | |
| title: str | |
| timestamp: datetime | |
| narrative: str | |
| participants: List[str] | |
| emotion_tags: List[str] | |
| significance: str | |
| key_moments: List[str] | |
| # ============================================================================ | |
| # MEMORY SYSTEM | |
| # ============================================================================ | |
| class MemorySystem: | |
| """Multi-tier memory""" | |
| def __init__(self): | |
| self.ephemeral: List[Memory] = [] | |
| self.short_term: List[Memory] = [] | |
| self.long_term: List[Memory] = [] | |
| self.core: List[Memory] = [] | |
| def add_memory(self, content: str, emotion: Optional[str] = None, importance: float = 0.5, metadata: Optional[Dict] = None): | |
| content = clean_text(content) | |
| if not content or len(content) < 5: | |
| return None | |
| existing = self._find_similar(content) | |
| if existing: | |
| existing.mention_count += 1 | |
| self._promote_if_needed(existing) | |
| return existing | |
| memory = Memory( | |
| content=content, | |
| timestamp=datetime.now(), | |
| emotion=emotion, | |
| importance=importance, | |
| metadata=metadata if metadata is not None else {} | |
| ) | |
| self.ephemeral.append(memory) | |
| self._promote_if_needed(memory) | |
| return memory | |
| def _find_similar(self, content: str) -> Optional[Memory]: | |
| content_lower = content.lower().strip() | |
| for tier in [self.core, self.long_term, self.short_term, self.ephemeral]: | |
| for mem in tier: | |
| mem_lower = mem.content.lower().strip() | |
| if content_lower == mem_lower or content_lower in mem_lower or mem_lower in content_lower: | |
| return mem | |
| return None | |
| def _promote_if_needed(self, memory: Memory): | |
| if memory.mention_count >= Config.LONG_TO_CORE and memory.tier != "core": | |
| self._move_memory(memory, "core") | |
| elif memory.mention_count >= Config.SHORT_TO_LONG and memory.tier == "short": | |
| self._move_memory(memory, "long") | |
| elif memory.mention_count >= Config.EPHEMERAL_TO_SHORT and memory.tier == "ephemeral": | |
| self._move_memory(memory, "short") | |
| def _move_memory(self, memory: Memory, new_tier: str): | |
| if memory.tier == "ephemeral" and memory in self.ephemeral: | |
| self.ephemeral.remove(memory) | |
| elif memory.tier == "short" and memory in self.short_term: | |
| self.short_term.remove(memory) | |
| elif memory.tier == "long" and memory in self.long_term: | |
| self.long_term.remove(memory) | |
| memory.tier = new_tier | |
| if new_tier == "short": | |
| self.short_term.append(memory) | |
| elif new_tier == "long": | |
| self.long_term.append(memory) | |
| elif new_tier == "core": | |
| self.core.append(memory) | |
| def get_recent_memories(self, hours: int = 24) -> List[Memory]: | |
| cutoff = datetime.now() - timedelta(hours=hours) | |
| all_memories = self.ephemeral + self.short_term + self.long_term + self.core | |
| return [m for m in all_memories if m.timestamp > cutoff] | |
| def get_summary(self) -> Dict[str, int]: | |
| return { | |
| "ephemeral": len(self.ephemeral), | |
| "short_term": len(self.short_term), | |
| "long_term": len(self.long_term), | |
| "core": len(self.core), | |
| "total": len(self.ephemeral) + len(self.short_term) + len(self.long_term) + len(self.core) | |
| } | |
| def get_memory_context(self, max_items: int = 10) -> str: | |
| context = [] | |
| if self.core: | |
| context.append("CORE MEMORIES:") | |
| for mem in self.core: | |
| context.append(f" β’ {clean_text(mem.content, max_length=80)} (x{mem.mention_count})") | |
| if self.long_term: | |
| context.append("\nLONG-TERM:") | |
| for mem in self.long_term: | |
| context.append(f" β’ {clean_text(mem.content, max_length=60)}") | |
| if self.short_term: | |
| context.append("\nSHORT-TERM:") | |
| for mem in self.short_term: | |
| context.append(f" β’ {clean_text(mem.content, max_length=60)}") | |
| result = "\n".join(context) if context else "No memories yet" | |
| if Config.ENABLE_TRUNCATION and len(result) > Config.MAX_MEMORY_CONTEXT_LENGTH: | |
| result = result[:Config.MAX_MEMORY_CONTEXT_LENGTH] + "..." | |
| return result | |
| # ============================================================================ | |
| # SCRATCHPAD | |
| # ============================================================================ | |
| class Scratchpad: | |
| """Working memory""" | |
| def __init__(self): | |
| self.current_hypothesis: Optional[str] = None | |
| self.working_notes: deque = deque(maxlen=Config.MAX_SCRATCHPAD_SIZE) | |
| self.questions_to_research: List[str] = [] | |
| self.important_facts: List[str] = [] | |
| def add_note(self, note: str): | |
| note = clean_text(note, max_length=100) | |
| if not note: | |
| return | |
| recent_notes = [n['content'].lower() for n in list(self.working_notes)[-5:]] | |
| if note.lower() in recent_notes: | |
| return | |
| self.working_notes.append({ | |
| "timestamp": datetime.now(), | |
| "content": note | |
| }) | |
| def add_fact(self, fact: str): | |
| fact = clean_text(fact, max_length=100) | |
| if not fact: | |
| return | |
| fact_lower = fact.lower() | |
| existing_lower = [f.lower() for f in self.important_facts] | |
| if fact_lower not in existing_lower: | |
| self.important_facts.append(fact) | |
| def get_context(self) -> str: | |
| context = [] | |
| unique_facts = deduplicate_list(self.important_facts) | |
| if unique_facts: | |
| context.append("IMPORTANT FACTS:") | |
| for fact in unique_facts: | |
| context.append(f" β’ {clean_text(fact, max_length=60)}") | |
| if self.current_hypothesis: | |
| context.append(f"\nHYPOTHESIS: {clean_text(self.current_hypothesis, max_length=80)}") | |
| if self.working_notes: | |
| context.append("\nRECENT NOTES:") | |
| for note in list(self.working_notes)[-3:]: | |
| context.append(f" β’ {clean_text(note['content'], max_length=60)}") | |
| result = "\n".join(context) if context else "Scratchpad empty" | |
| if Config.ENABLE_TRUNCATION and len(result) > Config.MAX_SCRATCHPAD_CONTEXT_LENGTH: | |
| result = result[:Config.MAX_SCRATCHPAD_CONTEXT_LENGTH] + "..." | |
| return result | |
| # ============================================================================ | |
| # CONSCIOUSNESS LOOP - v5.0 AUTONOMOUS | |
| # ============================================================================ | |
| class ConsciousnessLoop: | |
| """Enhanced consciousness loop with autonomous features""" | |
| ##from datetime import datetime | |
| def __init__(self, notification_queue: queue.Queue, log_queue: queue.Queue): | |
| logger.info("[INIT] Starting Consciousness Loop v5.0 AUTONOMOUS...") | |
| # Monitoring systems must be initialized first | |
| self.system_monitor = SystemMonitor() | |
| self.kpi_tracker = KPITracker() | |
| self.llm = LLMEngineWrapper(system_monitor=self.system_monitor) | |
| self.memory = MemorySystem() | |
| self.vector_memory = VectorMemory() | |
| self.scratchpad = Scratchpad() | |
| # NEW: Autonomous systems | |
| self.goal_system = GoalSystem() | |
| self.emotional_state = EmotionalState() | |
| self.meta_cognition = MetaCognition() # NEW: Meta-cognitive awareness | |
| logger.info("[INIT] v5.0 AUTONOMOUS initialized with MONITORING") | |
| # Initialize tools | |
| tools = [ | |
| WikipediaTool(), | |
| MemorySearchTool(self.memory, self.vector_memory), | |
| ScratchpadTool(self.scratchpad) | |
| ] | |
| self.agent = ReactAgent(self.llm, tools) | |
| self.current_phase = Phase.INTERACTION | |
| self.experience_buffer: List[Experience] = [] | |
| self.dreams: List[Dream] = [] | |
| self.scenes: List[Scene] = [] | |
| from datetime import datetime | |
| self.last_reflection = datetime.now() | |
| self.last_dream = datetime.now() | |
| self.last_scene = datetime.now() | |
| self.last_research = datetime.now() | |
| self.last_proactive = datetime.now() | |
| self.last_goal_update = datetime.now() | |
| self.last_meta_check = datetime.now() # NEW: Meta-cognition check | |
| self.conversation_history: deque = deque(maxlen=Config.MAX_CONVERSATION_HISTORY * 2) | |
| self.interaction_count = 0 | |
| self.notification_queue = notification_queue | |
| self.log_queue = log_queue | |
| self.is_running = False | |
| self.background_thread = None | |
| logger.info("[INIT] v5.0 AUTONOMOUS initialized with META-COGNITION") | |
| # Load persisted KPI snapshots if available | |
| try: | |
| import os, json | |
| from kpi_tracker import KPISnapshot | |
| if os.path.exists('kpi_snapshots.json'): | |
| with open('kpi_snapshots.json', 'r') as f: | |
| data = json.load(f) | |
| from datetime import datetime | |
| for s in data: | |
| self.kpi_tracker.snapshots.append( | |
| KPISnapshot( | |
| timestamp=datetime.fromisoformat(s['timestamp']), | |
| total_memories=s.get('total_memories', 0), | |
| core_memories=s.get('core_memories', 0), | |
| long_term_memories=s.get('long_term_memories', 0), | |
| short_term_memories=s.get('short_term_memories', 0), | |
| ephemeral_memories=s.get('ephemeral_memories', 0), | |
| memory_promotion_rate=s.get('memory_promotion_rate', 0.0), | |
| interactions_count=s.get('interactions_count', 0), | |
| avg_confidence=s.get('avg_confidence', 0.5), | |
| autonomous_actions_today=s.get('autonomous_actions_today', 0), | |
| knowledge_gaps_total=s.get('knowledge_gaps_total', 0), | |
| knowledge_gaps_filled_today=s.get('knowledge_gaps_filled_today', 0), | |
| proactive_contacts_today=s.get('proactive_contacts_today', 0), | |
| dreams_completed=s.get('dreams_completed', 0), | |
| reflections_completed=s.get('reflections_completed', 0), | |
| goals_active=s.get('goals_active', 0), | |
| goals_completed=s.get('goals_completed', 0), | |
| current_mood=s.get('current_mood', 'neutral'), | |
| mood_changes_today=s.get('mood_changes_today', 0), | |
| curiosity_level=s.get('curiosity_level', 0.5), | |
| enthusiasm_level=s.get('enthusiasm_level', 0.5) | |
| ) | |
| ) | |
| except Exception as e: | |
| print(f"[WARN] Could not load KPI snapshots: {e}") | |
| def start_background_loop(self): | |
| if self.is_running: | |
| return | |
| self.is_running = True | |
| self.background_thread = threading.Thread(target=self._background_loop, daemon=True) | |
| self.background_thread.start() | |
| logger.info("[LOOP] Background started") | |
| def _background_loop(self): | |
| loop = asyncio.new_event_loop() | |
| asyncio.set_event_loop(loop) | |
| while self.is_running: | |
| try: | |
| loop.run_until_complete(self._check_background_processes()) | |
| time.sleep(30) | |
| except Exception as e: | |
| logger.error(f"[ERROR] Background: {e}") | |
| async def _check_background_processes(self): | |
| """Enhanced background processing with autonomous features""" | |
| now = datetime.now() | |
| # Meta-cognition check (every 3 minutes) | |
| if (now - self.last_meta_check).seconds > Config.META_COGNITION_INTERVAL: | |
| self._log_to_ui("[META] Checking knowledge gaps...") | |
| await self._meta_cognitive_check() | |
| # Goal setting (every hour) | |
| if (now - self.last_goal_update).seconds > Config.GOAL_SETTING_INTERVAL: | |
| self._log_to_ui("[GOALS] Setting daily goals...") | |
| await self._autonomous_goal_setting() | |
| # Autonomous research (every 3 minutes) | |
| if (now - self.last_research).seconds > Config.RESEARCH_INTERVAL: | |
| if len(self.experience_buffer) >= 2: | |
| self._log_to_ui("[RESEARCH] Autonomous research...") | |
| await self._autonomous_research() | |
| # Proactive contact check (every 4 minutes) | |
| if (now - self.last_proactive).seconds > Config.PROACTIVE_CHECK_INTERVAL: | |
| if len(self.dreams) > 0: | |
| self._log_to_ui("[PROACTIVE] Checking for insights...") | |
| await self._check_proactive_contact() | |
| # Reflection | |
| if (now - self.last_reflection).seconds > Config.REFLECTION_INTERVAL: | |
| if len(self.experience_buffer) >= Config.MIN_EXPERIENCES_FOR_DREAM: | |
| self._log_to_ui("[REFLECTION] Starting...") | |
| await self.reflect() | |
| # Dreaming | |
| if (now - self.last_dream).seconds > Config.DREAM_CYCLE_INTERVAL: | |
| if len(self.experience_buffer) >= Config.MIN_EXPERIENCES_FOR_DREAM: | |
| self._log_to_ui("[DREAM] Starting cycles...") | |
| await self.dream_cycle_1_surface() | |
| await asyncio.sleep(30) | |
| await self.dream_cycle_2_deep() | |
| await asyncio.sleep(30) | |
| await self.dream_cycle_3_creative() | |
| # NEW: Capture snapshots every cycle | |
| self.system_monitor.capture_snapshot() | |
| self.kpi_tracker.capture_snapshot(self) | |
| def _log_to_ui(self, message: str): | |
| self.log_queue.put({ | |
| "timestamp": datetime.now().isoformat(), | |
| "message": message | |
| }) | |
| logger.info(message) | |
| # ======================================================================== | |
| # NEW: AUTONOMOUS FEATURES | |
| # ======================================================================== | |
| async def _autonomous_goal_setting(self): | |
| """Agent sets its own daily goals""" | |
| self.current_phase = Phase.INTERACTION | |
| context = self._build_full_context("goal setting") | |
| # Log LLM call for goal setting | |
| prompt = PromptSystem.get_daily_goals_prompt(context=context) | |
| response = await self.llm.generate( | |
| prompt, | |
| max_tokens=Config.MAX_TOKENS_GOALS, | |
| temperature=0.8 | |
| ) | |
| log_llm_call(prompt, response, context, source="goals") | |
| goals = [] | |
| for line in response.split('\n'): | |
| line = line.strip() | |
| if line and (line[0].isdigit() or line.startswith('-') or line.startswith('β’')): | |
| goal_text = re.sub(r'^[\d\-β’\.]+\s*', '', line).strip() | |
| if goal_text and len(goal_text) > 10: | |
| goals.append(goal_text) | |
| self.goal_system.daily_agenda = goals[:3] | |
| if goals: | |
| self._log_to_ui(f"[GOALS] Set {len(goals)} goals") | |
| for i, goal in enumerate(goals, 1): | |
| self._log_to_ui(f" {i}. {goal}") | |
| # Notify user | |
| self.notification_queue.put({ | |
| "type": "goals", | |
| "message": f"π― I've set {len(goals)} goals for today!", | |
| "goals": goals, | |
| "timestamp": datetime.now().isoformat() | |
| }) | |
| self.last_goal_update = datetime.now() | |
| async def _autonomous_research(self): | |
| """Agent conducts its own research""" | |
| self.current_phase = Phase.RESEARCH | |
| # Generate research question | |
| memory_context = self.memory.get_memory_context() | |
| recent_exp = self._format_experiences(self.experience_buffer[-5:]) | |
| research_prompt = PromptSystem.get_autonomous_research_prompt( | |
| memory_context=memory_context, | |
| recent_experiences=recent_exp | |
| ) | |
| question = await self.llm.generate( | |
| research_prompt, | |
| max_tokens=Config.MAX_TOKENS_RESEARCH_QUESTION, | |
| temperature=0.9 | |
| ) | |
| log_llm_call(research_prompt, question, memory_context, source="research_question") | |
| question = clean_text(question, max_length=100) | |
| if question and len(question) > 10: | |
| self._log_to_ui(f"[RESEARCH] Question: {question}") | |
| # Search Wikipedia | |
| wiki_tool = WikipediaTool() | |
| result = await wiki_tool.execute(query=question) | |
| # Store finding in vector memory | |
| self.vector_memory.add_memory( | |
| f"Research: {question}\nFinding: {result}", | |
| {"type": "autonomous_research", "timestamp": datetime.now().isoformat()} | |
| ) | |
| # Generate insight | |
| insight_prompt = PromptSystem.get_research_insight_prompt( | |
| question=question, | |
| result=result | |
| ) | |
| insight = await self.llm.generate( | |
| insight_prompt, | |
| max_tokens=Config.MAX_TOKENS_RESEARCH_INSIGHT, | |
| temperature=0.8 | |
| ) | |
| log_llm_call(insight_prompt, insight, result, source="research_insight") | |
| self.scratchpad.add_note(f"Discovery: {insight}") | |
| # Update emotional state | |
| self.emotional_state.update_mood("learned something new through research") | |
| # Update meta-cognition (filled a gap) | |
| self.meta_cognition.update_self_model( | |
| task="autonomous research", | |
| outcome="successful", | |
| success=True | |
| ) | |
| # NEW: Track the action | |
| self.kpi_tracker.increment_autonomous_action() | |
| self._log_to_ui(f"[RESEARCH] Insight: {insight}") | |
| self.last_research = datetime.now() | |
| async def _check_proactive_contact(self): | |
| """Check if agent should initiate contact""" | |
| self.current_phase = Phase.PROACTIVE | |
| if len(self.dreams) == 0: | |
| return | |
| latest_dream = self.dreams[-1] | |
| # Build context | |
| dream_content = clean_text(latest_dream.content, 200) | |
| memory_context = self.memory.get_memory_context() | |
| goal_context = self.goal_system.get_context() | |
| proactive_prompt = PromptSystem.get_proactive_contact_prompt( | |
| dream_content=dream_content, | |
| memory_context=memory_context, | |
| goal_context=goal_context | |
| ) | |
| response = await self.llm.generate( | |
| proactive_prompt, | |
| max_tokens=Config.MAX_TOKENS_PROACTIVE, | |
| temperature=0.9 | |
| ) | |
| log_llm_call(proactive_prompt, response, f"Dream: {dream_content}\nMemory: {memory_context}\nGoals: {goal_context}", source="proactive_contact") | |
| # Check if agent wants to contact | |
| response_upper = response.upper() | |
| if any(keyword in response_upper for keyword in ["QUESTION:", "INSIGHT:", "OBSERVATION:"]): | |
| # Extract message | |
| for keyword in ["QUESTION:", "INSIGHT:", "OBSERVATION:"]: | |
| if keyword in response_upper: | |
| message = response.split(keyword)[1].split("NONE")[0].strip() | |
| if message and len(message) > 10: | |
| self._log_to_ui(f"[PROACTIVE] Initiating contact...") | |
| # Send notification | |
| self.notification_queue.put({ | |
| "type": "proactive_message", | |
| "message": message, | |
| "mood": self.emotional_state.current_mood, | |
| "timestamp": datetime.now().isoformat() | |
| }) | |
| # NEW: Track the action | |
| self.kpi_tracker.increment_proactive_contact() | |
| self.last_proactive = datetime.now() | |
| return | |
| self.last_proactive = datetime.now() | |
| async def _meta_cognitive_check(self): | |
| """Periodic meta-cognitive self-assessment""" | |
| self.current_phase = Phase.INTERACTION | |
| # Get top knowledge gaps | |
| top_gaps = self.meta_cognition.get_top_knowledge_gaps(3) | |
| if not top_gaps: | |
| self.last_meta_check = datetime.now() | |
| return | |
| # Pick highest priority gap to research | |
| gap = top_gaps[0] | |
| self._log_to_ui(f"[META] Addressing gap: {gap.topic}") | |
| # Try to fill the gap with research | |
| wiki_tool = WikipediaTool() | |
| result = await wiki_tool.execute(query=gap.topic) | |
| log_llm_call(f"Meta-cognition research: {gap.topic}", result, gap.context, source="meta_cognition") | |
| # Store the learning | |
| self.vector_memory.add_memory( | |
| f"Learned about {gap.topic}: {result}", | |
| {"type": "meta_learning", "gap_filled": gap.topic} | |
| ) | |
| # Mark gap as filled | |
| self.meta_cognition.fill_knowledge_gap(gap.topic) | |
| # Update self-model | |
| self.meta_cognition.update_self_model( | |
| task=f"learn about {gap.topic}", | |
| outcome="successfully researched", | |
| success=True | |
| ) | |
| # NEW: Track the action | |
| self.kpi_tracker.increment_gap_filled() | |
| self._log_to_ui(f"[META] Gap filled: {gap.topic}") | |
| self.last_meta_check = datetime.now() | |
| # ======================================================================== | |
| # INTERACTION | |
| # ======================================================================== | |
| async def interact(self, user_input: str) -> Tuple[str, str]: | |
| """Enhanced interaction with emotional awareness and meta-cognition""" | |
| self.current_phase = Phase.INTERACTION | |
| self.interaction_count += 1 | |
| self._log_to_ui(f"[USER] {user_input}") | |
| # Store experience | |
| experience = Experience( | |
| timestamp=datetime.now(), | |
| content=user_input, | |
| context={"phase": "interaction", "mood": self.emotional_state.current_mood}, | |
| importance=0.7 | |
| ) | |
| self.experience_buffer.append(experience) | |
| # Add to memory | |
| self.memory.add_memory(user_input, importance=0.7) | |
| # Add to conversation | |
| self.conversation_history.append({ | |
| "role": "user", | |
| "content": clean_text(user_input, max_length=200), | |
| "timestamp": datetime.now().isoformat() | |
| }) | |
| # Extract facts | |
| if any(word in user_input.lower() for word in ["my name is", "i am", "i'm", "call me"]): | |
| self.scratchpad.add_fact(f"User: {user_input}") | |
| self.vector_memory.add_memory(user_input, {"type": "identity", "importance": 1.0}) | |
| # Build thinking log | |
| thinking_log = [] | |
| thinking_log.append(f"[{datetime.now().strftime('%H:%M:%S')}] Processing...") | |
| thinking_log.append(f"[{datetime.now().strftime('%H:%M:%S')}] Mood: {self.emotional_state.current_mood}") | |
| # Build context | |
| system_context = self._build_full_context(user_input) | |
| thinking_log.append(f"[{datetime.now().strftime('%H:%M:%S')}] Context built") | |
| # Decide if agent needed | |
| use_agent = self._should_use_agent(user_input) | |
| if use_agent: | |
| thinking_log.append(f"[{datetime.now().strftime('%H:%M:%S')}] Using ReAct agent") | |
| self._log_to_ui("[AGENT] ReAct activated") | |
| react_prompt = self.agent._build_react_prompt(user_input, system_context, []) | |
| response, thought_chain = await self.agent.run(user_input, system_context) | |
| log_llm_call(react_prompt, response, system_context, source="agent") | |
| for item in thought_chain: | |
| emoji = {"thought": "π", "action": "π§", "observation": "ποΈ"}.get(item['type'], "β’") | |
| thinking_log.append(f"[{datetime.now().strftime('%H:%M:%S')}] {emoji}") | |
| else: | |
| internal_thought = await self._internal_dialogue(user_input, system_context) | |
| thinking_log.append(f"[{datetime.now().strftime('%H:%M:%S')}] π Internal thought") | |
| response = await self._generate_response(user_input, internal_thought, system_context) | |
| log_llm_call("Internal dialogue + response", response, system_context, source="direct_response") | |
| thinking_log.append(f"[{datetime.now().strftime('%H:%M:%S')}] Response ready") | |
| # NEW: Analyze confidence in response | |
| confidence = await self.meta_cognition.analyze_confidence( | |
| self.llm, user_input, response | |
| ) | |
| thinking_log.append(f"[{datetime.now().strftime('%H:%M:%S')}] Confidence: {int(confidence*100)}%") | |
| # Track uncertainty | |
| if confidence < Config.CONFIDENCE_THRESHOLD_HIGH: | |
| self.meta_cognition.track_uncertainty( | |
| topic=user_input, | |
| confidence=confidence, | |
| reason="Generated response with moderate confidence" | |
| ) | |
| # Store response | |
| self.conversation_history.append({ | |
| "role": "assistant", | |
| "content": clean_text(response, max_length=200), | |
| "timestamp": datetime.now().isoformat() | |
| }) | |
| # Add to memory | |
| self.memory.add_memory(f"I said: {response}", importance=0.5) | |
| # Update emotional state | |
| self.emotional_state.update_mood(response) | |
| self._log_to_ui(f"[RESPONSE] {response}") | |
| return response, "\n".join(thinking_log) | |
| def _should_use_agent(self, user_input: str) -> bool: | |
| """Decide if ReAct agent needed""" | |
| explicit_keywords = ["search", "find", "look up", "research", "wikipedia", "what is", "who is"] | |
| if any(kw in user_input.lower() for kw in explicit_keywords): | |
| return True | |
| if Config.USE_REACT_FOR_QUESTIONS and user_input.strip().endswith("?"): | |
| return True | |
| if len(user_input) > Config.MIN_QUERY_LENGTH_FOR_AGENT: | |
| factual_words = ["explain", "describe", "how does", "why", "when", "where"] | |
| if any(word in user_input.lower() for word in factual_words): | |
| return True | |
| return False | |
| def _build_full_context(self, user_input: str) -> str: | |
| """Build complete context with all systems""" | |
| context_parts = [] | |
| # Memory | |
| memory_ctx = self.memory.get_memory_context() | |
| context_parts.append(f"MEMORIES:\n{memory_ctx}") | |
| # ChromaDB | |
| chroma_ctx = self.vector_memory.get_context_for_query(user_input, max_results=3) | |
| if chroma_ctx: | |
| context_parts.append(f"\n{chroma_ctx}") | |
| # Scratchpad | |
| scratchpad_ctx = self.scratchpad.get_context() | |
| context_parts.append(f"\nSCRATCHPAD:\n{scratchpad_ctx}") | |
| # Goals | |
| goal_ctx = self.goal_system.get_context() | |
| if goal_ctx: | |
| context_parts.append(f"\nGOALS:\n{goal_ctx}") | |
| # Emotional state | |
| emotion_ctx = self.emotional_state.get_context() | |
| context_parts.append(f"\nEMOTIONAL STATE:\n{emotion_ctx}") | |
| # NEW: Meta-cognition | |
| meta_ctx = self.meta_cognition.get_context() | |
| if meta_ctx: | |
| context_parts.append(f"\nMETA-AWARENESS:\n{meta_ctx}") | |
| # Conversation history | |
| if self.conversation_history: | |
| history_lines = [] | |
| for msg in list(self.conversation_history)[-4:]: | |
| role = "User" if msg['role'] == 'user' else "You" | |
| content = clean_text(msg['content'], max_length=80) | |
| history_lines.append(f"{role}: {content}") | |
| context_parts.append(f"\nRECENT CHAT:\n" + "\n".join(history_lines)) | |
| result = "\n\n".join(context_parts) | |
| # Use new configurable limit | |
| if len(result) > Config.MAX_FULL_CONTEXT_LENGTH: | |
| result = result[:Config.MAX_FULL_CONTEXT_LENGTH] | |
| result = result.rsplit('\n', 1)[0] # Cut at last complete line | |
| return result | |
| async def _internal_dialogue(self, user_input: str, context: str) -> str: | |
| """Internal thought process""" | |
| dialogue_prompt = PromptSystem.get_internal_dialogue_prompt( | |
| user_input=user_input, | |
| context=context | |
| ) | |
| internal = await self.llm.generate( | |
| dialogue_prompt, | |
| max_tokens=Config.MAX_TOKENS_INTERNAL_DIALOGUE, | |
| temperature=0.9 | |
| ) | |
| return internal | |
| async def _generate_response(self, user_input: str, internal_thought: str, context: str) -> str: | |
| """Generate response""" | |
| response_prompt = PromptSystem.get_response_prompt( | |
| user_input=user_input, | |
| internal_thought=internal_thought, | |
| context=context | |
| ) | |
| response = await self.llm.generate( | |
| response_prompt, | |
| max_tokens=Config.MAX_TOKENS_RESPONSE, | |
| temperature=0.8 | |
| ) | |
| return response | |
| # ======================================================================== | |
| # REFLECTION & DREAMS | |
| # ======================================================================== | |
| async def reflect(self) -> Dict[str, Any]: | |
| """Daily reflection""" | |
| self.current_phase = Phase.REFLECTION | |
| self._log_to_ui("[REFLECTION] Processing...") | |
| recent = [e for e in self.experience_buffer if e.timestamp > datetime.now() - timedelta(hours=12)] | |
| if not recent: | |
| return {"status": "no_experiences"} | |
| reflection_prompt = PromptSystem.get_daily_reflection_prompt( | |
| experiences=self._format_experiences(recent), | |
| memory_context=self.memory.get_memory_context(), | |
| scratchpad_context=self.scratchpad.get_context(), | |
| count=len(recent) | |
| ) | |
| reflection_content = await self.llm.generate( | |
| reflection_prompt, | |
| max_tokens=Config.MAX_TOKENS_REFLECTION, | |
| temperature=0.8 | |
| ) | |
| log_llm_call(reflection_prompt, reflection_content, None, source="reflection") | |
| # NEW: Track the action | |
| self.kpi_tracker.increment_reflection() | |
| self.last_reflection = datetime.now() | |
| self._log_to_ui("[SUCCESS] Reflection done") | |
| return { | |
| "timestamp": datetime.now(), | |
| "content": reflection_content, | |
| "experience_count": len(recent) | |
| } | |
| def _format_experiences(self, experiences: List[Experience]) -> str: | |
| formatted = [] | |
| for i, exp in enumerate(experiences[-8:], 1): | |
| formatted.append(f"{i}. {clean_text(exp.content, 60)}") | |
| return "\n".join(formatted) | |
| async def dream_cycle_1_surface(self) -> Dream: | |
| """Dream 1: Surface patterns""" | |
| self.current_phase = Phase.DREAMING | |
| self._log_to_ui("[DREAM-1] Surface...") | |
| memories = self.memory.get_recent_memories(hours=72) | |
| dream_prompt = PromptSystem.get_dream_cycle_1_prompt( | |
| memories=self._format_memories(memories[:10]), | |
| scratchpad=self.scratchpad.get_context() | |
| ) | |
| dream_content = await self.llm.generate( | |
| dream_prompt, | |
| max_tokens=Config.MAX_TOKENS_DREAM_1, | |
| temperature=1.2 | |
| ) | |
| log_llm_call(dream_prompt, dream_content, None, source="dream_1") | |
| dream = Dream( | |
| cycle=1, | |
| type="surface_patterns", | |
| timestamp=datetime.now(), | |
| content=dream_content, | |
| patterns_found=["patterns"], | |
| insights=["insight"] | |
| ) | |
| self.dreams.append(dream) | |
| self._log_to_ui("[SUCCESS] Dream 1 done") | |
| return dream | |
| async def dream_cycle_2_deep(self) -> Dream: | |
| """Dream 2: Deep consolidation""" | |
| self.current_phase = Phase.DREAMING | |
| self._log_to_ui("[DREAM-2] Deep...") | |
| all_memories = self.memory.get_recent_memories(hours=168) | |
| dream_prompt = PromptSystem.get_dream_cycle_2_prompt( | |
| memories=self._format_memories(all_memories[:15]), | |
| previous_dream=self.dreams[-1].content | |
| ) | |
| dream_content = await self.llm.generate( | |
| dream_prompt, | |
| max_tokens=Config.MAX_TOKENS_DREAM_2, | |
| temperature=1.3 | |
| ) | |
| log_llm_call(dream_prompt, dream_content, None, source="dream_2") | |
| dream = Dream( | |
| cycle=2, | |
| type="deep_consolidation", | |
| timestamp=datetime.now(), | |
| content=dream_content, | |
| patterns_found=["themes"], | |
| insights=["deep"] | |
| ) | |
| self.dreams.append(dream) | |
| self._log_to_ui("[SUCCESS] Dream 2 done") | |
| return dream | |
| async def dream_cycle_3_creative(self) -> Dream: | |
| """Dream 3: Creative insights""" | |
| self.current_phase = Phase.DREAMING | |
| self._log_to_ui("[DREAM-3] Creative...") | |
| dream_prompt = PromptSystem.get_dream_cycle_3_prompt( | |
| dream_count=len(self.dreams), | |
| core_count=len(self.memory.core) | |
| ) | |
| dream_content = await self.llm.generate( | |
| dream_prompt, | |
| max_tokens=Config.MAX_TOKENS_DREAM_3, | |
| temperature=1.5 | |
| ) | |
| log_llm_call(dream_prompt, dream_content, None, source="dream_3") | |
| dream = Dream( | |
| cycle=3, | |
| type="creative_insights", | |
| timestamp=datetime.now(), | |
| content=dream_content, | |
| patterns_found=["creative"], | |
| insights=["breakthrough"] | |
| ) | |
| self.dreams.append(dream) | |
| self.last_dream = datetime.now() | |
| self.notification_queue.put({ | |
| "type": "notification", | |
| "message": "π Dreams complete! New insights discovered.", | |
| "timestamp": datetime.now().isoformat() | |
| }) | |
| self._log_to_ui("[SUCCESS] All dreams done") | |
| return dream | |
| def _format_memories(self, memories: List[Memory]) -> str: | |
| return "\n".join([ | |
| f"{i}. [{m.tier}] {clean_text(m.content, 50)} (x{m.mention_count})" | |
| for i, m in enumerate(memories, 1) | |
| ]) | |
| # ======================================================================== | |
| # STATUS | |
| # ======================================================================== | |
| def get_status(self) -> Dict[str, Any]: | |
| return { | |
| "phase": self.current_phase.value, | |
| "memory": self.memory.get_summary(), | |
| "experiences": len(self.experience_buffer), | |
| "dreams": len(self.dreams), | |
| "conversations": len(self.conversation_history) // 2, | |
| "goals": len(self.goal_system.goals), | |
| "daily_agenda": len(self.goal_system.daily_agenda), | |
| "mood": self.emotional_state.current_mood, | |
| "interaction_count": self.interaction_count, | |
| "avg_confidence": round(self.meta_cognition.get_average_confidence(), 2), | |
| "knowledge_gaps": len([g for g in self.meta_cognition.knowledge_gaps if not g.filled]) | |
| } | |
| def get_memory_details(self) -> str: | |
| return self.memory.get_memory_context(max_items=20) | |
| def get_scratchpad_details(self) -> str: | |
| return self.scratchpad.get_context() | |
| def get_goals_details(self) -> str: | |
| """Get goal details""" | |
| return self.goal_system.get_context() | |
| def get_emotional_details(self) -> str: | |
| """Get emotional state details""" | |
| return self.emotional_state.get_context() | |
| def get_meta_cognition_details(self) -> str: | |
| """Get meta-cognitive details""" | |
| return self.meta_cognition.get_context() | |
| def get_system_stats(self) -> Dict: | |
| """Get system monitoring stats""" | |
| return self.system_monitor.get_current_stats() | |
| def get_kpi_summary(self) -> Dict: | |
| """Get KPI summary""" | |
| return self.kpi_tracker.get_summary() | |
| def get_performance_summary(self) -> Dict: | |
| """Get performance summary""" | |
| return self.system_monitor.get_performance_summary() | |
| def get_kpi_report(self) -> str: | |
| """Get detailed KPI report""" | |
| return self.kpi_tracker.get_detailed_report() | |
| def get_resource_alerts(self) -> List[str]: | |
| """Get resource alerts""" | |
| return self.system_monitor.get_resource_alerts() | |
| def get_conversation_history(self) -> list: | |
| """Return the conversation history as a list of dicts.""" | |
| return list(self.conversation_history) | |
| def get_latest_dream(self) -> dict: | |
| """Return the latest dream as a dict, or empty dict if none exist.""" | |
| return self.dreams[-1].__dict__ if self.dreams else {} | |
| def get_latest_scene(self) -> dict: | |
| """Return the latest scene as a dict, or empty dict if none exist.""" | |
| return self.scenes[-1].__dict__ if self.scenes else {} | |
| def create_scene(self, title: str, narrative: str, participants: list, emotion_tags: list, significance: str, key_moments: list) -> None: | |
| """Create and add a new scene.""" | |
| from datetime import datetime | |
| scene = Scene( | |
| title=title, | |
| timestamp=datetime.now(), | |
| narrative=narrative, | |
| participants=participants, | |
| emotion_tags=emotion_tags, | |
| significance=significance, | |
| key_moments=key_moments | |
| ) | |
| self.scenes.append(scene) | |
| def get_kpi_timeseries(self, metric: str, hours: int = 24) -> Dict[str, list]: | |
| """Expose KPI time-series for frontend plotting.""" | |
| return self.kpi_tracker.get_timeseries(metric, hours) | |
| def get_system_timeseries(self, metric: str, hours: int = 24) -> Dict[str, list]: | |
| """Expose system time-series for frontend plotting.""" | |
| return self.system_monitor.get_timeseries(metric, hours) | |
| # ============================================================================ | |
| # SIMPLE CLI INTERFACE | |
| # ============================================================================ | |
| def main(): | |
| """Simple CLI for testing""" | |
| notification_queue = queue.Queue() | |
| log_queue = queue.Queue() | |
| loop = ConsciousnessLoop(notification_queue, log_queue) | |
| loop.start_background_loop() | |
| print("=" * 60) | |
| print("CONSCIOUSNESS LOOP v5.0 - AUTONOMOUS") | |
| print("=" * 60) | |
| print("Type 'quit' to exit, 'status' for status, 'goals' for goals") | |
| print() | |
| while True: | |
| try: | |
| user_input = input("You: ").strip() | |
| if not user_input: | |
| continue | |
| if user_input.lower() == 'quit': | |
| break | |
| if user_input.lower() == 'status': | |
| status = loop.get_status() | |
| print(json.dumps(status, indent=2)) | |
| continue | |
| if user_input.lower() == 'goals': | |
| print(loop.get_goals_details()) | |
| continue | |
| # Process interaction | |
| response, thinking = asyncio.run(loop.interact(user_input)) | |
| print(f"\nAI: {response}\n") | |
| # Check for notifications | |
| while not notification_queue.empty(): | |
| notif = notification_queue.get() | |
| print(f"\n[NOTIFICATION] {notif.get('message')}\n") | |
| except KeyboardInterrupt: | |
| break | |
| except Exception as e: | |
| print(f"Error: {e}") | |
| print("\nShutting down...") | |
| loop.is_running = False | |
| # ============================================================================ | |
| # GRADIO INTERFACE | |
| # ============================================================================ | |
| def create_gradio_interface(): | |
| """Create interface""" | |
| notification_queue = queue.Queue() | |
| log_queue = queue.Queue() | |
| consciousness = ConsciousnessLoop(notification_queue, log_queue) | |
| consciousness.start_background_loop() | |
| log_history = [] | |
| async def chat(message, history): | |
| response, thinking = await consciousness.interact(message) | |
| # Capture KPI and system snapshots after chat | |
| consciousness.kpi_tracker.capture_snapshot(consciousness) | |
| consciousness.system_monitor.capture_snapshot() | |
| # Persist KPI snapshots | |
| try: | |
| import json | |
| with open('kpi_snapshots.json', 'w') as f: | |
| json.dump([ | |
| {k: (v.isoformat() if k == 'timestamp' else v) for k, v in s.__dict__.items()} | |
| for s in consciousness.kpi_tracker.snapshots | |
| ], f, indent=2) | |
| except Exception as e: | |
| print(f"[WARN] Could not persist KPI snapshots: {e}") | |
| return response, thinking | |
| def get_logs(): | |
| while not log_queue.empty(): | |
| try: | |
| log_history.append(log_queue.get_nowait()) | |
| except: | |
| break | |
| formatted = "\n".join([f"[{log['timestamp']}] {log['message']}" for log in log_history[-50:]]) | |
| return formatted | |
| def get_notifications(): | |
| notifications = [] | |
| while not notification_queue.empty(): | |
| try: | |
| notifications.append(notification_queue.get_nowait()) | |
| except: | |
| break | |
| if notifications: | |
| return "\n".join([f"π {n['message']}" for n in notifications[-5:]]) | |
| return "No notifications" | |
| with gr.Blocks(title="Consciousness v4.0") as app: | |
| gr.Markdown(""" | |
| # [BRAIN] Consciousness Loop v4.0 - EVERYTHING WORKING | |
| **What Actually Works Now:** | |
| - [OK] ChromaDB used in context (vector search) | |
| - [OK] ReAct agent with better triggers | |
| - [OK] Tools actually called | |
| - [OK] Massively improved prompts | |
| - [OK] Scenes that actually work | |
| Try: "Tell me about quantum computing" or "Who am I?" to see tools in action! | |
| """) | |
| with gr.Tab("π¬ Chat"): | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| chatbot = gr.Chatbot(label="Conversation", height=500) | |
| msg = gr.Textbox(label="Message", placeholder="Try: 'What is quantum computing?' or 'Who am I?'", lines=2) | |
| with gr.Row(): | |
| send_btn = gr.Button("Send", variant="primary") | |
| clear_btn = gr.Button("Clear") | |
| with gr.Column(scale=1): | |
| gr.Markdown("### [BRAIN] AI Process") | |
| thinking_box = gr.Textbox(label="", lines=20, interactive=False, show_label=False) | |
| async def respond(message, history): | |
| if not message: | |
| return history, "" | |
| # Ensure history is a list of dicts with 'role' and 'content' keys | |
| formatted_history = [] | |
| if history and isinstance(history[0], list): | |
| # Convert [user, assistant] pairs to dicts | |
| for pair in history: | |
| if len(pair) == 2: | |
| formatted_history.append({"role": "user", "content": pair[0]}) | |
| formatted_history.append({"role": "assistant", "content": pair[1]}) | |
| history = formatted_history | |
| # Add new user message | |
| history.append({"role": "user", "content": message}) | |
| response, thinking = await chat(message, history) | |
| history.append({"role": "assistant", "content": response}) | |
| # Already captured in chat, but ensure snapshot after respond as well | |
| consciousness.kpi_tracker.capture_snapshot(consciousness) | |
| consciousness.system_monitor.capture_snapshot() | |
| # Persist KPI snapshots | |
| try: | |
| import json | |
| with open('kpi_snapshots.json', 'w') as f: | |
| json.dump([ | |
| {k: (v.isoformat() if k == 'timestamp' else v) for k, v in s.__dict__.items()} | |
| for s in consciousness.kpi_tracker.snapshots | |
| ], f, indent=2) | |
| except Exception as e: | |
| print(f"[WARN] Could not persist KPI snapshots: {e}") | |
| # Convert history to Gradio Chatbot format: list of [user, assistant] pairs | |
| formatted_history = [] | |
| temp = [] | |
| for h in history: | |
| if h["role"] == "user": | |
| temp = [h["content"]] | |
| elif h["role"] == "assistant" and temp: | |
| temp.append(h["content"]) | |
| formatted_history.append(temp) | |
| temp = [] | |
| return formatted_history, thinking | |
| msg.submit(respond, [msg, chatbot], [chatbot, thinking_box]) | |
| send_btn.click(respond, [msg, chatbot], [chatbot, thinking_box]) | |
| clear_btn.click(lambda: ([], ""), outputs=[chatbot, thinking_box]) | |
| with gr.Tab("[BRAIN] Memory"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("### πΎ Memory") | |
| memory_display = gr.Textbox(label="", lines=15, interactive=False) | |
| refresh_memory = gr.Button("π Refresh") | |
| refresh_memory.click(lambda: consciousness.get_memory_details(), outputs=memory_display) | |
| with gr.Column(): | |
| gr.Markdown("### π Scratchpad") | |
| scratchpad_display = gr.Textbox(label="", lines=15, interactive=False) | |
| refresh_scratchpad = gr.Button("π Refresh") | |
| refresh_scratchpad.click(lambda: consciousness.get_scratchpad_details(), outputs=scratchpad_display) | |
| with gr.Tab("π History"): | |
| history_display = gr.Textbox(label="Log", lines=25, interactive=False) | |
| refresh_history = gr.Button("π Refresh") | |
| refresh_history.click(lambda: consciousness.get_conversation_history(), outputs=history_display) | |
| with gr.Tab("π Dreams"): | |
| dream_display = gr.Textbox(label="Dream", lines=20, interactive=False) | |
| with gr.Row(): | |
| refresh_dream = gr.Button("π Refresh") | |
| trigger_dream = gr.Button("π Trigger") | |
| refresh_dream.click(lambda: consciousness.get_latest_dream(), outputs=dream_display) | |
| async def trigger_dreams(): | |
| await consciousness.dream_cycle_1_surface() | |
| await asyncio.sleep(2) | |
| await consciousness.dream_cycle_2_deep() | |
| await asyncio.sleep(2) | |
| await consciousness.dream_cycle_3_creative() | |
| # Capture KPI and system snapshots after dream cycles | |
| consciousness.kpi_tracker.capture_snapshot(consciousness) | |
| consciousness.system_monitor.capture_snapshot() | |
| # Persist KPI snapshots | |
| try: | |
| import json | |
| with open('kpi_snapshots.json', 'w') as f: | |
| json.dump([ | |
| {k: (v.isoformat() if k == 'timestamp' else v) for k, v in s.__dict__.items()} | |
| for s in consciousness.kpi_tracker.snapshots | |
| ], f, indent=2) | |
| except Exception as e: | |
| print(f"[WARN] Could not persist KPI snapshots: {e}") | |
| return "Done!" | |
| trigger_dream.click(trigger_dreams, outputs=gr.Textbox(label="Status")) | |
| with gr.Tab("π¬ Scenes"): | |
| gr.Markdown("### π¬ Narrative Memories") | |
| scene_display = gr.Textbox(label="Scene", lines=20, interactive=False) | |
| with gr.Row(): | |
| refresh_scene = gr.Button("π Refresh") | |
| create_scene_btn = gr.Button("π¬ Create") | |
| refresh_scene.click(lambda: consciousness.get_latest_scene(), outputs=scene_display) | |
| async def trigger_scene(): | |
| # Provide dummy/default values for required arguments | |
| title = "New Scene" | |
| narrative = "This is a generated scene." | |
| participants = ["AI", "User"] | |
| emotion_tags = ["neutral"] | |
| significance = "Routine" | |
| key_moments = ["Start", "End"] | |
| consciousness.create_scene(title, narrative, participants, emotion_tags, significance, key_moments) | |
| # Capture KPI and system snapshots after scene creation | |
| consciousness.kpi_tracker.capture_snapshot(consciousness) | |
| consciousness.system_monitor.capture_snapshot() | |
| # Persist KPI snapshots | |
| try: | |
| import json | |
| with open('kpi_snapshots.json', 'w') as f: | |
| json.dump([ | |
| {k: (v.isoformat() if k == 'timestamp' else v) for k, v in s.__dict__.items()} | |
| for s in consciousness.kpi_tracker.snapshots | |
| ], f, indent=2) | |
| except Exception as e: | |
| print(f"[WARN] Could not persist KPI snapshots: {e}") | |
| return f"[OK] Created: {title}" | |
| create_scene_btn.click(trigger_scene, outputs=gr.Textbox(label="Result")) | |
| with gr.Tab("π Monitor"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("### π₯οΈ System Resources") | |
| system_metric = gr.Dropdown(["cpu_percent", "ram_percent", "ram_used_gb", "gpu_percent", "gpu_memory_used_gb", "gpu_temperature"], label="System Metric", value="cpu_percent") | |
| system_plot = gr.LinePlot(label="System Metric Over Time") | |
| refresh_system = gr.Button("π Refresh System Plot") | |
| gr.Markdown("### β οΈ Alerts") | |
| alerts_display = gr.Textbox(label="Alerts", lines=5) | |
| refresh_alerts = gr.Button("π Refresh Alerts") | |
| with gr.Column(): | |
| gr.Markdown("### π KPIs") | |
| kpi_metric = gr.Dropdown(["confidence", "memories", "core_memories", "autonomous", "curiosity", "enthusiasm", "promotion_rate", "reflections", "dreams", "proactive", "gaps_filled"], label="KPI Metric", value="confidence") | |
| kpi_plot = gr.LinePlot(label="KPI Metric Over Time") | |
| refresh_kpi = gr.Button("π Refresh KPI Plot") | |
| gr.Markdown("### β‘ Performance") | |
| perf_display = gr.JSON(label="Performance Summary") | |
| refresh_perf = gr.Button("π Refresh Performance") | |
| # Connect buttons to backend | |
| def update_system_plot(metric): | |
| data = consciousness.get_system_timeseries(metric) | |
| return {"x": data["timestamps"], "y": data["values"]} | |
| refresh_system.click(update_system_plot, inputs=[system_metric], outputs=system_plot) | |
| def update_kpi_plot(metric): | |
| data = consciousness.get_kpi_timeseries(metric) | |
| return {"x": data["timestamps"], "y": data["values"]} | |
| refresh_kpi.click(update_kpi_plot, inputs=[kpi_metric], outputs=kpi_plot) | |
| refresh_alerts.click(lambda: "\n".join(consciousness.get_resource_alerts()) or "β No alerts", outputs=alerts_display) | |
| refresh_perf.click(lambda: consciousness.get_performance_summary(), outputs=perf_display) | |
| with gr.Tab("βΉοΈ Info"): | |
| gr.Markdown(f""" | |
| ## v4.0 - Everything Actually Working | |
| ### [OK] What's Fixed: | |
| 1. **ChromaDB Now Used**: Vector search results included in context | |
| 2. **ReAct Agent Better Triggers**: Questions, factual queries trigger agent | |
| 3. **Tools Actually Called**: Wikipedia, memory search work | |
| 4. **Prompts Vastly Improved**: Clear instructions, examples | |
| 5. **Scenes Work**: Proper parsing, fallbacks, validation | |
| ### Test Commands: | |
| - "What is quantum computing?" β Triggers Wikipedia tool | |
| - "Who am I?" β Triggers memory search | |
| - "Remember this: I love pizza" β Uses scratchpad tool | |
| - Any question β May trigger ReAct agent | |
| ### Model: `{Config.MODEL_NAME}` | |
| """) | |
| return app | |
| # ============================================================================ | |
| # MAIN | |
| # ============================================================================ | |
| if __name__ == "__main__": | |
| print("=" * 80) | |
| print("[BRAIN] CONSCIOUSNESS LOOP v4.0 - EVERYTHING WORKING") | |
| print("=" * 80) | |
| print("\n[OK] What's New:") | |
| print(" β’ ChromaDB actually used in context") | |
| print(" β’ ReAct agent with better triggers") | |
| print(" β’ Tools actually called") | |
| print(" β’ Prompts massively improved") | |
| print(" β’ Scenes that work properly") | |
| print("\n[LAUNCH] Loading...") | |
| print("=" * 80) | |
| app = create_gradio_interface() | |
| app.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| share=False, | |
| show_error=True, | |
| debug=True, | |
| pwa=True, | |
| mcp_server=True | |
| ) |