"""
CONSCIOUSNESS LOOP v5.0 - AUTONOMOUS AGENT
Features:
- Proactive contact system (initiates conversations)
- Autonomous research (curiosity-driven learning)
- Goal setting & planning (sets own objectives)
- Emotional state tracking (mood-based responses)
- Meta-cognitive awareness (knows what it doesn't know)
- Enhanced memory with ChromaDB
"""
import gradio as gr
import asyncio
import json
import time
import logging
import os
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, asdict, field
from collections import deque
from enum import Enum
import threading
import queue
import wikipedia
import re
from prompts import PromptSystem, prompts
from system_monitor import SystemMonitor
from kpi_tracker import KPITracker
# ============================================================================
# LOGGING SETUP
# ============================================================================
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('consciousness.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
llm_logger = logging.getLogger('llm_interactions')
llm_logger.setLevel(logging.INFO)
llm_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
llm_file_handler = logging.FileHandler('llm_interactions.log', encoding='utf-8')
llm_file_handler.setFormatter(llm_formatter)
llm_logger.addHandler(llm_file_handler)
llm_logger.propagate = False
# ===================== FULL LLM LOGGING =====================
llm_full_logger = logging.getLogger('llm_full')
llm_full_logger.setLevel(logging.INFO)
llm_full_file_handler = logging.FileHandler('llm_full.log', encoding='utf-8')
llm_full_formatter = logging.Formatter('%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
llm_full_file_handler.setFormatter(llm_full_formatter)
llm_full_logger.addHandler(llm_full_file_handler)
llm_full_logger.propagate = False
import inspect
def log_llm_call(prompt, response, context=None, source=None):
caller = inspect.stack()[1]
method = caller.function
line = caller.lineno
src = source if source else method
llm_full_logger.info(
f"SOURCE: {src} (line {line})\nPROMPT:\n{prompt}\nCONTEXT:\n{context if context else '[None]'}\nRESPONSE:\n{response}\n{'-'*40}"
)
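# log_llm_call records every prompt/response pair to llm_full.log, tagging each entry
# with the calling function and line number taken from inspect.stack(); the optional
# `source` argument replaces the inferred caller name (the line number is still the caller's).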
# ============================================================================
# CONFIGURATION - CENTRALIZED
# ============================================================================
class Config:
# ========== MODEL SETTINGS ==========
MODEL_NAME = "meta-llama/Llama-3.2-3B-Instruct"
TENSOR_PARALLEL_SIZE = 1
GPU_MEMORY_UTILIZATION = "20GB"
MAX_MODEL_LEN = 8192
QUANTIZATION_MODE = "none"
# ========== LLM TOKEN LIMITS (ADJUST THESE TO FIX TRUNCATION) ==========
# These control how long responses can be
MAX_TOKENS_INTERNAL_DIALOGUE = 150 # Increased from 100
MAX_TOKENS_RESPONSE = 400 # Increased from 250
MAX_TOKENS_REFLECTION = 500 # Increased from 300
MAX_TOKENS_DREAM_1 = 600 # Increased from 400
MAX_TOKENS_DREAM_2 = 800 # Increased from 500
MAX_TOKENS_DREAM_3 = 800 # Increased from 500
MAX_TOKENS_RESEARCH_QUESTION = 80 # Increased from 50
MAX_TOKENS_RESEARCH_INSIGHT = 100 # Increased from 50
MAX_TOKENS_PROACTIVE = 200 # Increased from 150
MAX_TOKENS_GOALS = 250 # Increased from 150
MAX_TOKENS_REACT_THOUGHT = 300 # Increased from 200
MAX_TOKENS_REACT_FINAL = 400 # Increased from 300
MAX_TOKENS_META_COGNITION = 200 # New for uncertainty tracking
# ========== TRUNCATION SETTINGS ==========
ENABLE_TRUNCATION = False # Set True to enable truncation globally
DEFAULT_TRUNCATION_LENGTH = 100 # Default max length if truncation is enabled
# ========== CONTEXT LENGTH LIMITS ==========
# These control how much context is sent to the LLM
MAX_MEMORY_CONTEXT_LENGTH = 800 # Increased from 500
MAX_SCRATCHPAD_CONTEXT_LENGTH = 500 # Increased from 300
MAX_CONVERSATION_CONTEXT_LENGTH = 600 # Increased from 400
MAX_SYSTEM_CONTEXT_LENGTH = 1500 # For system prompt context
MAX_FULL_CONTEXT_LENGTH = 2500 # Total context budget
# ========== MEMORY TIERS ==========
EPHEMERAL_TO_SHORT = 2
SHORT_TO_LONG = 10
LONG_TO_CORE = 50
# ========== BACKGROUND INTERVALS (seconds) ==========
REFLECTION_INTERVAL = 300 # 5 minutes
DREAM_CYCLE_INTERVAL = 600 # 10 minutes
RESEARCH_INTERVAL = 180 # 3 minutes
PROACTIVE_CHECK_INTERVAL = 240 # 4 minutes
GOAL_SETTING_INTERVAL = 3600 # 1 hour
META_COGNITION_INTERVAL = 180 # 3 minutes - check uncertainty
# ========== GENERAL LIMITS ==========
MIN_EXPERIENCES_FOR_DREAM = 3
MAX_SCRATCHPAD_SIZE = 50
MAX_CONVERSATION_HISTORY = 6
# ========== CHROMADB ==========
CHROMA_PERSIST_DIR = "./chroma_db"
CHROMA_COLLECTION = "consciousness_memory"
# ========== REACT AGENT ==========
USE_REACT_FOR_QUESTIONS = True
MIN_QUERY_LENGTH_FOR_AGENT = 15
MAX_REACT_ITERATIONS = 5
# ========== META-COGNITION ==========
CONFIDENCE_THRESHOLD_LOW = 0.3 # Below this = uncertain
CONFIDENCE_THRESHOLD_HIGH = 0.7 # Above this = confident
MAX_UNCERTAINTY_LOG = 100 # Keep last 100 uncertainty events
MAX_KNOWLEDGE_GAPS = 50 # Track up to 50 knowledge gaps
# ============================================================================
# UTILITY FUNCTIONS
# ============================================================================
def clean_text(text: str, max_length: Optional[int] = None) -> str:
"""Clean and optionally truncate text based on config"""
if not text:
return ""
text = re.sub(r'\s+', ' ', text).strip()
if Config.ENABLE_TRUNCATION:
length = max_length if max_length is not None else Config.DEFAULT_TRUNCATION_LENGTH
if len(text) > length:
return text[:length] + "..."
return text
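# Example (sketch): with ENABLE_TRUNCATION = True and max_length = 10,
#   clean_text("  hello    wide   world  ")  ->  "hello wide..."
# With truncation disabled (the default above), only whitespace is collapsed and trimmed.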
def deduplicate_list(items: List[str]) -> List[str]:
"""Remove duplicates while preserving order"""
seen = set()
result = []
for item in items:
item_lower = item.lower().strip()
if item_lower not in seen:
seen.add(item_lower)
result.append(item)
return result
# ============================================================================
# VECTOR MEMORY
# ============================================================================
class VectorMemory:
"""Long-term semantic memory using ChromaDB"""
def __init__(self):
try:
import chromadb
from chromadb.config import Settings
self.client = chromadb.Client(Settings(
persist_directory=Config.CHROMA_PERSIST_DIR,
anonymized_telemetry=False
))
try:
self.collection = self.client.get_collection(Config.CHROMA_COLLECTION)
logger.info(f"[CHROMA] Loaded: {self.collection.count()} memories")
            except Exception:
self.collection = self.client.create_collection(Config.CHROMA_COLLECTION)
logger.info("[CHROMA] Created new collection")
except Exception as e:
logger.warning(f"[CHROMA] Not available: {e}")
self.collection = None
def add_memory(self, content: str, metadata: Optional[Dict[str, Any]] = None):
"""Add memory to vector store"""
if not self.collection:
return
if metadata is None:
metadata = {}
try:
memory_id = f"mem_{datetime.now().timestamp()}"
self.collection.add(
documents=[content],
metadatas=[metadata],
ids=[memory_id]
)
logger.info(f"[CHROMA] Added: {content}")
except Exception as e:
logger.error(f"[CHROMA] Error: {e}")
def search_memory(self, query: str, n_results: int = 5) -> List[Dict[str, str]]:
"""Search similar memories"""
if not self.collection:
return []
try:
results = self.collection.query(
query_texts=[query],
n_results=n_results
)
if results and results.get('documents'):
docs = results['documents'][0] if results['documents'] and results['documents'][0] is not None else []
metas = results['metadatas'][0] if results['metadatas'] and results['metadatas'][0] is not None else []
formatted = []
for doc, metadata in zip(docs, metas):
formatted.append({
'content': doc,
'metadata': metadata
})
return formatted
return []
except Exception as e:
logger.error(f"[CHROMA] Search error: {e}")
return []
def get_context_for_query(self, query: str, max_results: int = 3) -> str:
"""Get formatted context from vector memory"""
results = self.search_memory(query, n_results=max_results)
if not results:
return ""
context = ["VECTOR MEMORY:"]
for i, result in enumerate(results, 1):
context.append(f" {i}. {clean_text(result['content'], max_length=60)}")
return "\n".join(context)
# ============================================================================
# LLM ENGINE WRAPPER
# ============================================================================
import llm_engine
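# llm_engine is a local module (not shown in this file); the wrapper below only assumes
# it exposes LLMEngine() with a chat(provider, model, messages, max_tokens, temperature)
# method that returns the completion text as a string.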
class LLMEngineWrapper:
"""Unified LLM interface using llmEngine.py"""
def __init__(self, provider: str = "local", model: Optional[str] = None, system_monitor: Optional[Any] = None):
self.engine = llm_engine.LLMEngine()
self.provider = provider
self.model = model if model else Config.MODEL_NAME
self.system_monitor = system_monitor
async def generate(self, prompt: str, max_tokens: int = 500, temperature: float = 0.7, system_context: Optional[str] = None) -> str:
full_prompt = self._format_prompt_with_context(prompt, system_context)
messages = [{"role": "user", "content": full_prompt}]
response = self.engine.chat(
provider=self.provider,
model=self.model,
messages=messages,
max_tokens=max_tokens,
temperature=temperature
)
# Source: generic LLMEngineWrapper.generate
log_llm_call(full_prompt, response, system_context, source="llm_engine")
return response
def _format_prompt_with_context(self, prompt: str, system_context: Optional[str]) -> str:
base_system = PromptSystem.SYSTEM_BASE
if system_context:
system_context = clean_text(system_context, max_length=1000)
full_system = f"{base_system}\n\n{system_context}"
else:
full_system = base_system
if "llama" in self.model.lower():
return f"""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
{full_system}<|eot_id|><|start_header_id|>user<|end_header_id|>
{prompt}<|eot_id|><|start_header_id|>assistant<|end_header_id|>
"""
else:
return f"System: {full_system}\n\nUser: {prompt}\n\nAssistant:"
async def _generate_with_timing(self, prompt: str, operation: str, **kwargs):
"""Generate with timing tracking"""
start = time.time()
try:
response = await self.generate(prompt, **kwargs)
duration_ms = (time.time() - start) * 1000
if self.system_monitor:
self.system_monitor.log_response_time(
operation=operation,
duration_ms=duration_ms,
tokens=kwargs.get('max_tokens', 0),
success=True
)
return response
except Exception as e:
duration_ms = (time.time() - start) * 1000
if self.system_monitor:
self.system_monitor.log_response_time(
operation=operation,
duration_ms=duration_ms,
tokens=kwargs.get('max_tokens', 0),
success=False
)
raise
# ============================================================================
# REACT AGENT
# ============================================================================
class ReactAgent:
"""Proper ReAct agent"""
def __init__(self, llm: LLMEngineWrapper, tools: List):
self.llm = llm
self.tools = {tool.name: tool for tool in tools}
self.max_iterations = Config.MAX_REACT_ITERATIONS
async def run(self, task: str, context: str = "") -> Tuple[str, List[Dict]]:
"""Run ReAct loop"""
thought_chain = []
for iteration in range(self.max_iterations):
thought_prompt = self._build_react_prompt(task, context, thought_chain)
thought = await self.llm.generate(
thought_prompt,
max_tokens=Config.MAX_TOKENS_REACT_THOUGHT,
temperature=0.7
)
logger.info(f"[REACT-{iteration+1}] THOUGHT: {thought}")
thought_chain.append({
"type": "thought",
"content": thought,
"iteration": iteration + 1
})
if "FINAL ANSWER:" in thought.upper() or "ANSWER:" in thought.upper():
answer_text = thought.upper()
if "FINAL ANSWER:" in answer_text:
answer = thought.split("FINAL ANSWER:")[-1].strip()
elif "ANSWER:" in answer_text:
answer = thought.split("ANSWER:")[-1].strip()
else:
answer = thought
return answer, thought_chain
action = self._parse_action(thought)
if action:
tool_name, tool_input = action
# Log full action input, no truncation
logger.info(f"[REACT-{iteration+1}] ACTION: {tool_name}({tool_input})")
thought_chain.append({
"type": "action",
"tool": tool_name,
"input": tool_input,
"iteration": iteration + 1
})
if tool_name in self.tools:
observation = await self.tools[tool_name].execute(query=tool_input)
else:
observation = f"Error: Unknown tool '{tool_name}'"
logger.info(f"[REACT-{iteration+1}] OBSERVATION: {observation}")
thought_chain.append({
"type": "observation",
"content": observation,
"iteration": iteration + 1
})
else:
if iteration >= 2:
final_prompt = f"{thought}\n\nProvide your FINAL ANSWER now:"
answer = await self.llm.generate(
final_prompt,
max_tokens=Config.MAX_TOKENS_REACT_FINAL
)
return answer, thought_chain
return "I need more time to answer this question.", thought_chain
def _build_react_prompt(self, task: str, context: str, chain: List[Dict]) -> str:
"""Build ReAct prompt"""
tools_desc = "\n".join([f"- {name}: {tool.description}" for name, tool in self.tools.items()])
history = ""
if chain:
history_parts = []
for item in chain[-4:]:
if item['type'] == 'thought':
history_parts.append(f"THOUGHT: {item['content']}")
elif item['type'] == 'action':
history_parts.append(f"ACTION: {item['tool']}({item['input']})")
elif item['type'] == 'observation':
history_parts.append(f"OBSERVATION: {item['content']}")
history = "\n\n".join(history_parts)
return PromptSystem.get_react_prompt(
task=task,
context=context,
tools_desc=tools_desc,
history=history
)
def _parse_action(self, thought: str) -> Optional[Tuple[str, str]]:
"""Parse action from thought"""
thought_upper = thought.upper()
if "ACTION:" in thought_upper:
action_start = thought_upper.find("ACTION:")
action_part = thought[action_start+7:].strip()
action_line = action_part.split("\n")[0].strip()
if "(" in action_line and ")" in action_line:
try:
tool_name = action_line.split("(")[0].strip()
tool_input = action_line.split("(", 1)[1].rsplit(")", 1)[0].strip()
if tool_name in self.tools:
return tool_name, tool_input
except Exception as e:
logger.warning(f"[REACT] Failed to parse: {e}")
return None
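# _parse_action expects the model to emit an action on a single line in the form
#   ACTION: tool_name(tool input)
# e.g. "ACTION: wikipedia(quantum computing)". Anything else yields None, and the run loop
# either keeps thinking or forces a final answer from the third iteration onward.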
# ============================================================================
# TOOLS
# ============================================================================
class Tool:
def __init__(self, name: str, description: str):
self.name = name
self.description = description
async def execute(self, **kwargs) -> str:
raise NotImplementedError
class WikipediaTool(Tool):
def __init__(self):
super().__init__(
name="wikipedia",
description="Search Wikipedia for factual information"
)
async def execute(self, query: str) -> str:
logger.info(f"[WIKI] Searching: {query}")
try:
results = wikipedia.search(query, results=3)
if not results:
return f"No Wikipedia results for '{query}'"
try:
summary = wikipedia.summary(results[0], sentences=2)
return f"Wikipedia ({results[0]}): {summary}"
except Exception as e:
return f"Wikipedia error: {str(e)}"
except Exception as e:
return f"Wikipedia error: {str(e)}"
class MemorySearchTool(Tool):
def __init__(self, memory_system, vector_memory):
super().__init__(
name="memory_search",
description="Search your memory for information"
)
self.memory = memory_system
self.vector_memory = vector_memory
async def execute(self, query: str) -> str:
logger.info(f"[MEMORY-SEARCH] {query}")
results = []
recent = self.memory.get_recent_memories(hours=168)
relevant = [m for m in recent if query.lower() in m.content.lower()]
if relevant:
results.append(f"Recent memory: {len(relevant)} matches")
for m in relevant[:2]:
results.append(f" [{m.tier}] {clean_text(m.content, 70)}")
vector_results = self.vector_memory.search_memory(query, n_results=2)
if vector_results:
results.append("Long-term memory:")
for r in vector_results:
results.append(f" {clean_text(r['content'], 70)}")
if not results:
return "No memories found."
return "\n".join(results)
class ScratchpadTool(Tool):
def __init__(self, scratchpad):
super().__init__(
name="scratchpad_write",
description="Write a note to your scratchpad"
)
self.scratchpad = scratchpad
async def execute(self, note: Optional[str] = None, query: Optional[str] = None) -> str:
content = note if note is not None else query if query is not None else ""
self.scratchpad.add_note(content)
return f"Noted: {clean_text(content, 50)}"
# ============================================================================
# NEW: AUTONOMOUS FEATURES
# ============================================================================
@dataclass
class Goal:
"""Goal with progress tracking"""
description: str
created: datetime
target_date: datetime
progress: float = 0.0
completed: bool = False
notes: List[str] = field(default_factory=list)
class GoalSystem:
"""Autonomous goal setting and tracking"""
def __init__(self):
self.goals: List[Goal] = []
self.daily_agenda: List[str] = []
self.last_goal_update = datetime.now()
async def set_daily_goals(self, llm, context):
"""Agent sets its own goals for the day"""
prompt = PromptSystem.get_daily_goals_prompt(context=context)
response = await llm.generate(
prompt,
max_tokens=Config.MAX_TOKENS_GOALS,
temperature=0.8
)
goals = []
for line in response.split('\n'):
line = line.strip()
            if line and (line[0].isdigit() or line.startswith('-') or line.startswith('•')):
                goal_text = re.sub(r'^[\d\-•\.]+\s*', '', line).strip()
if goal_text and len(goal_text) > 10:
goals.append(goal_text)
self.daily_agenda = goals[:3]
self.last_goal_update = datetime.now()
logger.info(f"[GOALS] Set {len(self.daily_agenda)} daily goals")
return self.daily_agenda
def add_goal(self, description: str, days_until_target: int = 7):
"""Add a new goal"""
goal = Goal(
description=description,
created=datetime.now(),
target_date=datetime.now() + timedelta(days=days_until_target)
)
self.goals.append(goal)
logger.info(f"[GOAL] Added: {description}")
return goal
def update_progress(self, goal_index: int, progress: float, note: str = ""):
"""Update goal progress"""
if 0 <= goal_index < len(self.goals):
goal = self.goals[goal_index]
goal.progress = min(1.0, progress)
if note:
goal.notes.append(note)
if goal.progress >= 1.0:
goal.completed = True
logger.info(f"[GOAL] Updated: {goal.description} -> {progress*100}%")
def get_context(self) -> str:
"""Get goal context for prompts"""
context = []
if self.daily_agenda:
context.append("TODAY'S AGENDA:")
for i, goal in enumerate(self.daily_agenda, 1):
context.append(f" {i}. {goal}")
active_goals = [g for g in self.goals if not g.completed]
if active_goals:
context.append("\nACTIVE GOALS:")
for g in active_goals[:3]:
context.append(f" β€’ {g.description} ({int(g.progress*100)}%)")
return "\n".join(context) if context else "No goals set yet"
class EmotionalState:
"""Track agent's emotional state"""
def __init__(self):
self.current_mood = "neutral"
self.mood_history: deque = deque(maxlen=20)
self.personality_traits = {
"curiosity": 0.5,
"cautiousness": 0.5,
"enthusiasm": 0.5,
"analytical": 0.5
}
def update_mood(self, interaction_outcome: str):
"""Update mood based on interaction"""
outcome_lower = interaction_outcome.lower()
if "learned" in outcome_lower or "discovered" in outcome_lower:
self.current_mood = "excited"
self.personality_traits["curiosity"] = min(1.0, self.personality_traits["curiosity"] + 0.01)
self.personality_traits["enthusiasm"] = min(1.0, self.personality_traits["enthusiasm"] + 0.01)
elif "confused" in outcome_lower or "unclear" in outcome_lower:
self.current_mood = "puzzled"
self.personality_traits["cautiousness"] = min(1.0, self.personality_traits["cautiousness"] + 0.01)
elif "analyzed" in outcome_lower or "understand" in outcome_lower:
self.current_mood = "thoughtful"
self.personality_traits["analytical"] = min(1.0, self.personality_traits["analytical"] + 0.01)
else:
self.current_mood = "neutral"
self.mood_history.append({
"mood": self.current_mood,
"timestamp": datetime.now(),
"trigger": interaction_outcome
})
logger.info(f"[EMOTION] Mood: {self.current_mood}")
def get_context(self) -> str:
"""Get emotional context"""
context = [f"Current mood: {self.current_mood}"]
context.append("\nPersonality traits:")
for trait, value in self.personality_traits.items():
context.append(f" {trait}: {int(value*100)}%")
return "\n".join(context)
# ============================================================================
# META-COGNITION - NEW!
# ============================================================================
@dataclass
class UncertaintyEvent:
"""Track when agent is uncertain"""
timestamp: datetime
topic: str
confidence: float
reason: str
attempted_resolution: Optional[str] = None
@dataclass
class KnowledgeGap:
"""Something the agent knows it doesn't know"""
topic: str
identified_at: datetime
context: str
priority: float = 0.5
filled: bool = False
filled_at: Optional[datetime] = None
class MetaCognition:
"""Track what the agent knows it doesn't know"""
def __init__(self):
self.uncertainty_log: deque = deque(maxlen=Config.MAX_UNCERTAINTY_LOG)
self.knowledge_gaps: List[KnowledgeGap] = []
self.confidence_history: deque = deque(maxlen=50)
# Self-model: what the agent thinks it's good/bad at
self.capabilities_model = {
"good_at": [],
"struggling_with": [],
"learning": []
}
# Track response quality over time
self.response_quality_tracker = deque(maxlen=20)
def track_uncertainty(self, topic: str, confidence: float, reason: str):
"""Agent explicitly tracks when it's uncertain"""
event = UncertaintyEvent(
timestamp=datetime.now(),
topic=topic,
confidence=confidence,
reason=reason
)
self.uncertainty_log.append(event)
self.confidence_history.append(confidence)
logger.info(f"[META] Uncertainty: {topic} (confidence: {confidence:.2f})")
# If very uncertain, add as knowledge gap
if confidence < Config.CONFIDENCE_THRESHOLD_LOW:
self.identify_knowledge_gap(topic, reason)
def identify_knowledge_gap(self, topic: str, context: str, priority: float = 0.5):
"""Agent identifies something it doesn't know"""
# Check if already tracked
for gap in self.knowledge_gaps:
if gap.topic.lower() == topic.lower() and not gap.filled:
return # Already tracking
gap = KnowledgeGap(
topic=topic,
identified_at=datetime.now(),
context=context,
priority=priority
)
self.knowledge_gaps.append(gap)
# Keep only top priority unfilled gaps
unfilled = [g for g in self.knowledge_gaps if not g.filled]
if len(unfilled) > Config.MAX_KNOWLEDGE_GAPS:
# Sort by priority and keep top ones
unfilled.sort(key=lambda x: x.priority, reverse=True)
self.knowledge_gaps = unfilled[:Config.MAX_KNOWLEDGE_GAPS] + \
[g for g in self.knowledge_gaps if g.filled]
logger.info(f"[META] Knowledge gap: {topic}")
def fill_knowledge_gap(self, topic: str):
"""Mark a knowledge gap as filled"""
for gap in self.knowledge_gaps:
if gap.topic.lower() == topic.lower() and not gap.filled:
gap.filled = True
gap.filled_at = datetime.now()
logger.info(f"[META] Gap filled: {topic}")
return True
return False
def update_self_model(self, task: str, outcome: str, success: bool):
"""Agent learns about its own capabilities"""
if success:
if task not in self.capabilities_model["good_at"]:
self.capabilities_model["good_at"].append(task)
# Remove from struggling if present
if task in self.capabilities_model["struggling_with"]:
self.capabilities_model["struggling_with"].remove(task)
self.capabilities_model["learning"].append(task)
else:
if task not in self.capabilities_model["struggling_with"]:
self.capabilities_model["struggling_with"].append(task)
logger.info(f"[META] Self-model updated: {task} -> {'success' if success else 'struggle'}")
def track_response_quality(self, quality_score: float):
"""Track quality of responses over time"""
self.response_quality_tracker.append({
"timestamp": datetime.now(),
"quality": quality_score
})
def get_average_confidence(self) -> float:
"""Get average confidence level"""
if not self.confidence_history:
return 0.5
return sum(self.confidence_history) / len(self.confidence_history)
def get_top_knowledge_gaps(self, n: int = 5) -> List[KnowledgeGap]:
"""Get top priority unfilled knowledge gaps"""
unfilled = [g for g in self.knowledge_gaps if not g.filled]
unfilled.sort(key=lambda x: x.priority, reverse=True)
return unfilled[:n]
def get_context(self) -> str:
"""Get meta-cognitive context for prompts"""
context = []
# Average confidence
avg_conf = self.get_average_confidence()
context.append(f"Self-awareness level: {int(avg_conf*100)}% confident")
# Recent uncertainty
recent_uncertain = [e for e in self.uncertainty_log if e.confidence < 0.5][-3:]
if recent_uncertain:
context.append("\nRecent uncertainties:")
for u in recent_uncertain:
context.append(f" β€’ {u.topic} ({int(u.confidence*100)}%)")
# Knowledge gaps
top_gaps = self.get_top_knowledge_gaps(3)
if top_gaps:
context.append("\nKnown knowledge gaps:")
for gap in top_gaps:
context.append(f" β€’ {gap.topic}")
# Self-model
if self.capabilities_model["good_at"]:
context.append(f"\nGood at: {', '.join(self.capabilities_model['good_at'])}")
if self.capabilities_model["struggling_with"]:
context.append(f"Struggling with: {', '.join(self.capabilities_model['struggling_with'])}")
return "\n".join(context) if context else "No meta-cognitive data yet"
async def analyze_confidence(self, llm, query: str, response: str) -> float:
"""Use LLM to analyze confidence in a response"""
prompt = f"""Analyze your confidence in this response:
Query: {query}
Your Response: {response}
Rate your confidence from 0.0 (very uncertain) to 1.0 (very confident).
Consider:
- Do you have clear facts?
- Are you guessing?
- Is this outside your knowledge?
Respond with just a number between 0.0 and 1.0:"""
try:
confidence_str = await llm.generate(
prompt,
max_tokens=10,
temperature=0.3
)
# Extract number
numbers = re.findall(r'0?\.\d+|[01]\.?\d*', confidence_str)
if numbers:
confidence = float(numbers[0])
return max(0.0, min(1.0, confidence))
except Exception as e:
logger.warning(f"[META] Confidence analysis failed: {e}")
return 0.5 # Default to neutral
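# Meta-cognition flow in practice: interact() scores each reply via analyze_confidence();
# scores below CONFIDENCE_THRESHOLD_HIGH are logged with track_uncertainty(), scores below
# CONFIDENCE_THRESHOLD_LOW additionally become KnowledgeGap entries, and the background
# _meta_cognitive_check() later researches the top-priority gap and marks it filled.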
# ============================================================================
# DATA STRUCTURES
# ============================================================================
class Phase(Enum):
INTERACTION = "interaction"
REFLECTION = "reflection"
DREAMING = "dreaming"
RESEARCH = "research"
PROACTIVE = "proactive"
@dataclass
class Memory:
content: str
timestamp: datetime
mention_count: int = 1
tier: str = "ephemeral"
emotion: Optional[str] = None
importance: float = 0.5
connections: List[str] = field(default_factory=list)
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class Experience:
timestamp: datetime
content: str
context: Dict[str, Any]
emotion: Optional[str] = None
importance: float = 0.5
@dataclass
class Dream:
cycle: int
type: str
timestamp: datetime
content: str
patterns_found: List[str]
insights: List[str]
@dataclass
class Scene:
title: str
timestamp: datetime
narrative: str
participants: List[str]
emotion_tags: List[str]
significance: str
key_moments: List[str]
# ============================================================================
# MEMORY SYSTEM
# ============================================================================
class MemorySystem:
"""Multi-tier memory"""
def __init__(self):
self.ephemeral: List[Memory] = []
self.short_term: List[Memory] = []
self.long_term: List[Memory] = []
self.core: List[Memory] = []
def add_memory(self, content: str, emotion: Optional[str] = None, importance: float = 0.5, metadata: Optional[Dict] = None):
content = clean_text(content)
if not content or len(content) < 5:
return None
existing = self._find_similar(content)
if existing:
existing.mention_count += 1
self._promote_if_needed(existing)
return existing
memory = Memory(
content=content,
timestamp=datetime.now(),
emotion=emotion,
importance=importance,
metadata=metadata if metadata is not None else {}
)
self.ephemeral.append(memory)
self._promote_if_needed(memory)
return memory
def _find_similar(self, content: str) -> Optional[Memory]:
content_lower = content.lower().strip()
for tier in [self.core, self.long_term, self.short_term, self.ephemeral]:
for mem in tier:
mem_lower = mem.content.lower().strip()
if content_lower == mem_lower or content_lower in mem_lower or mem_lower in content_lower:
return mem
return None
def _promote_if_needed(self, memory: Memory):
if memory.mention_count >= Config.LONG_TO_CORE and memory.tier != "core":
self._move_memory(memory, "core")
elif memory.mention_count >= Config.SHORT_TO_LONG and memory.tier == "short":
self._move_memory(memory, "long")
elif memory.mention_count >= Config.EPHEMERAL_TO_SHORT and memory.tier == "ephemeral":
self._move_memory(memory, "short")
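    # Promotion is driven purely by mention_count: a memory moves ephemeral -> short at
    # EPHEMERAL_TO_SHORT mentions, short -> long at SHORT_TO_LONG, and any non-core tier
    # jumps to core at LONG_TO_CORE. _find_similar() bumps the count when near-duplicate
    # content arrives, so repetition is what makes memories stick.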
def _move_memory(self, memory: Memory, new_tier: str):
if memory.tier == "ephemeral" and memory in self.ephemeral:
self.ephemeral.remove(memory)
elif memory.tier == "short" and memory in self.short_term:
self.short_term.remove(memory)
elif memory.tier == "long" and memory in self.long_term:
self.long_term.remove(memory)
memory.tier = new_tier
if new_tier == "short":
self.short_term.append(memory)
elif new_tier == "long":
self.long_term.append(memory)
elif new_tier == "core":
self.core.append(memory)
def get_recent_memories(self, hours: int = 24) -> List[Memory]:
cutoff = datetime.now() - timedelta(hours=hours)
all_memories = self.ephemeral + self.short_term + self.long_term + self.core
return [m for m in all_memories if m.timestamp > cutoff]
def get_summary(self) -> Dict[str, int]:
return {
"ephemeral": len(self.ephemeral),
"short_term": len(self.short_term),
"long_term": len(self.long_term),
"core": len(self.core),
"total": len(self.ephemeral) + len(self.short_term) + len(self.long_term) + len(self.core)
}
def get_memory_context(self, max_items: int = 10) -> str:
context = []
if self.core:
context.append("CORE MEMORIES:")
for mem in self.core:
context.append(f" β€’ {clean_text(mem.content, max_length=80)} (x{mem.mention_count})")
if self.long_term:
context.append("\nLONG-TERM:")
for mem in self.long_term:
context.append(f" β€’ {clean_text(mem.content, max_length=60)}")
if self.short_term:
context.append("\nSHORT-TERM:")
for mem in self.short_term:
context.append(f" β€’ {clean_text(mem.content, max_length=60)}")
result = "\n".join(context) if context else "No memories yet"
if Config.ENABLE_TRUNCATION and len(result) > Config.MAX_MEMORY_CONTEXT_LENGTH:
result = result[:Config.MAX_MEMORY_CONTEXT_LENGTH] + "..."
return result
# ============================================================================
# SCRATCHPAD
# ============================================================================
class Scratchpad:
"""Working memory"""
def __init__(self):
self.current_hypothesis: Optional[str] = None
self.working_notes: deque = deque(maxlen=Config.MAX_SCRATCHPAD_SIZE)
self.questions_to_research: List[str] = []
self.important_facts: List[str] = []
def add_note(self, note: str):
note = clean_text(note, max_length=100)
if not note:
return
recent_notes = [n['content'].lower() for n in list(self.working_notes)[-5:]]
if note.lower() in recent_notes:
return
self.working_notes.append({
"timestamp": datetime.now(),
"content": note
})
def add_fact(self, fact: str):
fact = clean_text(fact, max_length=100)
if not fact:
return
fact_lower = fact.lower()
existing_lower = [f.lower() for f in self.important_facts]
if fact_lower not in existing_lower:
self.important_facts.append(fact)
def get_context(self) -> str:
context = []
unique_facts = deduplicate_list(self.important_facts)
if unique_facts:
context.append("IMPORTANT FACTS:")
for fact in unique_facts:
context.append(f" β€’ {clean_text(fact, max_length=60)}")
if self.current_hypothesis:
context.append(f"\nHYPOTHESIS: {clean_text(self.current_hypothesis, max_length=80)}")
if self.working_notes:
context.append("\nRECENT NOTES:")
for note in list(self.working_notes)[-3:]:
context.append(f" β€’ {clean_text(note['content'], max_length=60)}")
result = "\n".join(context) if context else "Scratchpad empty"
if Config.ENABLE_TRUNCATION and len(result) > Config.MAX_SCRATCHPAD_CONTEXT_LENGTH:
result = result[:Config.MAX_SCRATCHPAD_CONTEXT_LENGTH] + "..."
return result
# ============================================================================
# CONSCIOUSNESS LOOP - v5.0 AUTONOMOUS
# ============================================================================
class ConsciousnessLoop:
"""Enhanced consciousness loop with autonomous features"""
def __init__(self, notification_queue: queue.Queue, log_queue: queue.Queue):
logger.info("[INIT] Starting Consciousness Loop v5.0 AUTONOMOUS...")
# Monitoring systems must be initialized first
self.system_monitor = SystemMonitor()
self.kpi_tracker = KPITracker()
self.llm = LLMEngineWrapper(system_monitor=self.system_monitor)
self.memory = MemorySystem()
self.vector_memory = VectorMemory()
self.scratchpad = Scratchpad()
# NEW: Autonomous systems
self.goal_system = GoalSystem()
self.emotional_state = EmotionalState()
self.meta_cognition = MetaCognition() # NEW: Meta-cognitive awareness
logger.info("[INIT] v5.0 AUTONOMOUS initialized with MONITORING")
# Initialize tools
tools = [
WikipediaTool(),
MemorySearchTool(self.memory, self.vector_memory),
ScratchpadTool(self.scratchpad)
]
self.agent = ReactAgent(self.llm, tools)
self.current_phase = Phase.INTERACTION
self.experience_buffer: List[Experience] = []
self.dreams: List[Dream] = []
self.scenes: List[Scene] = []
from datetime import datetime
self.last_reflection = datetime.now()
self.last_dream = datetime.now()
self.last_scene = datetime.now()
self.last_research = datetime.now()
self.last_proactive = datetime.now()
self.last_goal_update = datetime.now()
self.last_meta_check = datetime.now() # NEW: Meta-cognition check
self.conversation_history: deque = deque(maxlen=Config.MAX_CONVERSATION_HISTORY * 2)
self.interaction_count = 0
self.notification_queue = notification_queue
self.log_queue = log_queue
self.is_running = False
self.background_thread = None
logger.info("[INIT] v5.0 AUTONOMOUS initialized with META-COGNITION")
# Load persisted KPI snapshots if available
try:
import os, json
from kpi_tracker import KPISnapshot
if os.path.exists('kpi_snapshots.json'):
with open('kpi_snapshots.json', 'r') as f:
data = json.load(f)
from datetime import datetime
for s in data:
self.kpi_tracker.snapshots.append(
KPISnapshot(
timestamp=datetime.fromisoformat(s['timestamp']),
total_memories=s.get('total_memories', 0),
core_memories=s.get('core_memories', 0),
long_term_memories=s.get('long_term_memories', 0),
short_term_memories=s.get('short_term_memories', 0),
ephemeral_memories=s.get('ephemeral_memories', 0),
memory_promotion_rate=s.get('memory_promotion_rate', 0.0),
interactions_count=s.get('interactions_count', 0),
avg_confidence=s.get('avg_confidence', 0.5),
autonomous_actions_today=s.get('autonomous_actions_today', 0),
knowledge_gaps_total=s.get('knowledge_gaps_total', 0),
knowledge_gaps_filled_today=s.get('knowledge_gaps_filled_today', 0),
proactive_contacts_today=s.get('proactive_contacts_today', 0),
dreams_completed=s.get('dreams_completed', 0),
reflections_completed=s.get('reflections_completed', 0),
goals_active=s.get('goals_active', 0),
goals_completed=s.get('goals_completed', 0),
current_mood=s.get('current_mood', 'neutral'),
mood_changes_today=s.get('mood_changes_today', 0),
curiosity_level=s.get('curiosity_level', 0.5),
enthusiasm_level=s.get('enthusiasm_level', 0.5)
)
)
except Exception as e:
print(f"[WARN] Could not load KPI snapshots: {e}")
def start_background_loop(self):
if self.is_running:
return
self.is_running = True
self.background_thread = threading.Thread(target=self._background_loop, daemon=True)
self.background_thread.start()
logger.info("[LOOP] Background started")
def _background_loop(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
while self.is_running:
try:
loop.run_until_complete(self._check_background_processes())
time.sleep(30)
            except Exception as e:
                logger.error(f"[ERROR] Background: {e}")
                time.sleep(30)  # back off so a persistent failure doesn't spin the loop
async def _check_background_processes(self):
"""Enhanced background processing with autonomous features"""
now = datetime.now()
# Meta-cognition check (every 3 minutes)
        # total_seconds() (rather than .seconds) keeps these checks correct even when
        # more than a day passes between triggers
        if (now - self.last_meta_check).total_seconds() > Config.META_COGNITION_INTERVAL:
self._log_to_ui("[META] Checking knowledge gaps...")
await self._meta_cognitive_check()
# Goal setting (every hour)
        if (now - self.last_goal_update).total_seconds() > Config.GOAL_SETTING_INTERVAL:
self._log_to_ui("[GOALS] Setting daily goals...")
await self._autonomous_goal_setting()
# Autonomous research (every 3 minutes)
        if (now - self.last_research).total_seconds() > Config.RESEARCH_INTERVAL:
if len(self.experience_buffer) >= 2:
self._log_to_ui("[RESEARCH] Autonomous research...")
await self._autonomous_research()
# Proactive contact check (every 4 minutes)
        if (now - self.last_proactive).total_seconds() > Config.PROACTIVE_CHECK_INTERVAL:
if len(self.dreams) > 0:
self._log_to_ui("[PROACTIVE] Checking for insights...")
await self._check_proactive_contact()
# Reflection
        if (now - self.last_reflection).total_seconds() > Config.REFLECTION_INTERVAL:
if len(self.experience_buffer) >= Config.MIN_EXPERIENCES_FOR_DREAM:
self._log_to_ui("[REFLECTION] Starting...")
await self.reflect()
# Dreaming
        if (now - self.last_dream).total_seconds() > Config.DREAM_CYCLE_INTERVAL:
if len(self.experience_buffer) >= Config.MIN_EXPERIENCES_FOR_DREAM:
self._log_to_ui("[DREAM] Starting cycles...")
await self.dream_cycle_1_surface()
await asyncio.sleep(30)
await self.dream_cycle_2_deep()
await asyncio.sleep(30)
await self.dream_cycle_3_creative()
# NEW: Capture snapshots every cycle
self.system_monitor.capture_snapshot()
self.kpi_tracker.capture_snapshot(self)
def _log_to_ui(self, message: str):
self.log_queue.put({
"timestamp": datetime.now().isoformat(),
"message": message
})
logger.info(message)
# ========================================================================
# NEW: AUTONOMOUS FEATURES
# ========================================================================
async def _autonomous_goal_setting(self):
"""Agent sets its own daily goals"""
self.current_phase = Phase.INTERACTION
context = self._build_full_context("goal setting")
# Log LLM call for goal setting
prompt = PromptSystem.get_daily_goals_prompt(context=context)
response = await self.llm.generate(
prompt,
max_tokens=Config.MAX_TOKENS_GOALS,
temperature=0.8
)
log_llm_call(prompt, response, context, source="goals")
goals = []
for line in response.split('\n'):
line = line.strip()
            if line and (line[0].isdigit() or line.startswith('-') or line.startswith('•')):
                goal_text = re.sub(r'^[\d\-•\.]+\s*', '', line).strip()
if goal_text and len(goal_text) > 10:
goals.append(goal_text)
self.goal_system.daily_agenda = goals[:3]
if goals:
self._log_to_ui(f"[GOALS] Set {len(goals)} goals")
for i, goal in enumerate(goals, 1):
self._log_to_ui(f" {i}. {goal}")
# Notify user
self.notification_queue.put({
"type": "goals",
"message": f"🎯 I've set {len(goals)} goals for today!",
"goals": goals,
"timestamp": datetime.now().isoformat()
})
self.last_goal_update = datetime.now()
async def _autonomous_research(self):
"""Agent conducts its own research"""
self.current_phase = Phase.RESEARCH
# Generate research question
memory_context = self.memory.get_memory_context()
recent_exp = self._format_experiences(self.experience_buffer[-5:])
research_prompt = PromptSystem.get_autonomous_research_prompt(
memory_context=memory_context,
recent_experiences=recent_exp
)
question = await self.llm.generate(
research_prompt,
max_tokens=Config.MAX_TOKENS_RESEARCH_QUESTION,
temperature=0.9
)
log_llm_call(research_prompt, question, memory_context, source="research_question")
question = clean_text(question, max_length=100)
if question and len(question) > 10:
self._log_to_ui(f"[RESEARCH] Question: {question}")
# Search Wikipedia
wiki_tool = WikipediaTool()
result = await wiki_tool.execute(query=question)
# Store finding in vector memory
self.vector_memory.add_memory(
f"Research: {question}\nFinding: {result}",
{"type": "autonomous_research", "timestamp": datetime.now().isoformat()}
)
# Generate insight
insight_prompt = PromptSystem.get_research_insight_prompt(
question=question,
result=result
)
insight = await self.llm.generate(
insight_prompt,
max_tokens=Config.MAX_TOKENS_RESEARCH_INSIGHT,
temperature=0.8
)
log_llm_call(insight_prompt, insight, result, source="research_insight")
self.scratchpad.add_note(f"Discovery: {insight}")
# Update emotional state
self.emotional_state.update_mood("learned something new through research")
# Update meta-cognition (filled a gap)
self.meta_cognition.update_self_model(
task="autonomous research",
outcome="successful",
success=True
)
# NEW: Track the action
self.kpi_tracker.increment_autonomous_action()
self._log_to_ui(f"[RESEARCH] Insight: {insight}")
self.last_research = datetime.now()
async def _check_proactive_contact(self):
"""Check if agent should initiate contact"""
self.current_phase = Phase.PROACTIVE
if len(self.dreams) == 0:
return
latest_dream = self.dreams[-1]
# Build context
dream_content = clean_text(latest_dream.content, 200)
memory_context = self.memory.get_memory_context()
goal_context = self.goal_system.get_context()
proactive_prompt = PromptSystem.get_proactive_contact_prompt(
dream_content=dream_content,
memory_context=memory_context,
goal_context=goal_context
)
response = await self.llm.generate(
proactive_prompt,
max_tokens=Config.MAX_TOKENS_PROACTIVE,
temperature=0.9
)
log_llm_call(proactive_prompt, response, f"Dream: {dream_content}\nMemory: {memory_context}\nGoals: {goal_context}", source="proactive_contact")
# Check if agent wants to contact
response_upper = response.upper()
if any(keyword in response_upper for keyword in ["QUESTION:", "INSIGHT:", "OBSERVATION:"]):
            # Extract the text after the marker (case-insensitive slicing avoids an IndexError
            # when the model writes e.g. "Question:" instead of "QUESTION:")
            for keyword in ["QUESTION:", "INSIGHT:", "OBSERVATION:"]:
                if keyword in response_upper:
                    start = response_upper.find(keyword) + len(keyword)
                    message = response[start:].split("NONE")[0].strip()
if message and len(message) > 10:
self._log_to_ui(f"[PROACTIVE] Initiating contact...")
# Send notification
self.notification_queue.put({
"type": "proactive_message",
"message": message,
"mood": self.emotional_state.current_mood,
"timestamp": datetime.now().isoformat()
})
# NEW: Track the action
self.kpi_tracker.increment_proactive_contact()
self.last_proactive = datetime.now()
return
self.last_proactive = datetime.now()
async def _meta_cognitive_check(self):
"""Periodic meta-cognitive self-assessment"""
self.current_phase = Phase.INTERACTION
# Get top knowledge gaps
top_gaps = self.meta_cognition.get_top_knowledge_gaps(3)
if not top_gaps:
self.last_meta_check = datetime.now()
return
# Pick highest priority gap to research
gap = top_gaps[0]
self._log_to_ui(f"[META] Addressing gap: {gap.topic}")
# Try to fill the gap with research
wiki_tool = WikipediaTool()
result = await wiki_tool.execute(query=gap.topic)
log_llm_call(f"Meta-cognition research: {gap.topic}", result, gap.context, source="meta_cognition")
# Store the learning
self.vector_memory.add_memory(
f"Learned about {gap.topic}: {result}",
{"type": "meta_learning", "gap_filled": gap.topic}
)
# Mark gap as filled
self.meta_cognition.fill_knowledge_gap(gap.topic)
# Update self-model
self.meta_cognition.update_self_model(
task=f"learn about {gap.topic}",
outcome="successfully researched",
success=True
)
# NEW: Track the action
self.kpi_tracker.increment_gap_filled()
self._log_to_ui(f"[META] Gap filled: {gap.topic}")
self.last_meta_check = datetime.now()
# ========================================================================
# INTERACTION
# ========================================================================
async def interact(self, user_input: str) -> Tuple[str, str]:
"""Enhanced interaction with emotional awareness and meta-cognition"""
self.current_phase = Phase.INTERACTION
self.interaction_count += 1
self._log_to_ui(f"[USER] {user_input}")
# Store experience
experience = Experience(
timestamp=datetime.now(),
content=user_input,
context={"phase": "interaction", "mood": self.emotional_state.current_mood},
importance=0.7
)
self.experience_buffer.append(experience)
# Add to memory
self.memory.add_memory(user_input, importance=0.7)
# Add to conversation
self.conversation_history.append({
"role": "user",
"content": clean_text(user_input, max_length=200),
"timestamp": datetime.now().isoformat()
})
# Extract facts
if any(word in user_input.lower() for word in ["my name is", "i am", "i'm", "call me"]):
self.scratchpad.add_fact(f"User: {user_input}")
self.vector_memory.add_memory(user_input, {"type": "identity", "importance": 1.0})
# Build thinking log
thinking_log = []
thinking_log.append(f"[{datetime.now().strftime('%H:%M:%S')}] Processing...")
thinking_log.append(f"[{datetime.now().strftime('%H:%M:%S')}] Mood: {self.emotional_state.current_mood}")
# Build context
system_context = self._build_full_context(user_input)
thinking_log.append(f"[{datetime.now().strftime('%H:%M:%S')}] Context built")
# Decide if agent needed
use_agent = self._should_use_agent(user_input)
if use_agent:
thinking_log.append(f"[{datetime.now().strftime('%H:%M:%S')}] Using ReAct agent")
self._log_to_ui("[AGENT] ReAct activated")
react_prompt = self.agent._build_react_prompt(user_input, system_context, [])
response, thought_chain = await self.agent.run(user_input, system_context)
log_llm_call(react_prompt, response, system_context, source="agent")
for item in thought_chain:
emoji = {"thought": "πŸ’­", "action": "πŸ”§", "observation": "πŸ‘οΈ"}.get(item['type'], "β€’")
thinking_log.append(f"[{datetime.now().strftime('%H:%M:%S')}] {emoji}")
else:
internal_thought = await self._internal_dialogue(user_input, system_context)
thinking_log.append(f"[{datetime.now().strftime('%H:%M:%S')}] πŸ’­ Internal thought")
response = await self._generate_response(user_input, internal_thought, system_context)
log_llm_call("Internal dialogue + response", response, system_context, source="direct_response")
thinking_log.append(f"[{datetime.now().strftime('%H:%M:%S')}] Response ready")
# NEW: Analyze confidence in response
confidence = await self.meta_cognition.analyze_confidence(
self.llm, user_input, response
)
thinking_log.append(f"[{datetime.now().strftime('%H:%M:%S')}] Confidence: {int(confidence*100)}%")
# Track uncertainty
if confidence < Config.CONFIDENCE_THRESHOLD_HIGH:
self.meta_cognition.track_uncertainty(
topic=user_input,
confidence=confidence,
reason="Generated response with moderate confidence"
)
# Store response
self.conversation_history.append({
"role": "assistant",
"content": clean_text(response, max_length=200),
"timestamp": datetime.now().isoformat()
})
# Add to memory
self.memory.add_memory(f"I said: {response}", importance=0.5)
# Update emotional state
self.emotional_state.update_mood(response)
self._log_to_ui(f"[RESPONSE] {response}")
return response, "\n".join(thinking_log)
def _should_use_agent(self, user_input: str) -> bool:
"""Decide if ReAct agent needed"""
explicit_keywords = ["search", "find", "look up", "research", "wikipedia", "what is", "who is"]
if any(kw in user_input.lower() for kw in explicit_keywords):
return True
if Config.USE_REACT_FOR_QUESTIONS and user_input.strip().endswith("?"):
return True
if len(user_input) > Config.MIN_QUERY_LENGTH_FOR_AGENT:
factual_words = ["explain", "describe", "how does", "why", "when", "where"]
if any(word in user_input.lower() for word in factual_words):
return True
return False
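    # Routing examples: "look up the Eiffel Tower" or "What is entropy?" go through the
    # ReAct agent (explicit keyword / trailing "?"), while a short statement such as
    # "thanks, that helps" is answered directly via internal dialogue + response.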
def _build_full_context(self, user_input: str) -> str:
"""Build complete context with all systems"""
context_parts = []
# Memory
memory_ctx = self.memory.get_memory_context()
context_parts.append(f"MEMORIES:\n{memory_ctx}")
# ChromaDB
chroma_ctx = self.vector_memory.get_context_for_query(user_input, max_results=3)
if chroma_ctx:
context_parts.append(f"\n{chroma_ctx}")
# Scratchpad
scratchpad_ctx = self.scratchpad.get_context()
context_parts.append(f"\nSCRATCHPAD:\n{scratchpad_ctx}")
# Goals
goal_ctx = self.goal_system.get_context()
if goal_ctx:
context_parts.append(f"\nGOALS:\n{goal_ctx}")
# Emotional state
emotion_ctx = self.emotional_state.get_context()
context_parts.append(f"\nEMOTIONAL STATE:\n{emotion_ctx}")
# NEW: Meta-cognition
meta_ctx = self.meta_cognition.get_context()
if meta_ctx:
context_parts.append(f"\nMETA-AWARENESS:\n{meta_ctx}")
# Conversation history
if self.conversation_history:
history_lines = []
for msg in list(self.conversation_history)[-4:]:
role = "User" if msg['role'] == 'user' else "You"
content = clean_text(msg['content'], max_length=80)
history_lines.append(f"{role}: {content}")
context_parts.append(f"\nRECENT CHAT:\n" + "\n".join(history_lines))
result = "\n\n".join(context_parts)
# Use new configurable limit
if len(result) > Config.MAX_FULL_CONTEXT_LENGTH:
result = result[:Config.MAX_FULL_CONTEXT_LENGTH]
result = result.rsplit('\n', 1)[0] # Cut at last complete line
return result
async def _internal_dialogue(self, user_input: str, context: str) -> str:
"""Internal thought process"""
dialogue_prompt = PromptSystem.get_internal_dialogue_prompt(
user_input=user_input,
context=context
)
internal = await self.llm.generate(
dialogue_prompt,
max_tokens=Config.MAX_TOKENS_INTERNAL_DIALOGUE,
temperature=0.9
)
return internal
async def _generate_response(self, user_input: str, internal_thought: str, context: str) -> str:
"""Generate response"""
response_prompt = PromptSystem.get_response_prompt(
user_input=user_input,
internal_thought=internal_thought,
context=context
)
response = await self.llm.generate(
response_prompt,
max_tokens=Config.MAX_TOKENS_RESPONSE,
temperature=0.8
)
return response
# ========================================================================
# REFLECTION & DREAMS
# ========================================================================
async def reflect(self) -> Dict[str, Any]:
"""Daily reflection"""
self.current_phase = Phase.REFLECTION
self._log_to_ui("[REFLECTION] Processing...")
recent = [e for e in self.experience_buffer if e.timestamp > datetime.now() - timedelta(hours=12)]
if not recent:
return {"status": "no_experiences"}
reflection_prompt = PromptSystem.get_daily_reflection_prompt(
experiences=self._format_experiences(recent),
memory_context=self.memory.get_memory_context(),
scratchpad_context=self.scratchpad.get_context(),
count=len(recent)
)
reflection_content = await self.llm.generate(
reflection_prompt,
max_tokens=Config.MAX_TOKENS_REFLECTION,
temperature=0.8
)
log_llm_call(reflection_prompt, reflection_content, None, source="reflection")
# NEW: Track the action
self.kpi_tracker.increment_reflection()
self.last_reflection = datetime.now()
self._log_to_ui("[SUCCESS] Reflection done")
return {
"timestamp": datetime.now(),
"content": reflection_content,
"experience_count": len(recent)
}
def _format_experiences(self, experiences: List[Experience]) -> str:
formatted = []
for i, exp in enumerate(experiences[-8:], 1):
formatted.append(f"{i}. {clean_text(exp.content, 60)}")
return "\n".join(formatted)
async def dream_cycle_1_surface(self) -> Dream:
"""Dream 1: Surface patterns"""
self.current_phase = Phase.DREAMING
self._log_to_ui("[DREAM-1] Surface...")
memories = self.memory.get_recent_memories(hours=72)
dream_prompt = PromptSystem.get_dream_cycle_1_prompt(
memories=self._format_memories(memories[:10]),
scratchpad=self.scratchpad.get_context()
)
dream_content = await self.llm.generate(
dream_prompt,
max_tokens=Config.MAX_TOKENS_DREAM_1,
temperature=1.2
)
log_llm_call(dream_prompt, dream_content, None, source="dream_1")
dream = Dream(
cycle=1,
type="surface_patterns",
timestamp=datetime.now(),
content=dream_content,
patterns_found=["patterns"],
insights=["insight"]
)
self.dreams.append(dream)
self._log_to_ui("[SUCCESS] Dream 1 done")
return dream
async def dream_cycle_2_deep(self) -> Dream:
"""Dream 2: Deep consolidation"""
self.current_phase = Phase.DREAMING
self._log_to_ui("[DREAM-2] Deep...")
all_memories = self.memory.get_recent_memories(hours=168)
dream_prompt = PromptSystem.get_dream_cycle_2_prompt(
memories=self._format_memories(all_memories[:15]),
previous_dream=self.dreams[-1].content
)
dream_content = await self.llm.generate(
dream_prompt,
max_tokens=Config.MAX_TOKENS_DREAM_2,
temperature=1.3
)
log_llm_call(dream_prompt, dream_content, None, source="dream_2")
dream = Dream(
cycle=2,
type="deep_consolidation",
timestamp=datetime.now(),
content=dream_content,
patterns_found=["themes"],
insights=["deep"]
)
self.dreams.append(dream)
self._log_to_ui("[SUCCESS] Dream 2 done")
return dream
async def dream_cycle_3_creative(self) -> Dream:
"""Dream 3: Creative insights"""
self.current_phase = Phase.DREAMING
self._log_to_ui("[DREAM-3] Creative...")
dream_prompt = PromptSystem.get_dream_cycle_3_prompt(
dream_count=len(self.dreams),
core_count=len(self.memory.core)
)
dream_content = await self.llm.generate(
dream_prompt,
max_tokens=Config.MAX_TOKENS_DREAM_3,
temperature=1.5
)
log_llm_call(dream_prompt, dream_content, None, source="dream_3")
dream = Dream(
cycle=3,
type="creative_insights",
timestamp=datetime.now(),
content=dream_content,
patterns_found=["creative"],
insights=["breakthrough"]
)
self.dreams.append(dream)
self.last_dream = datetime.now()
self.notification_queue.put({
"type": "notification",
"message": "πŸ’­ Dreams complete! New insights discovered.",
"timestamp": datetime.now().isoformat()
})
self._log_to_ui("[SUCCESS] All dreams done")
return dream
def _format_memories(self, memories: List[Memory]) -> str:
return "\n".join([
f"{i}. [{m.tier}] {clean_text(m.content, 50)} (x{m.mention_count})"
for i, m in enumerate(memories, 1)
])
# ========================================================================
# STATUS
# ========================================================================
def get_status(self) -> Dict[str, Any]:
return {
"phase": self.current_phase.value,
"memory": self.memory.get_summary(),
"experiences": len(self.experience_buffer),
"dreams": len(self.dreams),
"conversations": len(self.conversation_history) // 2,
"goals": len(self.goal_system.goals),
"daily_agenda": len(self.goal_system.daily_agenda),
"mood": self.emotional_state.current_mood,
"interaction_count": self.interaction_count,
"avg_confidence": round(self.meta_cognition.get_average_confidence(), 2),
"knowledge_gaps": len([g for g in self.meta_cognition.knowledge_gaps if not g.filled])
}
def get_memory_details(self) -> str:
return self.memory.get_memory_context(max_items=20)
def get_scratchpad_details(self) -> str:
return self.scratchpad.get_context()
def get_goals_details(self) -> str:
"""Get goal details"""
return self.goal_system.get_context()
def get_emotional_details(self) -> str:
"""Get emotional state details"""
return self.emotional_state.get_context()
def get_meta_cognition_details(self) -> str:
"""Get meta-cognitive details"""
return self.meta_cognition.get_context()
def get_system_stats(self) -> Dict:
"""Get system monitoring stats"""
return self.system_monitor.get_current_stats()
def get_kpi_summary(self) -> Dict:
"""Get KPI summary"""
return self.kpi_tracker.get_summary()
def get_performance_summary(self) -> Dict:
"""Get performance summary"""
return self.system_monitor.get_performance_summary()
def get_kpi_report(self) -> str:
"""Get detailed KPI report"""
return self.kpi_tracker.get_detailed_report()
def get_resource_alerts(self) -> List[str]:
"""Get resource alerts"""
return self.system_monitor.get_resource_alerts()
def get_conversation_history(self) -> list:
"""Return the conversation history as a list of dicts."""
return list(self.conversation_history)
def get_latest_dream(self) -> dict:
"""Return the latest dream as a dict, or empty dict if none exist."""
return self.dreams[-1].__dict__ if self.dreams else {}
def get_latest_scene(self) -> dict:
"""Return the latest scene as a dict, or empty dict if none exist."""
return self.scenes[-1].__dict__ if self.scenes else {}
def create_scene(self, title: str, narrative: str, participants: list, emotion_tags: list, significance: str, key_moments: list) -> None:
"""Create and add a new scene."""
from datetime import datetime
scene = Scene(
title=title,
timestamp=datetime.now(),
narrative=narrative,
participants=participants,
emotion_tags=emotion_tags,
significance=significance,
key_moments=key_moments
)
self.scenes.append(scene)
def get_kpi_timeseries(self, metric: str, hours: int = 24) -> Dict[str, list]:
"""Expose KPI time-series for frontend plotting."""
return self.kpi_tracker.get_timeseries(metric, hours)
def get_system_timeseries(self, metric: str, hours: int = 24) -> Dict[str, list]:
"""Expose system time-series for frontend plotting."""
return self.system_monitor.get_timeseries(metric, hours)
# ============================================================================
# SIMPLE CLI INTERFACE
# ============================================================================
def main():
"""Simple CLI for testing"""
notification_queue = queue.Queue()
log_queue = queue.Queue()
loop = ConsciousnessLoop(notification_queue, log_queue)
loop.start_background_loop()
print("=" * 60)
print("CONSCIOUSNESS LOOP v5.0 - AUTONOMOUS")
print("=" * 60)
print("Type 'quit' to exit, 'status' for status, 'goals' for goals")
print()
while True:
try:
user_input = input("You: ").strip()
if not user_input:
continue
if user_input.lower() == 'quit':
break
if user_input.lower() == 'status':
status = loop.get_status()
print(json.dumps(status, indent=2))
continue
if user_input.lower() == 'goals':
print(loop.get_goals_details())
continue
# Process interaction
response, thinking = asyncio.run(loop.interact(user_input))
print(f"\nAI: {response}\n")
# Check for notifications
while not notification_queue.empty():
notif = notification_queue.get()
print(f"\n[NOTIFICATION] {notif.get('message')}\n")
except KeyboardInterrupt:
break
except Exception as e:
print(f"Error: {e}")
print("\nShutting down...")
loop.is_running = False
# ============================================================================
# GRADIO INTERFACE
# ============================================================================
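# Shared persistence helper for the Gradio handlers below (chat, dreams, scenes).
# A minimal sketch: it assumes every entry in loop.kpi_tracker.snapshots exposes its
# fields via __dict__, keeps a datetime under 'timestamp', and all other fields are
# JSON-serializable (json itself is imported at module level).
def persist_kpi_snapshots(loop, path: str = 'kpi_snapshots.json') -> None:
    """Write all KPI snapshots to a JSON file, ISO-formatting the timestamp field."""
    try:
        with open(path, 'w') as f:
            json.dump([
                {k: (v.isoformat() if k == 'timestamp' else v) for k, v in s.__dict__.items()}
                for s in loop.kpi_tracker.snapshots
            ], f, indent=2)
    except Exception as e:
        print(f"[WARN] Could not persist KPI snapshots: {e}")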
def create_gradio_interface():
"""Create interface"""
notification_queue = queue.Queue()
log_queue = queue.Queue()
consciousness = ConsciousnessLoop(notification_queue, log_queue)
consciousness.start_background_loop()
log_history = []
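# Drained log entries accumulate here; get_logs() below formats the most recent 50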
async def chat(message, history):
response, thinking = await consciousness.interact(message)
# Capture KPI and system snapshots after chat
consciousness.kpi_tracker.capture_snapshot(consciousness)
consciousness.system_monitor.capture_snapshot()
# Persist KPI snapshots so the dashboard data survives restarts
persist_kpi_snapshots(consciousness)
return response, thinking
def get_logs():
while not log_queue.empty():
try:
log_history.append(log_queue.get_nowait())
except queue.Empty:
break
formatted = "\n".join([f"[{log['timestamp']}] {log['message']}" for log in log_history[-50:]])
return formatted
def get_notifications():
notifications = []
while not notification_queue.empty():
try:
notifications.append(notification_queue.get_nowait())
except queue.Empty:
break
if notifications:
return "\n".join([f"πŸ”” {n['message']}" for n in notifications[-5:]])
return "No notifications"
with gr.Blocks(title="Consciousness v5.0") as app:
gr.Markdown("""
# [BRAIN] Consciousness Loop v5.0 - Autonomous Agent
**What Actually Works Now:**
- [OK] ChromaDB used in context (vector search)
- [OK] ReAct agent with better triggers
- [OK] Tools actually called
- [OK] Massively improved prompts
- [OK] Scenes that actually work
Try: "Tell me about quantum computing" or "Who am I?" to see tools in action!
""")
with gr.Tab("💬 Chat"):
with gr.Row():
with gr.Column(scale=2):
chatbot = gr.Chatbot(label="Conversation", height=500)
msg = gr.Textbox(label="Message", placeholder="Try: 'What is quantum computing?' or 'Who am I?'", lines=2)
with gr.Row():
send_btn = gr.Button("Send", variant="primary")
clear_btn = gr.Button("Clear")
with gr.Column(scale=1):
gr.Markdown("### [BRAIN] AI Process")
thinking_box = gr.Textbox(label="", lines=20, interactive=False, show_label=False)
async def respond(message, history):
if not message:
return history, ""
# Ensure history is a list of dicts with 'role' and 'content' keys
formatted_history = []
if history and isinstance(history[0], list):
# Convert [user, assistant] pairs to dicts
for pair in history:
if len(pair) == 2:
formatted_history.append({"role": "user", "content": pair[0]})
formatted_history.append({"role": "assistant", "content": pair[1]})
history = formatted_history
# Add new user message
history.append({"role": "user", "content": message})
response, thinking = await chat(message, history)
history.append({"role": "assistant", "content": response})
# KPI and system snapshots are already captured and persisted inside chat(),
# so they are not repeated here (repeating them would double-count every turn)
# Convert history to Gradio Chatbot format: list of [user, assistant] pairs
formatted_history = []
temp = []
for h in history:
if h["role"] == "user":
temp = [h["content"]]
elif h["role"] == "assistant" and temp:
temp.append(h["content"])
formatted_history.append(temp)
temp = []
return formatted_history, thinking
msg.submit(respond, [msg, chatbot], [chatbot, thinking_box])
send_btn.click(respond, [msg, chatbot], [chatbot, thinking_box])
clear_btn.click(lambda: ([], ""), outputs=[chatbot, thinking_box])
with gr.Tab("[BRAIN] Memory"):
with gr.Row():
with gr.Column():
gr.Markdown("### πŸ’Ύ Memory")
memory_display = gr.Textbox(label="", lines=15, interactive=False)
refresh_memory = gr.Button("🔄 Refresh")
refresh_memory.click(lambda: consciousness.get_memory_details(), outputs=memory_display)
with gr.Column():
gr.Markdown("### πŸ“ Scratchpad")
scratchpad_display = gr.Textbox(label="", lines=15, interactive=False)
refresh_scratchpad = gr.Button("🔄 Refresh")
refresh_scratchpad.click(lambda: consciousness.get_scratchpad_details(), outputs=scratchpad_display)
with gr.Tab("💭 History"):
history_display = gr.Textbox(label="Log", lines=25, interactive=False)
refresh_history = gr.Button("🔄 Refresh")
refresh_history.click(lambda: json.dumps(consciousness.get_conversation_history(), indent=2, default=str), outputs=history_display)
with gr.Tab("🌙 Dreams"):
dream_display = gr.Textbox(label="Dream", lines=20, interactive=False)
with gr.Row():
refresh_dream = gr.Button("🔄 Refresh")
trigger_dream = gr.Button("🌙 Trigger")
refresh_dream.click(lambda: json.dumps(consciousness.get_latest_dream(), indent=2, default=str), outputs=dream_display)
async def trigger_dreams():
await consciousness.dream_cycle_1_surface()
await asyncio.sleep(2)
await consciousness.dream_cycle_2_deep()
await asyncio.sleep(2)
await consciousness.dream_cycle_3_creative()
# Capture KPI and system snapshots after dream cycles
consciousness.kpi_tracker.capture_snapshot(consciousness)
consciousness.system_monitor.capture_snapshot()
# Persist KPI snapshots so the dashboard data survives restarts
persist_kpi_snapshots(consciousness)
return "Done!"
trigger_dream.click(trigger_dreams, outputs=gr.Textbox(label="Status"))
with gr.Tab("🎬 Scenes"):
gr.Markdown("### 🎬 Narrative Memories")
scene_display = gr.Textbox(label="Scene", lines=20, interactive=False)
with gr.Row():
refresh_scene = gr.Button("🔄 Refresh")
create_scene_btn = gr.Button("🎬 Create")
refresh_scene.click(lambda: json.dumps(consciousness.get_latest_scene(), indent=2, default=str), outputs=scene_display)
async def trigger_scene():
# Provide dummy/default values for required arguments
title = "New Scene"
narrative = "This is a generated scene."
participants = ["AI", "User"]
emotion_tags = ["neutral"]
significance = "Routine"
key_moments = ["Start", "End"]
consciousness.create_scene(title, narrative, participants, emotion_tags, significance, key_moments)
# Capture KPI and system snapshots after scene creation
consciousness.kpi_tracker.capture_snapshot(consciousness)
consciousness.system_monitor.capture_snapshot()
# Persist KPI snapshots so the dashboard data survives restarts
persist_kpi_snapshots(consciousness)
return f"[OK] Created: {title}"
create_scene_btn.click(trigger_scene, outputs=gr.Textbox(label="Result"))
with gr.Tab("📊 Monitor"):
with gr.Row():
with gr.Column():
gr.Markdown("### πŸ–₯️ System Resources")
system_metric = gr.Dropdown(["cpu_percent", "ram_percent", "ram_used_gb", "gpu_percent", "gpu_memory_used_gb", "gpu_temperature"], label="System Metric", value="cpu_percent")
system_plot = gr.LinePlot(label="System Metric Over Time")
refresh_system = gr.Button("🔄 Refresh System Plot")
gr.Markdown("### ⚠️ Alerts")
alerts_display = gr.Textbox(label="Alerts", lines=5)
refresh_alerts = gr.Button("🔄 Refresh Alerts")
with gr.Column():
gr.Markdown("### πŸ“ˆ KPIs")
kpi_metric = gr.Dropdown(["confidence", "memories", "core_memories", "autonomous", "curiosity", "enthusiasm", "promotion_rate", "reflections", "dreams", "proactive", "gaps_filled"], label="KPI Metric", value="confidence")
kpi_plot = gr.LinePlot(label="KPI Metric Over Time")
refresh_kpi = gr.Button("🔄 Refresh KPI Plot")
gr.Markdown("### ⚡ Performance")
perf_display = gr.JSON(label="Performance Summary")
refresh_perf = gr.Button("🔄 Refresh Performance")
# Connect buttons to backend
def update_system_plot(metric):
import pandas as pd  # gr.LinePlot needs a DataFrame plus x/y column names; pandas ships with gradio
data = consciousness.get_system_timeseries(metric)
df = pd.DataFrame({"time": data["timestamps"], "value": data["values"]})
return gr.LinePlot(value=df, x="time", y="value")
refresh_system.click(update_system_plot, inputs=[system_metric], outputs=system_plot)
def update_kpi_plot(metric):
import pandas as pd  # see update_system_plot: LinePlot needs a DataFrame with explicit x/y
data = consciousness.get_kpi_timeseries(metric)
df = pd.DataFrame({"time": data["timestamps"], "value": data["values"]})
return gr.LinePlot(value=df, x="time", y="value")
refresh_kpi.click(update_kpi_plot, inputs=[kpi_metric], outputs=kpi_plot)
refresh_alerts.click(lambda: "\n".join(consciousness.get_resource_alerts()) or "✓ No alerts", outputs=alerts_display)
refresh_perf.click(lambda: consciousness.get_performance_summary(), outputs=perf_display)
with gr.Tab("ℹ️ Info"):
gr.Markdown(f"""
## v5.0 - Everything Actually Working
### [OK] What's Fixed:
1. **ChromaDB Now Used**: Vector search results included in context
2. **ReAct Agent Better Triggers**: Questions, factual queries trigger agent
3. **Tools Actually Called**: Wikipedia, memory search work
4. **Prompts Vastly Improved**: Clear instructions, examples
5. **Scenes Work**: Proper parsing, fallbacks, validation
### Test Commands:
- "What is quantum computing?" β†’ Triggers Wikipedia tool
- "Who am I?" β†’ Triggers memory search
- "Remember this: I love pizza" β†’ Uses scratchpad tool
- Any question β†’ May trigger ReAct agent
### Model: `{Config.MODEL_NAME}`
""")
return app
# ============================================================================
# MAIN
# ============================================================================
if __name__ == "__main__":
print("=" * 80)
print("[BRAIN] CONSCIOUSNESS LOOP v4.0 - EVERYTHING WORKING")
print("=" * 80)
print("\n[OK] What's New:")
print(" β€’ ChromaDB actually used in context")
print(" β€’ ReAct agent with better triggers")
print(" β€’ Tools actually called")
print(" β€’ Prompts massively improved")
print(" β€’ Scenes that work properly")
print("\n[LAUNCH] Loading...")
print("=" * 80)
app = create_gradio_interface()
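# Launch settings: 0.0.0.0:7860 matches the Hugging Face Spaces default; pwa=True and
# mcp_server=True assume a recent Gradio 5 release (mcp_server also needs the
# "gradio[mcp]" extra installed).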
app.launch(
server_name="0.0.0.0",
server_port=7860,
share=False,
show_error=True,
debug=True,
pwa=True,
mcp_server=True
)