
๐Ÿ—๏ธ Cidadรฃo.AI Infrastructure Layer

📋 Overview

The Infrastructure Layer provides enterprise-grade distributed persistence, caching, and system orchestration for the Cidadão.AI platform. It is built with PostgreSQL, Redis Cluster, and multi-layer caching strategies to support high-performance, scalable transparency analysis.

๐Ÿ—๏ธ Architecture

src/infrastructure/
├── database.py          # Distributed persistence manager
├── cache_system.py      # Multi-layer caching system
├── monitoring.py        # System health & metrics
├── orchestrator.py      # Agent orchestration
└── agent_pool.py        # Agent pool management

💾 Database Architecture (database.py)

Enterprise Distributed Persistence System

The database system implements a sophisticated multi-layer architecture designed for:

  • High Availability: PostgreSQL with connection pooling
  • Distributed Caching: Redis Cluster with intelligent fallback
  • Performance: Multi-layer cache with configurable TTLs
  • Reliability: Automatic retry mechanisms and circuit breakers (see the retry sketch below)
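
The retry behaviour itself is not shown elsewhere in this document. The following is a minimal sketch of how connection retries with exponential backoff could wrap an async operation, driven by the connection_retry_attempts and connection_retry_delay settings from DatabaseConfig; the with_retries helper is hypothetical, not part of database.py:

import asyncio
import logging
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")
logger = logging.getLogger(__name__)

async def with_retries(
    operation: Callable[[], Awaitable[T]],
    attempts: int = 3,          # mirrors connection_retry_attempts
    base_delay: float = 1.0,    # mirrors connection_retry_delay
) -> T:
    """Run an async operation, retrying with exponential backoff on failure."""
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except Exception as exc:
            if attempt == attempts:
                raise                                   # Retries exhausted, propagate the error
            delay = base_delay * (2 ** (attempt - 1))   # 1s, 2s, 4s, ...
            logger.warning(f"Attempt {attempt} failed ({exc}), retrying in {delay:.1f}s")
            await asyncio.sleep(delay)

# Usage: result = await with_retries(lambda: db.get_investigation("inv_001"))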

Core Components

1. DatabaseManager - Central Persistence Controller

class DatabaseManager:
    """
    Advanced database manager with distributed persistence
    
    Features:
    - PostgreSQL async connection pooling
    - Redis Cluster with automatic failover
    - Multi-layer caching (memory + distributed)
    - Performance metrics and monitoring
    - Automatic retry and circuit breaking
    - Health checks and diagnostics
    """
    
    def __init__(self, config: DatabaseConfig):
        self.pg_engine = None              # PostgreSQL async engine
        self.redis_cluster = None          # Redis Cluster client
        self.session_factory = None        # SQLAlchemy session factory
        self.metrics = {                   # Performance tracking
            "queries_executed": 0,
            "cache_hits": 0,
            "cache_misses": 0,
            "avg_query_time": 0.0
        }

2. DatabaseConfig - Configuration Management

class DatabaseConfig(BaseModel):
    """Comprehensive database configuration"""
    
    # PostgreSQL Configuration
    postgres_url: str = "postgresql+asyncpg://user:pass@localhost:5432/cidadao_ai"
    postgres_pool_size: int = 20           # Connection pool size
    postgres_max_overflow: int = 30        # Additional connections allowed
    postgres_pool_timeout: int = 30        # Connection timeout (seconds)
    
    # Redis Cluster Configuration
    redis_nodes: List[Dict[str, Union[str, int]]] = [
        {"host": "localhost", "port": 7000},
        {"host": "localhost", "port": 7001},
        {"host": "localhost", "port": 7002}
    ]
    redis_password: Optional[str] = None
    redis_decode_responses: bool = True
    
    # Cache TTL Strategies
    cache_ttl_short: int = 300             # 5 minutes - Frequently changing data
    cache_ttl_medium: int = 3600           # 1 hour - Moderately stable data
    cache_ttl_long: int = 86400            # 24 hours - Stable reference data
    
    # Performance Tuning
    connection_retry_attempts: int = 3
    connection_retry_delay: float = 1.0
    query_timeout: int = 30

Data Models

Investigation - Core Investigation Entity

class Investigation(BaseModel):
    """Primary data model for transparency investigations"""
    
    # Identity & Ownership
    id: str                                # Unique investigation ID (UUID)
    user_id: Optional[str] = None          # User who initiated
    
    # Investigation Details
    query: str                             # Original query/request
    status: str = "pending"                # Current status
    results: Optional[Dict[str, Any]] = None  # Analysis results
    metadata: Dict[str, Any] = Field(default_factory=dict)  # Additional context
    
    # Timestamps
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: datetime = Field(default_factory=datetime.utcnow)
    completed_at: Optional[datetime] = None
    
    # Analysis Results
    error_message: Optional[str] = None    # Error details if failed
    confidence_score: Optional[float] = None  # Result confidence (0-1)
    anomalies_found: int = 0               # Number of anomalies detected
    processing_time_ms: Optional[int] = None  # Processing duration

Investigation Status Lifecycle:

pending → processing → completed
              ↓
            error
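
As a minimal illustration (a hypothetical helper, not part of database.py), the allowed transitions could be checked before a status update is written:

# Allowed status transitions for an investigation (hypothetical helper)
VALID_TRANSITIONS = {
    "pending": {"processing"},
    "processing": {"completed", "error"},
    "completed": set(),
    "error": set(),
}

def can_transition(current: str, new: str) -> bool:
    """Return True if moving from `current` to `new` is a valid lifecycle step."""
    return new in VALID_TRANSITIONS.get(current, set())

assert can_transition("pending", "processing")
assert not can_transition("completed", "pending")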

Database Tables

Investigations Table

CREATE TABLE investigations (
    id VARCHAR(50) PRIMARY KEY,                    -- Investigation UUID
    user_id VARCHAR(50),                           -- User identifier
    query TEXT NOT NULL,                           -- Investigation query
    status VARCHAR(20) NOT NULL DEFAULT 'pending', -- Current status
    results JSON,                                  -- Analysis results
    metadata JSON,                                 -- Investigation metadata
    created_at TIMESTAMP NOT NULL,                 -- Creation timestamp
    updated_at TIMESTAMP NOT NULL,                 -- Last update
    completed_at TIMESTAMP,                        -- Completion timestamp
    error_message TEXT,                            -- Error details
    confidence_score FLOAT,                        -- Result confidence
    anomalies_found INTEGER DEFAULT 0,             -- Anomaly count
    processing_time_ms INTEGER                     -- Processing duration
);

-- Indexes for performance
CREATE INDEX idx_investigations_user_id ON investigations(user_id);
CREATE INDEX idx_investigations_status ON investigations(status);
CREATE INDEX idx_investigations_created_at ON investigations(created_at);
CREATE INDEX idx_investigations_confidence ON investigations(confidence_score);

Audit Logs Table

CREATE TABLE audit_logs (
    id VARCHAR(50) PRIMARY KEY,           -- Audit event UUID
    investigation_id VARCHAR(50),         -- Related investigation
    agent_name VARCHAR(100) NOT NULL,     -- Agent that performed action
    action VARCHAR(100) NOT NULL,         -- Action performed
    timestamp TIMESTAMP NOT NULL,         -- When action occurred
    data JSON,                            -- Action details
    hash_chain VARCHAR(64)                -- Cryptographic hash chain
);

-- Indexes for audit queries
CREATE INDEX idx_audit_investigation ON audit_logs(investigation_id);
CREATE INDEX idx_audit_agent ON audit_logs(agent_name);
CREATE INDEX idx_audit_timestamp ON audit_logs(timestamp);
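
The hash_chain column links each audit event to the previous one so that tampering becomes detectable. The exact scheme is not spelled out in this document; a minimal sketch of how such a chain entry could be computed (the compute_chain_hash helper is hypothetical, SHA-256 assumed) looks like this:

import hashlib
import json
from typing import Any, Dict, Optional

def compute_chain_hash(previous_hash: Optional[str], event: Dict[str, Any]) -> str:
    """Hash the current audit event together with the previous entry's hash."""
    payload = json.dumps(event, sort_keys=True, default=str)
    material = f"{previous_hash or ''}:{payload}".encode("utf-8")
    return hashlib.sha256(material).hexdigest()  # 64 hex chars, fits hash_chain VARCHAR(64)

# Usage: each new audit_logs row stores compute_chain_hash(last_row.hash_chain, event_data)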

Metrics Table

CREATE TABLE metrics (
    id VARCHAR(50) PRIMARY KEY,           -- Metric event UUID
    metric_name VARCHAR(100) NOT NULL,    -- Metric identifier
    metric_value FLOAT NOT NULL,          -- Metric value
    tags JSON,                            -- Metric tags/dimensions
    timestamp TIMESTAMP NOT NULL         -- Measurement timestamp
);
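
How metric rows are written is not detailed in this document; a hedged sketch of a record_metric helper (hypothetical, assuming the DatabaseManager session helper shown below and SQLAlchemy's text() for raw SQL) might look like:

import json
import uuid
from datetime import datetime
from typing import Optional

from sqlalchemy import text

async def record_metric(db, name: str, value: float, tags: Optional[dict] = None) -> None:
    """Insert one measurement into the metrics table (hypothetical helper)."""
    async with db.get_session() as session:
        await session.execute(
            text("""
                INSERT INTO metrics (id, metric_name, metric_value, tags, timestamp)
                VALUES (:id, :name, :value, :tags, :ts)
            """),
            {
                "id": str(uuid.uuid4()),
                "name": name,
                "value": value,
                "tags": json.dumps(tags or {}),
                "ts": datetime.utcnow(),
            },
        )

# Usage: await record_metric(db, "cache_hit_rate", 87.5, {"component": "redis"})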

🚀 Advanced Features

1. Distributed Caching Strategy

Multi-Layer Cache Architecture

class CacheLayer(Enum):
    MEMORY = "memory"        # In-process cache (fastest, smallest)
    REDIS = "redis"          # Distributed cache (fast, shared)
    PERSISTENT = "db"        # Database cache (slow, permanent)

# Cache hierarchy with automatic fallback
async def get_cached_data(key: str) -> Optional[Any]:
    """Intelligent cache retrieval with layer fallback"""
    
    # 1. Try memory cache first (microseconds)
    result = await memory_cache.get(key)
    if result:
        return result
    
    # 2. Try Redis cache (milliseconds)
    result = await redis_cache.get(key)
    if result:
        # Populate memory cache for next time
        await memory_cache.set(key, result, ttl=300)
        return result
    
    # 3. Cache miss - fetch from database
    return None

TTL Strategy by Data Type

# Strategic cache TTL based on data volatility
CACHE_STRATEGIES = {
    "investigation_results": {
        "ttl": 3600,                    # 1 hour - stable after completion
        "layer": CacheLayer.REDIS
    },
    "api_responses": {
        "ttl": 1800,                    # 30 minutes - external API data
        "layer": CacheLayer.REDIS
    },
    "user_sessions": {
        "ttl": 300,                     # 5 minutes - frequently updated
        "layer": CacheLayer.MEMORY
    },
    "reference_data": {
        "ttl": 86400,                   # 24 hours - static data
        "layer": CacheLayer.REDIS
    }
}
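
A small sketch of how these strategies could be applied when caching a value; the cache_with_strategy helper is hypothetical and assumes the DatabaseManager cache_set method shown later in this document (which currently handles the Redis layer):

async def cache_with_strategy(db, data_type: str, key: str, value) -> bool:
    """Cache a value using the TTL and layer configured for its data type."""
    strategy = CACHE_STRATEGIES.get(data_type)
    if strategy is None:
        return False  # Unknown data type: skip caching rather than guess a TTL
    return await db.cache_set(key, value, ttl=strategy["ttl"], layer=strategy["layer"])

# Usage: await cache_with_strategy(db, "investigation_results", "investigation:inv_001", results)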

2. Connection Management

PostgreSQL Connection Pooling

# Advanced connection pool configuration  
engine = create_async_engine(
    database_url,
    pool_size=20,                       # Base connection pool
    max_overflow=30,                    # Additional connections under load
    pool_timeout=30,                    # Wait time for connection
    pool_recycle=3600,                  # Recycle connections hourly
    pool_pre_ping=True,                 # Validate connections
    echo=False                          # SQL logging (disable in production)
)

# Session management with automatic cleanup
@asynccontextmanager
async def get_session():
    """Database session with automatic transaction management"""
    
    async with session_factory() as session:
        try:
            yield session
            await session.commit()          # Auto-commit on success
        except Exception:
            await session.rollback()        # Auto-rollback on error
            raise
        finally:
            await session.close()           # Always cleanup

Redis Cluster with Failover

async def _init_redis_cluster(self):
    """Initialize Redis with cluster failover"""
    
    try:
        # Primary: Redis Cluster for high availability
        self.redis_cluster = RedisCluster(
            startup_nodes=self.config.redis_nodes,
            password=self.config.redis_password,
            decode_responses=True,
            skip_full_coverage_check=True,  # Allow partial clusters
            health_check_interval=30        # Regular health checks
        )
        
        await self.redis_cluster.ping()
        logger.info("โœ… Redis Cluster connected")
        
    except Exception as e:
        logger.warning(f"โš ๏ธ Cluster failed, using single Redis: {e}")
        
        # Fallback: Single Redis node
        node = self.config.redis_nodes[0]
        self.redis_cluster = redis.Redis(
            host=node["host"],
            port=node["port"],
            password=self.config.redis_password,
            decode_responses=True
        )
        
        await self.redis_cluster.ping()
        logger.info("โœ… Redis fallback connected")

3. High-Performance Operations

Bulk Investigation Saving with UPSERT

async def save_investigation(self, investigation: Investigation) -> bool:
    """
    High-performance investigation storage with UPSERT
    
    Features:
    - PostgreSQL UPSERT (INSERT ... ON CONFLICT)
    - Automatic Redis cache population
    - Performance metrics tracking
    - Error handling with rollback
    """
    
    try:
        async with self.get_session() as session:
            # UPSERT query for PostgreSQL (requires: from sqlalchemy import text)
            query = text("""
                INSERT INTO investigations
                (id, user_id, query, status, results, metadata, created_at, updated_at,
                 completed_at, error_message, confidence_score, anomalies_found, processing_time_ms)
                VALUES (:id, :user_id, :query, :status, :results, :metadata, :created_at, :updated_at,
                        :completed_at, :error_message, :confidence_score, :anomalies_found, :processing_time_ms)
                ON CONFLICT (id) DO UPDATE SET
                    status = EXCLUDED.status,
                    results = EXCLUDED.results,
                    updated_at = EXCLUDED.updated_at,
                    completed_at = EXCLUDED.completed_at,
                    error_message = EXCLUDED.error_message,
                    confidence_score = EXCLUDED.confidence_score,
                    anomalies_found = EXCLUDED.anomalies_found,
                    processing_time_ms = EXCLUDED.processing_time_ms
            """)

            await session.execute(query, {
                "id": investigation.id,
                "user_id": investigation.user_id,
                "query": investigation.query,
                "status": investigation.status,
                "results": json.dumps(investigation.results) if investigation.results else None,
                "metadata": json.dumps(investigation.metadata),
                "created_at": investigation.created_at,
                "updated_at": investigation.updated_at,
                "completed_at": investigation.completed_at,
                "error_message": investigation.error_message,
                "confidence_score": investigation.confidence_score,
                "anomalies_found": investigation.anomalies_found,
                "processing_time_ms": investigation.processing_time_ms
            })
        
        # Cache in Redis for fast retrieval
        cache_key = f"investigation:{investigation.id}"
        await self.redis_cluster.setex(
            cache_key,
            self.config.cache_ttl_medium,  # 1 hour TTL
            investigation.model_dump_json()
        )
        
        logger.info(f"โœ… Investigation {investigation.id} saved")
        return True
        
    except Exception as e:
        logger.error(f"โŒ Error saving investigation {investigation.id}: {e}")
        return False

Intelligent Cache Retrieval

async def get_investigation(self, investigation_id: str) -> Optional[Investigation]:
    """
    Multi-layer investigation retrieval with cache population
    
    Strategy:
    1. Check Redis cache first (fast)
    2. If cache miss, query PostgreSQL
    3. Populate cache with result
    4. Track cache hit/miss metrics
    """
    
    cache_key = f"investigation:{investigation_id}"
    
    # Try cache first
    try:
        cached = await self.redis_cluster.get(cache_key)
        if cached:
            self.metrics["cache_hits"] += 1
            return Investigation.model_validate_json(cached)
    except Exception:
        pass  # Cache error, continue to database
    
    # Cache miss - query database
    self.metrics["cache_misses"] += 1
    
    try:
        async with self.get_session() as session:
            query = "SELECT * FROM investigations WHERE id = $1"
            result = await session.execute(query, [investigation_id])
            row = result.fetchone()
            
            if row:
                investigation = Investigation(
                    id=row["id"],
                    user_id=row["user_id"],
                    query=row["query"],
                    status=row["status"],
                    results=json.loads(row["results"]) if row["results"] else None,
                    metadata=json.loads(row["metadata"]) if row["metadata"] else {},
                    created_at=row["created_at"],
                    updated_at=row["updated_at"],
                    completed_at=row["completed_at"],
                    error_message=row["error_message"],
                    confidence_score=row["confidence_score"],
                    anomalies_found=row["anomalies_found"],
                    processing_time_ms=row["processing_time_ms"]
                )
                
                # Populate cache for future requests
                await self.redis_cluster.setex(
                    cache_key,
                    self.config.cache_ttl_medium,
                    investigation.model_dump_json()
                )
                
                return investigation
                
    except Exception as e:
        logger.error(f"โŒ Error retrieving investigation {investigation_id}: {e}")
    
    return None

4. Generic Cache Operations

async def cache_set(
    self, 
    key: str, 
    value: Any, 
    ttl: Optional[int] = None, 
    layer: CacheLayer = CacheLayer.REDIS
) -> bool:
    """Generic cache storage with layer selection"""
    
    try:
        if layer == CacheLayer.REDIS:
            ttl = ttl or self.config.cache_ttl_medium
            
            # Serialize complex objects
            if isinstance(value, (dict, list)):
                value = json.dumps(value)
            elif isinstance(value, BaseModel):
                value = value.model_dump_json()
                
            await self.redis_cluster.setex(key, ttl, value)
            return True
            
    except Exception as e:
        logger.error(f"โŒ Cache set error for {key}: {e}")
        return False

async def cache_get(self, key: str, layer: CacheLayer = CacheLayer.REDIS) -> Optional[Any]:
    """Generic cache retrieval with automatic deserialization"""
    
    try:
        if layer == CacheLayer.REDIS:
            result = await self.redis_cluster.get(key)
            if result:
                self.metrics["cache_hits"] += 1
                
                # Try to deserialize JSON
                try:
                    return json.loads(result)
                except json.JSONDecodeError:
                    return result  # Return raw string if not JSON
            else:
                self.metrics["cache_misses"] += 1
                
    except Exception as e:
        logger.error(f"โŒ Cache get error for {key}: {e}")
    
    return None

📊 System Health & Monitoring

Comprehensive Health Checks

async def get_health_status(self) -> Dict[str, Any]:
    """Complete system health assessment"""
    
    status = {
        "postgresql": {"status": "unknown", "latency_ms": None},
        "redis": {"status": "unknown", "latency_ms": None},
        "cache_metrics": self.metrics,
        "timestamp": datetime.utcnow().isoformat()
    }
    
    # PostgreSQL Health Check
    try:
        start_time = asyncio.get_event_loop().time()
        async with self.get_session() as session:
            await session.execute("SELECT 1")
        pg_latency = (asyncio.get_event_loop().time() - start_time) * 1000
        
        status["postgresql"] = {
            "status": "healthy",
            "latency_ms": round(pg_latency, 2),
            "pool_size": self.pg_engine.pool.size(),
            "pool_checked_in": self.pg_engine.pool.checkedin(),
            "pool_checked_out": self.pg_engine.pool.checkedout()
        }
    except Exception as e:
        status["postgresql"] = {"status": "unhealthy", "error": str(e)}
    
    # Redis Health Check
    try:
        start_time = asyncio.get_event_loop().time()
        await self.redis_cluster.ping()
        redis_latency = (asyncio.get_event_loop().time() - start_time) * 1000
        
        # Get Redis info
        info = await self.redis_cluster.info()
        
        status["redis"] = {
            "status": "healthy",
            "latency_ms": round(redis_latency, 2),
            "connected_clients": info.get("connected_clients", 0),
            "used_memory": info.get("used_memory_human", "unknown"),
            "uptime": info.get("uptime_in_seconds", 0)
        }
    except Exception as e:
        status["redis"] = {"status": "unhealthy", "error": str(e)}
    
    return status

Performance Metrics

# Real-time performance tracking
class PerformanceMetrics:
    def __init__(self):
        self.metrics = {
            "queries_executed": 0,          # Total database queries
            "cache_hits": 0,                # Cache hit count
            "cache_misses": 0,              # Cache miss count
            "avg_query_time": 0.0,          # Average query time (ms)
            "total_investigations": 0,       # Total investigations processed
            "active_connections": 0,         # Current DB connections
            "error_rate": 0.0               # Error percentage
        }
    
    def calculate_cache_hit_rate(self) -> float:
        """Calculate cache hit rate percentage"""
        total = self.metrics["cache_hits"] + self.metrics["cache_misses"]
        if total == 0:
            return 0.0
        return (self.metrics["cache_hits"] / total) * 100
    
    def update_avg_query_time(self, new_time: float):
        """Update rolling average query time"""
        current_avg = self.metrics["avg_query_time"]
        queries = self.metrics["queries_executed"]
        
        self.metrics["avg_query_time"] = (
            (current_avg * queries + new_time) / (queries + 1)
        )
        self.metrics["queries_executed"] += 1

🚀 Usage Examples

Basic Database Operations

import asyncio

from src.infrastructure.database import get_database_manager, Investigation

async def main():
    # Get database manager (singleton pattern)
    db = await get_database_manager()
    
    # Create investigation
    investigation = Investigation(
        id="inv_001",
        user_id="user_123",
        query="Analyze Ministry of Health contracts 2024",
        status="pending",
        metadata={"priority": "high", "data_source": "contracts"}
    )
    
    # Save to database (with automatic caching)
    success = await db.save_investigation(investigation)
    print(f"Investigation saved: {success}")
    
    # Retrieve (automatic cache usage)
    retrieved = await db.get_investigation("inv_001")
    print(f"Retrieved: {retrieved.query}")
    
    # Generic caching
    await db.cache_set("analysis_results", {"anomalies": 5}, ttl=3600)
    results = await db.cache_get("analysis_results")
    print(f"Cached results: {results}")
    
    # Health check
    health = await db.get_health_status()
    print(f"System health: {health}")

asyncio.run(main())

Advanced Usage Patterns

# Batch processing with connection management
async def process_investigations_batch(investigations: List[Investigation]):
    """Process multiple investigations efficiently"""
    
    db = await get_database_manager()
    
    # Process in parallel with connection pooling
    save_tasks = [
        db.save_investigation(inv) 
        for inv in investigations
    ]
    
    results = await asyncio.gather(*save_tasks, return_exceptions=True)
    
    success_count = sum(1 for r in results if r is True)
    print(f"Saved {success_count}/{len(investigations)} investigations")

# Smart caching for expensive operations
async def get_or_compute_analysis(analysis_id: str):
    """Get analysis from cache or compute if needed"""
    
    db = await get_database_manager()
    cache_key = f"analysis:{analysis_id}"
    
    # Try cache first
    cached_result = await db.cache_get(cache_key)
    if cached_result:
        return cached_result
    
    # Compute expensive analysis
    result = await perform_expensive_analysis(analysis_id)
    
    # Cache for 1 hour
    await db.cache_set(cache_key, result, ttl=3600)
    
    return result

🔧 Configuration & Deployment

Environment Configuration

# PostgreSQL Configuration
DATABASE_URL=postgresql+asyncpg://cidadao:password@localhost:5432/cidadao_ai
DATABASE_POOL_SIZE=20
DATABASE_MAX_OVERFLOW=30
DATABASE_POOL_TIMEOUT=30

# Redis Cluster Configuration
REDIS_NODES=localhost:7000,localhost:7001,localhost:7002
REDIS_PASSWORD=redis_password
REDIS_DECODE_RESPONSES=true

# Cache TTL Configuration
CACHE_TTL_SHORT=300
CACHE_TTL_MEDIUM=3600
CACHE_TTL_LONG=86400

# Performance Tuning
CONNECTION_RETRY_ATTEMPTS=3
CONNECTION_RETRY_DELAY=1.0
QUERY_TIMEOUT=30
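
How these variables are mapped onto DatabaseConfig is not shown in this document; a minimal sketch (the load_config_from_env helper is hypothetical) could look like:

import os

def load_config_from_env() -> DatabaseConfig:
    """Build a DatabaseConfig from the environment variables listed above (hypothetical helper)."""
    nodes = [
        {"host": host, "port": int(port)}
        for host, port in (
            pair.split(":") for pair in os.getenv("REDIS_NODES", "localhost:7000").split(",")
        )
    ]
    return DatabaseConfig(
        postgres_url=os.getenv("DATABASE_URL", "postgresql+asyncpg://user:pass@localhost:5432/cidadao_ai"),
        postgres_pool_size=int(os.getenv("DATABASE_POOL_SIZE", "20")),
        postgres_max_overflow=int(os.getenv("DATABASE_MAX_OVERFLOW", "30")),
        postgres_pool_timeout=int(os.getenv("DATABASE_POOL_TIMEOUT", "30")),
        redis_nodes=nodes,
        redis_password=os.getenv("REDIS_PASSWORD"),
        cache_ttl_short=int(os.getenv("CACHE_TTL_SHORT", "300")),
        cache_ttl_medium=int(os.getenv("CACHE_TTL_MEDIUM", "3600")),
        cache_ttl_long=int(os.getenv("CACHE_TTL_LONG", "86400")),
        connection_retry_attempts=int(os.getenv("CONNECTION_RETRY_ATTEMPTS", "3")),
        connection_retry_delay=float(os.getenv("CONNECTION_RETRY_DELAY", "1.0")),
        query_timeout=int(os.getenv("QUERY_TIMEOUT", "30")),
    )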

Docker Deployment

# docker-compose.yml for infrastructure services
version: '3.8'
services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_DB: cidadao_ai
      POSTGRES_USER: cidadao
      POSTGRES_PASSWORD: password
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    command: |
      postgres -c max_connections=100
               -c shared_buffers=256MB
               -c effective_cache_size=1GB
               -c work_mem=4MB
  
  redis-node-1:
    image: redis:7
    ports:
      - "7000:7000"
    command: |
      redis-server --port 7000
                   --cluster-enabled yes
                   --cluster-config-file nodes.conf
                   --cluster-node-timeout 5000
                   --appendonly yes
  
  redis-node-2:
    image: redis:7
    ports:
      - "7001:7001"
    command: |
      redis-server --port 7001
                   --cluster-enabled yes
                   --cluster-config-file nodes.conf
                   --cluster-node-timeout 5000
                   --appendonly yes
  
  redis-node-3:
    image: redis:7
    ports:
      - "7002:7002"
    command: |
      redis-server --port 7002
                   --cluster-enabled yes
                   --cluster-config-file nodes.conf
                   --cluster-node-timeout 5000
                   --appendonly yes

volumes:
  postgres_data:

Performance Tuning

# Production-optimized configuration
PRODUCTION_CONFIG = DatabaseConfig(
    # PostgreSQL optimizations
    postgres_pool_size=50,                 # Higher connection pool
    postgres_max_overflow=50,              # More overflow connections
    postgres_pool_timeout=60,              # Longer timeout
    
    # Cache optimizations
    cache_ttl_short=600,                   # 10 minutes
    cache_ttl_medium=7200,                 # 2 hours
    cache_ttl_long=172800,                 # 48 hours
    
    # Retry configuration
    connection_retry_attempts=5,
    connection_retry_delay=2.0,
    query_timeout=60
)

🧪 Testing Infrastructure

# Test database setup with TestContainers
import pytest
from testcontainers.postgres import PostgresContainer
from testcontainers.redis import RedisContainer

@pytest.fixture
async def test_database():
    """Test database with real PostgreSQL"""
    
    with PostgresContainer("postgres:16") as postgres:
        config = DatabaseConfig(
            postgres_url=postgres.get_connection_url().replace(
                "postgresql://", "postgresql+asyncpg://"
            )
        )
        
        db = DatabaseManager(config)
        await db.initialize()
        
        yield db
        
        await db.cleanup()

@pytest.fixture
async def test_redis():
    """Test Redis with real Redis container"""
    
    with RedisContainer() as redis:
        config = DatabaseConfig(
            redis_nodes=[{
                "host": redis.get_container_host_ip(),
                "port": redis.get_exposed_port(6379)
            }]
        )
        
        db = DatabaseManager(config)
        await db._init_redis_cluster()
        
        yield db.redis_cluster
        
        await db.redis_cluster.close()
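
A hedged example of a test that consumes the test_database fixture; it assumes pytest-asyncio and that cache errors fall back to PostgreSQL, as in get_investigation above:

@pytest.mark.asyncio
async def test_investigation_round_trip(test_database):
    """Save an investigation and read it back through the manager."""
    investigation = Investigation(
        id="inv_test_001",
        query="Test query",
        status="pending",
    )

    await test_database.save_investigation(investigation)

    retrieved = await test_database.get_investigation("inv_test_001")
    assert retrieved is not None
    assert retrieved.query == "Test query"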

This infrastructure layer provides enterprise-grade persistence with intelligent caching, high availability, and comprehensive monitoring - essential for the demanding requirements of transparency analysis at scale.