""" |
|
|
Agent Memory Integration Service |
|
|
|
|
|
Integrates all agents with the Nanã memory system for persistent |
|
|
knowledge sharing and context preservation. |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
from datetime import datetime, timedelta |
|
|
from typing import Any, Dict, List, Optional, Set |
|
|
from enum import Enum |
|
|
import hashlib |
|
|
|
|
|
from src.agents.nana import ( |
|
|
ContextMemoryAgent, |
|
|
EpisodicMemory, |
|
|
SemanticMemory, |
|
|
ConversationMemory, |
|
|
MemoryImportance |
|
|
) |
|
|
from src.agents.deodoro import AgentMessage, AgentContext, BaseAgent |
|
|
from src.core import get_logger |
|
|
from src.core.exceptions import MemoryError |
|
|
|
|
|
|
|
|
logger = get_logger(__name__) |
|
|
|
|
|
|
|
|
class MemoryIntegrationType(Enum):
    """Types of memory integration for agents."""

    READ_ONLY = "read_only"
    WRITE_ONLY = "write_only"
    READ_WRITE = "read_write"
    SELECTIVE = "selective"


class AgentMemoryIntegration:
    """
    Service to integrate agents with the Nanã memory system.

    This service acts as a bridge between agents and the memory system,
    providing:
    - Automatic memory storage for agent results
    - Context retrieval for informed decision making
    - Cross-agent knowledge sharing
    - Memory-based learning and improvement
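
    Example (hypothetical instances; ``nana_agent`` and ``zumbi_agent`` are
    not defined in this module):

        integration = AgentMemoryIntegration(memory_agent=nana_agent)
        await integration.integrate_agent(zumbi_agent)
        # zumbi_agent.process() now reads and writes memory automatically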
    """

    def __init__(
        self,
        memory_agent: ContextMemoryAgent,
        auto_store: bool = True,
        auto_retrieve: bool = True
    ):
        """
        Initialize memory integration service.

        Args:
            memory_agent: The Nanã memory agent instance
            auto_store: Automatically store agent results
            auto_retrieve: Automatically retrieve relevant context
        """
        self.memory_agent = memory_agent
        self.auto_store = auto_store
        self.auto_retrieve = auto_retrieve

        self.agent_configs: Dict[str, Dict[str, Any]] = self._initialize_agent_configs()

        self.access_log: List[Dict[str, Any]] = []

        self.memory_cache: Dict[str, Any] = {}
        self.cache_ttl = 300

    def _initialize_agent_configs(self) -> Dict[str, Dict[str, Any]]:
        """Initialize memory configurations for each agent."""
        return {
            "abaporu": {
                "integration_type": MemoryIntegrationType.READ_WRITE,
                "tags": ["investigation", "coordination", "results"],
                "importance_threshold": MemoryImportance.LOW,
                "auto_store_results": True
            },
            "zumbi": {
                "integration_type": MemoryIntegrationType.READ_WRITE,
                "tags": ["anomaly", "fraud", "investigation"],
                "importance_threshold": MemoryImportance.MEDIUM,
                "auto_store_results": True
            },
            "anita": {
                "integration_type": MemoryIntegrationType.READ_WRITE,
                "tags": ["pattern", "analysis", "trend"],
                "importance_threshold": MemoryImportance.MEDIUM,
                "auto_store_results": True
            },
            "oxossi": {
                "integration_type": MemoryIntegrationType.READ_WRITE,
                "tags": ["fraud", "evidence", "high_risk"],
                "importance_threshold": MemoryImportance.HIGH,
                "auto_store_results": True
            },
            "tiradentes": {
                "integration_type": MemoryIntegrationType.READ_ONLY,
                "tags": ["report", "summary"],
                "importance_threshold": MemoryImportance.LOW,
                "auto_store_results": False
            },
            "machado": {
                "integration_type": MemoryIntegrationType.READ_WRITE,
                "tags": ["document", "text_analysis", "compliance"],
                "importance_threshold": MemoryImportance.MEDIUM,
                "auto_store_results": True
            },
            "bonifacio": {
                "integration_type": MemoryIntegrationType.READ_WRITE,
                "tags": ["policy", "effectiveness", "impact"],
                "importance_threshold": MemoryImportance.MEDIUM,
                "auto_store_results": True
            },
            "dandara": {
                "integration_type": MemoryIntegrationType.READ_WRITE,
                "tags": ["equity", "social_justice", "inclusion"],
                "importance_threshold": MemoryImportance.MEDIUM,
                "auto_store_results": True
            },
            "lampiao": {
                "integration_type": MemoryIntegrationType.READ_WRITE,
                "tags": ["regional", "geographic", "inequality"],
                "importance_threshold": MemoryImportance.MEDIUM,
                "auto_store_results": True
            },
            "ayrton_senna": {
                "integration_type": MemoryIntegrationType.READ_ONLY,
                "tags": ["routing", "performance"],
                "importance_threshold": MemoryImportance.LOW,
                "auto_store_results": False
            },
            "oscar_niemeyer": {
                "integration_type": MemoryIntegrationType.READ_WRITE,
                "tags": ["visualization", "aggregation", "metrics"],
                "importance_threshold": MemoryImportance.LOW,
                "auto_store_results": True
            },
            "ceuci": {
                "integration_type": MemoryIntegrationType.READ_WRITE,
                "tags": ["prediction", "forecast", "analysis"],
                "importance_threshold": MemoryImportance.MEDIUM,
                "auto_store_results": True
            },
            "maria_quiteria": {
                "integration_type": MemoryIntegrationType.READ_WRITE,
                "tags": ["security", "audit", "compliance"],
                "importance_threshold": MemoryImportance.HIGH,
                "auto_store_results": True
            },
            "obaluaie": {
                "integration_type": MemoryIntegrationType.READ_WRITE,
                "tags": ["corruption", "systemic", "alert"],
                "importance_threshold": MemoryImportance.HIGH,
                "auto_store_results": True
            },
            "drummond": {
                "integration_type": MemoryIntegrationType.READ_WRITE,
                "tags": ["communication", "message", "notification"],
                "importance_threshold": MemoryImportance.LOW,
                "auto_store_results": False
            }
        }

    async def integrate_agent(self, agent: BaseAgent) -> None:
        """
        Integrate an agent with the memory system.

        This wraps the agent's process method to automatically handle
        memory operations.
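
        Example (hypothetical agent instance, not defined in this module):

            await integration.integrate_agent(zumbi_agent)
            # subsequent zumbi_agent.process(message, context) calls will
            # retrieve relevant memories and store successful results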
        """
        agent_id = agent.agent_id.lower()
        if agent_id not in self.agent_configs:
            logger.warning(f"No memory configuration for agent {agent_id}")
            return

        original_process = agent.process

        async def memory_aware_process(message: AgentMessage, context: AgentContext) -> Any:
            config = self.agent_configs[agent_id]

            if self.auto_retrieve and config["integration_type"] in [
                MemoryIntegrationType.READ_ONLY,
                MemoryIntegrationType.READ_WRITE,
                MemoryIntegrationType.SELECTIVE
            ]:
                memories = await self.retrieve_relevant_memories(
                    agent_id=agent_id,
                    query=message.content,
                    context=context,
                    tags=config["tags"]
                )

                if memories:
                    context.metadata["retrieved_memories"] = memories
                    logger.info(f"Retrieved {len(memories)} memories for {agent_id}")

            result = await original_process(message, context)

            if (self.auto_store and
                config["auto_store_results"] and
                config["integration_type"] in [
                    MemoryIntegrationType.WRITE_ONLY,
                    MemoryIntegrationType.READ_WRITE
                ] and
                result.success):

                importance = self._determine_importance(agent_id, result)

                if importance.value >= config["importance_threshold"].value:
                    await self.store_agent_result(
                        agent_id=agent_id,
                        message=message,
                        context=context,
                        result=result,
                        importance=importance,
                        tags=config["tags"]
                    )

            return result

        agent.process = memory_aware_process
        logger.info(f"Successfully integrated {agent_id} with memory system")

    async def retrieve_relevant_memories(
        self,
        agent_id: str,
        query: str,
        context: AgentContext,
        tags: List[str],
        limit: int = 10
    ) -> List[Dict[str, Any]]:
        """Retrieve relevant memories for an agent."""
        try:
            # Serve from cache when a recent identical lookup exists
            cache_key = self._generate_cache_key(agent_id, query, tags)
            if cache_key in self.memory_cache:
                cached = self.memory_cache[cache_key]
                if datetime.utcnow() - cached["timestamp"] < timedelta(seconds=self.cache_ttl):
                    return cached["memories"]

            memories = []

            # Episodic memories from the current investigation
            episodic = await self.memory_agent.retrieve_episodic(
                investigation_id=context.investigation_id,
                limit=limit // 2
            )
            memories.extend(episodic)

            # Semantic memories for each configured tag
            for tag in tags:
                semantic = await self.memory_agent.retrieve_by_tag(
                    tag=tag,
                    limit=limit // len(tags)
                )
                memories.extend(semantic)

            # Memories similar to the current query
            similar = await self.memory_agent.retrieve_similar(
                query=query,
                limit=limit // 2
            )
            memories.extend(similar)

            # Deduplicate, rank by relevance, and trim to the requested limit
            unique_memories = self._deduplicate_memories(memories)
            sorted_memories = sorted(
                unique_memories,
                key=lambda m: m.get("relevance", 0),
                reverse=True
            )[:limit]

            # Cache the result and record the access for statistics
            self.memory_cache[cache_key] = {
                "memories": sorted_memories,
                "timestamp": datetime.utcnow()
            }

            self.access_log.append({
                "agent_id": agent_id,
                "timestamp": datetime.utcnow(),
                "query": query,
                "memories_retrieved": len(sorted_memories),
                "tags": tags
            })

            return sorted_memories

        except Exception as e:
            logger.error(f"Error retrieving memories for {agent_id}: {str(e)}")
            return []

    async def store_agent_result(
        self,
        agent_id: str,
        message: AgentMessage,
        context: AgentContext,
        result: Any,
        importance: MemoryImportance,
        tags: List[str]
    ) -> bool:
        """Store agent result in memory."""
        try:
            memory_id = f"{agent_id}_{context.investigation_id}_{datetime.utcnow().timestamp()}"

            episodic_memory = EpisodicMemory(
                id=memory_id,
                content={
                    "agent": agent_id,
                    "message": message.content,
                    "result": result.data if hasattr(result, 'data') else str(result)
                },
                importance=importance,
                tags=tags + [agent_id],
                investigation_id=context.investigation_id,
                user_id=context.user_id,
                session_id=context.session_id,
                query=message.content,
                result=result.data if hasattr(result, 'data') else {"result": str(result)},
                context=context.metadata
            )

            await self.memory_agent.store_episodic(
                memory=episodic_memory,
                context=context
            )

            if agent_id in ["zumbi", "anita", "oxossi", "bonifacio"]:
                await self._extract_semantic_knowledge(
                    agent_id=agent_id,
                    result=result,
                    tags=tags,
                    context=context
                )

            logger.info(f"Stored result from {agent_id} with importance {importance.value}")
            return True

        except Exception as e:
            logger.error(f"Error storing result from {agent_id}: {str(e)}")
            return False

    async def _extract_semantic_knowledge(
        self,
        agent_id: str,
        result: Any,
        tags: List[str],
        context: AgentContext
    ) -> None:
        """Extract semantic knowledge from agent results."""
        try:
            knowledge_items = []

            if agent_id == "anita" and hasattr(result, 'data'):
                patterns = result.data.get("patterns", [])
                for pattern in patterns:
                    knowledge_items.append({
                        "concept": f"pattern_{pattern.get('type', 'unknown')}",
                        "description": pattern.get("description", ""),
                        "confidence": pattern.get("confidence", 0.5),
                        "evidence": [pattern.get("evidence", "")]
                    })

            elif agent_id == "oxossi" and hasattr(result, 'data'):
                fraud_analysis = result.data.get("fraud_analysis", {})
                patterns = fraud_analysis.get("patterns", [])
                for pattern in patterns:
                    knowledge_items.append({
                        "concept": f"fraud_{pattern.get('fraud_type', 'unknown')}",
                        "description": f"{pattern.get('fraud_type', 'Unknown')} fraud pattern detected",
                        "confidence": pattern.get("confidence", 0.5),
                        "evidence": [str(ind) for ind in pattern.get("indicators", [])]
                    })

            elif agent_id == "zumbi" and hasattr(result, 'data'):
                anomalies = result.data.get("anomalies", [])
                for anomaly in anomalies:
                    knowledge_items.append({
                        "concept": f"anomaly_{anomaly.get('type', 'unknown')}",
                        "description": anomaly.get("description", ""),
                        "confidence": anomaly.get("confidence", 0.5),
                        "evidence": [anomaly.get("evidence", "")]
                    })

            for item in knowledge_items:
                semantic_memory = SemanticMemory(
                    id=f"semantic_{agent_id}_{item['concept']}_{datetime.utcnow().timestamp()}",
                    content=item,
                    concept=item["concept"],
                    relationships=[agent_id] + tags,
                    evidence=item["evidence"],
                    confidence=item["confidence"],
                    importance=MemoryImportance.MEDIUM,
                    tags=tags + [agent_id, "knowledge"]
                )

                await self.memory_agent.store_semantic(
                    memory=semantic_memory,
                    context=context
                )

            if knowledge_items:
                logger.info(f"Extracted {len(knowledge_items)} semantic knowledge items from {agent_id}")

        except Exception as e:
            logger.error(f"Error extracting semantic knowledge from {agent_id}: {str(e)}")

    def _determine_importance(self, agent_id: str, result: Any) -> MemoryImportance:
        """Determine the importance of a result for memory storage."""
        if agent_id in ["oxossi", "maria_quiteria", "obaluaie"]:
            if hasattr(result, 'data'):
                if "risk_level" in result.data and result.data["risk_level"] in ["HIGH", "CRITICAL"]:
                    return MemoryImportance.HIGH
                if "severity" in result.data and result.data["severity"] in ["high", "critical"]:
                    return MemoryImportance.HIGH

        if agent_id in ["zumbi", "anita", "bonifacio", "dandara"]:
            if hasattr(result, 'data'):
                if result.data.get("anomalies", []) or result.data.get("patterns", []):
                    return MemoryImportance.MEDIUM

        return MemoryImportance.LOW

    def _generate_cache_key(self, agent_id: str, query: str, tags: List[str]) -> str:
        """Generate cache key for memory retrieval."""
        components = [agent_id, query] + sorted(tags)
        return hashlib.md5(":".join(components).encode()).hexdigest()

    def _deduplicate_memories(self, memories: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Remove duplicate memories based on content hash."""
        seen = set()
        unique = []

        for memory in memories:
            content_str = json.dumps(memory.get("content", {}), sort_keys=True)
            content_hash = hashlib.md5(content_str.encode()).hexdigest()

            if content_hash not in seen:
                seen.add(content_hash)
                unique.append(memory)

        return unique

    async def share_knowledge_between_agents(
        self,
        source_agent: str,
        target_agent: str,
        knowledge_type: str,
        filters: Optional[Dict[str, Any]] = None
    ) -> bool:
        """
        Share specific knowledge from one agent to another.

        This enables cross-agent learning and collaboration.
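
        Example (hypothetical call; both agent ids must exist in agent_configs):

            shared = await integration.share_knowledge_between_agents(
                source_agent="oxossi",
                target_agent="zumbi",
                knowledge_type="fraud",
            )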
        """
        try:
            source_config = self.agent_configs.get(source_agent)
            target_config = self.agent_configs.get(target_agent)

            if not source_config or not target_config:
                logger.error(f"Invalid agent IDs: {source_agent} or {target_agent}")
                return False

            if source_config["integration_type"] not in [
                MemoryIntegrationType.READ_WRITE,
                MemoryIntegrationType.READ_ONLY
            ]:
                logger.error(f"{source_agent} cannot share knowledge (write-only)")
                return False

            source_memories = await self.memory_agent.retrieve_by_tag(
                tag=source_agent,
                limit=100
            )

            filtered_memories = [
                m for m in source_memories
                if knowledge_type in m.get("tags", [])
            ]

            if filters:
                for key, value in filters.items():
                    filtered_memories = [
                        m for m in filtered_memories
                        if m.get(key) == value
                    ]

            for memory in filtered_memories:
                memory["tags"] = list(set(memory.get("tags", []) + [target_agent, "shared"]))

            logger.info(
                f"Shared {len(filtered_memories)} {knowledge_type} memories "
                f"from {source_agent} to {target_agent}"
            )

            return True

        except Exception as e:
            logger.error(f"Error sharing knowledge: {str(e)}")
            return False

    async def get_memory_statistics(self) -> Dict[str, Any]:
        """Get statistics about memory usage by agents."""
        stats = {
            "total_accesses": len(self.access_log),
            "cache_size": len(self.memory_cache),
            "by_agent": {}
        }

        for log_entry in self.access_log:
            agent_id = log_entry["agent_id"]
            if agent_id not in stats["by_agent"]:
                stats["by_agent"][agent_id] = {
                    "accesses": 0,
                    "memories_retrieved": 0,
                    "last_access": None
                }

            stats["by_agent"][agent_id]["accesses"] += 1
            stats["by_agent"][agent_id]["memories_retrieved"] += log_entry["memories_retrieved"]
            stats["by_agent"][agent_id]["last_access"] = log_entry["timestamp"]

        return stats

    async def optimize_memory_for_agent(self, agent_id: str) -> None:
        """
        Optimize memory storage for a specific agent.

        This consolidates related memories and removes outdated ones.
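
        Example (hypothetical periodic maintenance call):

            await integration.optimize_memory_for_agent("zumbi")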
        """
        try:
            config = self.agent_configs.get(agent_id)
            if not config:
                return

            agent_memories = await self.memory_agent.retrieve_by_tag(
                tag=agent_id,
                limit=1000
            )

            # Group memories by concept (falling back to id)
            memory_groups = {}
            for memory in agent_memories:
                key = memory.get("concept", memory.get("id", "unknown"))
                if key not in memory_groups:
                    memory_groups[key] = []
                memory_groups[key].append(memory)

            # Consolidate any concept with more than five related memories
            for key, memories in memory_groups.items():
                if len(memories) > 5:
                    consolidated = await self._consolidate_memories(memories)

                    await self.memory_agent.store_semantic(
                        memory=consolidated,
                        context=AgentContext(
                            investigation_id=f"consolidation_{agent_id}",
                            user_id="system",
                            session_id="optimization"
                        )
                    )

                    # Mark the superseded originals (all but the last) as consolidated
                    for memory in memories[:-1]:
                        memory.setdefault("tags", []).append("consolidated")

            logger.info(f"Optimized memory for {agent_id}: {len(memory_groups)} concepts")

        except Exception as e:
            logger.error(f"Error optimizing memory for {agent_id}: {str(e)}")

    async def _consolidate_memories(self, memories: List[Dict[str, Any]]) -> SemanticMemory:
        """Consolidate multiple memories into a single semantic memory."""
        concepts = [m.get("concept", "") for m in memories if m.get("concept")]
        concept = max(set(concepts), key=concepts.count) if concepts else "consolidated"

        all_evidence = []
        for memory in memories:
            evidence = memory.get("evidence", [])
            if isinstance(evidence, list):
                all_evidence.extend(evidence)

        confidences = [m.get("confidence", 0.5) for m in memories if "confidence" in m]
        avg_confidence = sum(confidences) / len(confidences) if confidences else 0.5

        all_tags = []
        for memory in memories:
            tags = memory.get("tags", [])
            if isinstance(tags, list):
                all_tags.extend(tags)

        return SemanticMemory(
            id=f"consolidated_{concept}_{datetime.utcnow().timestamp()}",
            content={
                "consolidated_from": len(memories),
                "original_ids": [m.get("id") for m in memories],
                "concept": concept
            },
            concept=concept,
            relationships=list(set(all_tags)),
            evidence=list(set(all_evidence))[:10],
            confidence=avg_confidence,
            importance=MemoryImportance.MEDIUM,
            tags=list(set(all_tags)) + ["consolidated"]
        )


# Global singleton instance, created by initialize_memory_integration()
memory_integration: Optional[AgentMemoryIntegration] = None


async def initialize_memory_integration(memory_agent: ContextMemoryAgent) -> AgentMemoryIntegration:
    """Initialize the global memory integration service."""
    global memory_integration
    memory_integration = AgentMemoryIntegration(memory_agent)
    logger.info("Memory integration service initialized")
    return memory_integration


def get_memory_integration() -> Optional[AgentMemoryIntegration]:
    """Get the global memory integration instance."""
    return memory_integration
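

# Example wiring at application startup (illustrative sketch; the agent
# instances and the ContextMemoryAgent construction below are assumptions,
# not part of this module):
#
#     nana = ContextMemoryAgent(...)
#     integration = await initialize_memory_integration(nana)
#     for agent in (abaporu, zumbi, anita, oxossi):
#         await integration.integrate_agent(agent)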