anderson-ufrj committed on
Commit e29e8bd · 1 Parent(s): eccaf5b

feat(performance): implement cache warming and database optimization


- Create cache warming service with multiple strategies
- Add query tracking middleware for pattern analysis
- Implement database optimization service
- Create admin APIs for cache and database management
- Add Celery tasks for scheduled warming
- Create performance indexes migration
- Add optimization script for manual/cron execution
- Configure optimization thresholds and schedules

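Note for reviewers: the new admin surface can be smoke-tested once deployed. A minimal sketch, assuming a local deployment, an `X-API-Key` admin header, and the `requests` package (base URL and auth header name are assumptions; the endpoint paths come from the routers added in this commit):

    import requests

    BASE = "http://localhost:8000"          # hypothetical deployment URL
    HEADERS = {"X-API-Key": "<admin-key>"}  # hypothetical auth header

    # Kick off cache warming for all strategies (server runs it in the background)
    r = requests.post(f"{BASE}/api/v1/admin/cache-warming/trigger",
                      json={"strategies": None}, headers=HEADERS, timeout=30)
    print(r.json())  # {"status": "warming_started", ...}

    # Dry-run the missing-index analysis
    r = requests.post(f"{BASE}/api/v1/admin/database-optimization/create-indexes",
                      params={"dry_run": "true"}, headers=HEADERS, timeout=60)
    print(r.json()["total"], "index suggestions")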
alembic/versions/007_add_performance_indexes.py ADDED
@@ -0,0 +1,168 @@
1
+ """add performance indexes
2
+
3
+ Revision ID: 007
4
+ Revises: 006
5
+ Create Date: 2025-01-25
6
+ """
7
+ from alembic import op
8
+ import sqlalchemy as sa
9
+
10
+ # revision identifiers
11
+ revision = '007'
12
+ down_revision = '006'
13
+ branch_labels = None
14
+ depends_on = None
15
+
16
+
17
+ def upgrade() -> None:
18
+ """Create performance indexes."""
19
+
20
+ # Investigations table indexes
21
+ op.create_index(
22
+ 'ix_investigations_status_created_at',
23
+ 'investigations',
24
+ ['status', 'created_at']
25
+ )
26
+
27
+ op.create_index(
28
+ 'ix_investigations_contract_id',
29
+ 'investigations',
30
+ ['contract_id']
31
+ )
32
+
33
+ # Chat sessions indexes
34
+ op.create_index(
35
+ 'ix_chat_sessions_user_id_created_at',
36
+ 'chat_sessions',
37
+ ['user_id', 'created_at']
38
+ )
39
+
40
+ # Chat messages indexes
41
+ op.create_index(
42
+ 'ix_chat_messages_session_id_created_at',
43
+ 'chat_messages',
44
+ ['session_id', 'created_at']
45
+ )
46
+
47
+ # API keys indexes (if not already created)
48
+ op.create_index(
49
+ 'ix_api_keys_key_hash',
50
+ 'api_keys',
51
+ ['key_hash'],
52
+ unique=True  # default btree; PostgreSQL hash indexes do not support UNIQUE
54
+ )
55
+
56
+ op.create_index(
57
+ 'ix_api_keys_status',
58
+ 'api_keys',
59
+ ['status']
60
+ )
61
+
62
+ # Agents table indexes
63
+ op.create_index(
64
+ 'ix_agents_type_status',
65
+ 'agents',
66
+ ['type', 'status']
67
+ )
68
+
69
+ # Anomalies table indexes (if exists)
70
+ try:
71
+ op.create_index(
72
+ 'ix_anomalies_investigation_id',
73
+ 'anomalies',
74
+ ['investigation_id']
75
+ )
76
+
77
+ op.create_index(
78
+ 'ix_anomalies_severity_created_at',
79
+ 'anomalies',
80
+ ['severity', 'created_at']
81
+ )
82
+ except Exception:
83
+ pass # Table might not exist yet
84
+
85
+ # Reports table indexes (if exists)
86
+ try:
87
+ op.create_index(
88
+ 'ix_reports_investigation_id',
89
+ 'reports',
90
+ ['investigation_id']
91
+ )
92
+
93
+ op.create_index(
94
+ 'ix_reports_format_created_at',
95
+ 'reports',
96
+ ['format', 'created_at']
97
+ )
98
+ except Exception:
99
+ pass
100
+
101
+ # Audit logs indexes
102
+ op.create_index(
103
+ 'ix_audit_logs_event_type_timestamp',
104
+ 'audit_logs',
105
+ ['event_type', 'timestamp']
106
+ )
107
+
108
+ op.create_index(
109
+ 'ix_audit_logs_user_id_timestamp',
110
+ 'audit_logs',
111
+ ['user_id', 'timestamp']
112
+ )
113
+
114
+ # Partial and GIN indexes. CREATE INDEX CONCURRENTLY cannot run inside a
+ # transaction block, so run these statements in an autocommit block.
+ with op.get_context().autocommit_block():
+     op.execute("""
+         CREATE INDEX CONCURRENTLY ix_investigations_pending
+         ON investigations (created_at)
+         WHERE status = 'PENDING'
+     """)
+
+     op.execute("""
+         CREATE INDEX CONCURRENTLY ix_api_keys_active
+         ON api_keys (created_at)
+         WHERE status = 'ACTIVE'
+     """)
+
+     # GIN indexes for JSONB columns
+     op.execute("""
+         CREATE INDEX CONCURRENTLY ix_investigations_results_gin
+         ON investigations USING gin (results)
+     """)
+
+     op.execute("""
+         CREATE INDEX CONCURRENTLY ix_investigations_metadata_gin
+         ON investigations USING gin (metadata)
+     """)
137
+
138
+
139
+ def downgrade() -> None:
140
+ """Drop performance indexes."""
141
+
142
+ # Drop all created indexes
143
+ op.drop_index('ix_investigations_status_created_at', 'investigations')
144
+ op.drop_index('ix_investigations_contract_id', 'investigations')
145
+ op.drop_index('ix_chat_sessions_user_id_created_at', 'chat_sessions')
146
+ op.drop_index('ix_chat_messages_session_id_created_at', 'chat_messages')
147
+ op.drop_index('ix_api_keys_key_hash', 'api_keys')
148
+ op.drop_index('ix_api_keys_status', 'api_keys')
149
+ op.drop_index('ix_agents_type_status', 'agents')
150
+ op.drop_index('ix_audit_logs_event_type_timestamp', 'audit_logs')
151
+ op.drop_index('ix_audit_logs_user_id_timestamp', 'audit_logs')
152
+
153
+ # Drop partial indexes
154
+ op.execute("DROP INDEX IF EXISTS ix_investigations_pending")
155
+ op.execute("DROP INDEX IF EXISTS ix_api_keys_active")
156
+
157
+ # Drop GIN indexes
158
+ op.execute("DROP INDEX IF EXISTS ix_investigations_results_gin")
159
+ op.execute("DROP INDEX IF EXISTS ix_investigations_metadata_gin")
160
+
161
+ # Drop indexes that might not exist
162
+ try:
163
+ op.drop_index('ix_anomalies_investigation_id', 'anomalies')
164
+ op.drop_index('ix_anomalies_severity_created_at', 'anomalies')
165
+ op.drop_index('ix_reports_investigation_id', 'reports')
166
+ op.drop_index('ix_reports_format_created_at', 'reports')
167
+ except Exception:
168
+ pass
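A quick post-migration check (a sketch, not part of this commit): confirm the new indexes exist via pg_indexes. It only assumes a synchronous SQLAlchemy engine pointed at the same database; the DSN below is a placeholder.

    from sqlalchemy import create_engine, text

    engine = create_engine("postgresql://user:pass@localhost/cidadao")  # placeholder DSN
    expected = {
        "ix_investigations_status_created_at",
        "ix_chat_messages_session_id_created_at",
        "ix_investigations_results_gin",
    }
    with engine.connect() as conn:
        present = {row.indexname for row in conn.execute(
            text("SELECT indexname FROM pg_indexes WHERE schemaname = 'public'")
        )}
    print("missing:", expected - present)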
config/database_optimization.yaml ADDED
@@ -0,0 +1,111 @@
1
+ # Database Optimization Configuration
2
+
3
+ # Thresholds for identifying issues
4
+ thresholds:
5
+ slow_query_seconds: 1.0
6
+ cache_hit_ratio_min: 0.90
7
+ dead_tuple_ratio_max: 0.20
8
+ unused_index_days: 30
9
+
10
+ # Automatic index creation rules
11
+ auto_indexes:
12
+ # Create indexes on foreign keys
13
+ foreign_keys: true
14
+
15
+ # Create indexes on frequently filtered columns
16
+ frequent_filters:
17
+ min_calls: 100
18
+ enabled: true
19
+
20
+ # Create indexes on frequently joined columns
21
+ frequent_joins:
22
+ min_calls: 50
23
+ enabled: true
24
+
25
+ # Table-specific optimizations
26
+ table_optimizations:
27
+ investigations:
28
+ # Partition by created_at if > 10M rows
29
+ partition_threshold: 10000000
30
+ partition_by: created_at
31
+ partition_interval: monthly
32
+
33
+ chat_messages:
34
+ # Archive old messages
35
+ archive_after_days: 90
36
+ archive_to_table: chat_messages_archive
37
+
38
+ audit_logs:
39
+ # Compress old logs
40
+ compress_after_days: 30
41
+ compression_level: 9
42
+
43
+ # Vacuum settings
44
+ vacuum:
45
+ # Run VACUUM on tables with > 20% dead tuples
46
+ dead_tuple_threshold: 0.20
47
+
48
+ # Run VACUUM FULL on tables with > 50% dead tuples
49
+ full_vacuum_threshold: 0.50
50
+
51
+ # Exclude tables from automatic vacuum
52
+ exclude_tables:
53
+ - spatial_ref_sys
54
+ - pg_stat_statements
55
+
56
+ # Statistics update settings
57
+ statistics:
58
+ # Update statistics if stale
59
+ stale_threshold_days: 7
60
+
61
+ # Sample size for ANALYZE
62
+ default_statistics_target: 100
63
+
64
+ # Tables requiring higher statistics
65
+ high_statistics_tables:
66
+ investigations: 500
67
+ contracts: 500
68
+
69
+ # Monitoring and alerting
70
+ monitoring:
71
+ # Alert if slow queries exceed threshold
72
+ slow_query_alert_count: 10
73
+
74
+ # Alert if cache hit ratio drops
75
+ cache_hit_alert_threshold: 0.85
76
+
77
+ # Alert if connections near limit
78
+ connection_alert_percentage: 0.90
79
+
80
+ # Scheduled optimization tasks
81
+ schedule:
82
+ # Run full optimization
83
+ full_optimization:
84
+ cron: "0 2 * * SUN" # Sunday 2 AM
85
+ enabled: true
86
+
87
+ # Update statistics only
88
+ update_statistics:
89
+ cron: "0 3 * * *" # Daily 3 AM
90
+ enabled: true
91
+
92
+ # Check missing indexes
93
+ check_indexes:
94
+ cron: "0 4 * * MON,THU" # Monday & Thursday 4 AM
95
+ enabled: true
96
+
97
+ # Vacuum high-activity tables
98
+ vacuum_tables:
99
+ cron: "0 5 * * *" # Daily 5 AM
100
+ enabled: true
101
+
102
+ # Performance recommendations
103
+ recommendations:
104
+ # Suggest configuration changes
105
+ suggest_config_changes: true
106
+
107
+ # Suggest query rewrites
108
+ suggest_query_rewrites: true
109
+
110
+ # Suggest schema changes
111
+ suggest_schema_changes: true
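The optimization service is expected to read these thresholds; the loader itself is not part of this diff, so the following is only a sketch of how the file could be consumed (assumes PyYAML and that the file is read from the repository root):

    from pathlib import Path
    import yaml

    config = yaml.safe_load(Path("config/database_optimization.yaml").read_text())
    thresholds = config["thresholds"]
    print(f"slow query threshold: {thresholds['slow_query_seconds']}s, "
          f"min cache hit ratio: {thresholds['cache_hit_ratio_min']:.0%}")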
scripts/optimize_database.py ADDED
@@ -0,0 +1,166 @@
1
+ #!/usr/bin/env python3
2
+ """
3
+ Script to optimize database performance.
4
+
5
+ This script can be run manually or scheduled via cron to:
6
+ - Analyze and create missing indexes
7
+ - Update table statistics
8
+ - Vacuum tables with high dead tuple ratio
9
+ - Generate performance reports
10
+
11
+ Usage:
12
+ python scripts/optimize_database.py [options]
13
+
14
+ Options:
15
+ --dry-run Show what would be done without making changes
16
+ --analyze-only Only analyze, don't make any changes
17
+ --force Force optimization even if recently done
18
+ """
19
+
20
+ import asyncio
21
+ import argparse
22
+ import sys
23
+ from datetime import datetime, timezone
24
+ from pathlib import Path
25
+
26
+ # Add project root to path
27
+ sys.path.insert(0, str(Path(__file__).parent.parent))
28
+
29
+ from src.core import get_logger
30
+ from src.db.session import get_session
31
+ from src.services.database_optimization_service import database_optimization_service
32
+ from src.core.config import settings
33
+
34
+ logger = get_logger(__name__)
35
+
36
+
37
+ async def main(args):
38
+ """Main optimization routine."""
39
+ logger.info(
40
+ "database_optimization_started",
41
+ dry_run=args.dry_run,
42
+ analyze_only=args.analyze_only
43
+ )
44
+
45
+ async with get_session() as session:
46
+ # 1. Analyze slow queries
47
+ logger.info("Analyzing slow queries...")
48
+ slow_queries = await database_optimization_service.analyze_slow_queries(
49
+ session=session,
50
+ limit=50
51
+ )
52
+
53
+ print(f"\n=== SLOW QUERIES ANALYSIS ===")
54
+ print(f"Found {len(slow_queries)} slow queries")
55
+
56
+ for i, analysis in enumerate(slow_queries[:10], 1):
57
+ print(f"\n{i}. Query (exec time: {analysis.execution_time:.2f}s):")
58
+ print(f" {analysis.query[:100]}...")
59
+ print(f" Calls: {analysis.plan.get('calls', 0)}")
60
+ print(f" Suggestions: {', '.join(analysis.suggestions)}")
61
+
62
+ # 2. Check missing indexes
63
+ logger.info("Checking for missing indexes...")
64
+ missing_indexes = await database_optimization_service.create_missing_indexes(
65
+ session=session,
66
+ dry_run=True
67
+ )
68
+
69
+ print(f"\n=== MISSING INDEXES ===")
70
+ print(f"Found {len(missing_indexes)} missing indexes")
71
+
72
+ for idx in missing_indexes:
73
+ print(f"\n- Table: {idx['table']}, Column: {idx['column']}")
74
+ print(f" Reason: {idx['reason']}")
75
+ print(f" Command: {idx['command']}")
76
+
77
+ created_count = 0  # default when no indexes get created (dry run / analyze-only)
+
+ # 3. Create indexes if not in analyze-only mode
78
+ if not args.analyze_only and missing_indexes:
79
+ if args.dry_run:
80
+ print("\n[DRY RUN] Would create the above indexes")
81
+ else:
82
+ print("\nCreating missing indexes...")
83
+ created_indexes = await database_optimization_service.create_missing_indexes(
84
+ session=session,
85
+ dry_run=False
86
+ )
87
+
88
+ created_count = sum(1 for idx in created_indexes if idx.get('status') == 'created')
89
+ print(f"Created {created_count} indexes")
90
+
91
+ # 4. Update statistics
92
+ if not args.analyze_only:
93
+ if args.dry_run:
94
+ print("\n[DRY RUN] Would update table statistics")
95
+ else:
96
+ print("\nUpdating table statistics...")
97
+ stats_result = await database_optimization_service.optimize_table_statistics(
98
+ session=session
99
+ )
100
+
101
+ print(f"Analyzed {len(stats_result['analyzed'])} tables")
102
+ print(f"Vacuumed {len(stats_result['vacuumed'])} tables")
103
+
104
+ # 5. Get database stats
105
+ print("\n=== DATABASE STATISTICS ===")
106
+ db_stats = await database_optimization_service.get_database_stats(session)
107
+
108
+ print(f"Database size: {db_stats['database_size']['pretty']}")
109
+ print(f"Cache hit ratio: {db_stats['cache_hit_ratio']['ratio']:.1%}")
110
+ print(f"Active connections: {db_stats['connections']['active']}/{db_stats['connections']['total']}")
111
+
112
+ print("\nLargest tables:")
113
+ for table in db_stats['largest_tables'][:5]:
114
+ print(f"- {table['table']}: {table['size_pretty']} ({table['row_count']} rows)")
115
+
116
+ # 6. Generate report
117
+ report = {
118
+ "timestamp": datetime.now(timezone.utc).isoformat(),
119
+ "slow_queries": len(slow_queries),
120
+ "missing_indexes": len(missing_indexes),
121
+ "created_indexes": created_count if not args.analyze_only and not args.dry_run else 0,
122
+ "database_size": db_stats['database_size']['pretty'],
123
+ "cache_hit_ratio": db_stats['cache_hit_ratio']['ratio']
124
+ }
125
+
126
+ # Save report
127
+ report_path = Path("logs/database_optimization_report.json")
128
+ report_path.parent.mkdir(exist_ok=True)
129
+
130
+ import json
131
+ with open(report_path, "w") as f:
132
+ json.dump(report, f, indent=2)
133
+
134
+ print(f"\n=== OPTIMIZATION COMPLETE ===")
135
+ print(f"Report saved to: {report_path}")
136
+
137
+ logger.info(
138
+ "database_optimization_completed",
139
+ report=report
140
+ )
141
+
142
+
143
+ if __name__ == "__main__":
144
+ parser = argparse.ArgumentParser(
145
+ description="Optimize database performance"
146
+ )
147
+ parser.add_argument(
148
+ "--dry-run",
149
+ action="store_true",
150
+ help="Show what would be done without making changes"
151
+ )
152
+ parser.add_argument(
153
+ "--analyze-only",
154
+ action="store_true",
155
+ help="Only analyze, don't make any changes"
156
+ )
157
+ parser.add_argument(
158
+ "--force",
159
+ action="store_true",
160
+ help="Force optimization even if recently done"
161
+ )
162
+
163
+ args = parser.parse_args()
164
+
165
+ # Run optimization
166
+ asyncio.run(main(args))
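The commit message mentions Celery tasks for scheduled warming and optimization, but those modules are not in this excerpt. As a sketch of how this script's routine could be put on a beat schedule (task path, app name, and broker URL are assumptions; the timing mirrors the "0 2 * * SUN" entry in the config file):

    from celery import Celery
    from celery.schedules import crontab

    app = Celery("cidadao", broker="redis://localhost:6379/0")  # hypothetical broker

    app.conf.beat_schedule = {
        "full-database-optimization": {
            "task": "src.tasks.database.optimize_database",  # hypothetical task path
            "schedule": crontab(hour=2, minute=0, day_of_week="sunday"),  # Sunday 2 AM
        },
    }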
src/api/app.py CHANGED
@@ -214,6 +214,19 @@ app.add_middleware(
     strategy="sliding_window"
 )
 
+# Add query tracking middleware for cache optimization
+from src.api.middleware.query_tracking import QueryTrackingMiddleware
+app.add_middleware(
+    QueryTrackingMiddleware,
+    tracked_paths=[
+        "/api/v1/investigations",
+        "/api/v1/contracts",
+        "/api/v1/analysis",
+        "/api/v1/reports"
+    ],
+    sample_rate=0.1 if settings.is_production else 1.0  # 10% sampling in production
+)
+
 
 # Custom OpenAPI schema
 def custom_openapi():
@@ -392,6 +405,8 @@ app.include_router(
 
 # Import and include admin routes
 from src.api.routes.admin import ip_whitelist as admin_ip_whitelist
+from src.api.routes.admin import cache_warming as admin_cache_warming
+from src.api.routes.admin import database_optimization as admin_db_optimization
 from src.api.routes import api_keys
 
 app.include_router(
@@ -400,6 +415,18 @@ app.include_router(
     tags=["Admin - IP Whitelist"]
 )
 
+app.include_router(
+    admin_cache_warming.router,
+    prefix="/api/v1/admin",
+    tags=["Admin - Cache Warming"]
+)
+
+app.include_router(
+    admin_db_optimization.router,
+    prefix="/api/v1/admin",
+    tags=["Admin - Database Optimization"]
+)
+
 app.include_router(
     api_keys.router,
     prefix="/api/v1",
src/api/middleware/query_tracking.py ADDED
@@ -0,0 +1,113 @@
1
+ """
2
+ Module: api.middleware.query_tracking
3
+ Description: Middleware to track query patterns for cache warming
4
+ Author: Anderson H. Silva
5
+ Date: 2025-01-25
6
+ License: Proprietary - All rights reserved
7
+ """
8
+
9
+ from typing import Any, Dict, List, Optional
10
+ import json
11
+ import hashlib
12
+
13
+ from fastapi import Request
14
+ from starlette.middleware.base import BaseHTTPMiddleware
15
+
16
+ from src.core import get_logger
17
+ from src.services.cache_warming_service import cache_warming_service
18
+
19
+ logger = get_logger(__name__)
20
+
21
+
22
+ class QueryTrackingMiddleware(BaseHTTPMiddleware):
23
+ """
24
+ Middleware to track query patterns for cache optimization.
25
+
26
+ Tracks:
27
+ - API endpoint access frequency
28
+ - Query parameters
29
+ - Query hashes for request deduplication
31
+ """
32
+
33
+ def __init__(
34
+ self,
35
+ app,
36
+ tracked_paths: Optional[List[str]] = None,
37
+ sample_rate: float = 1.0
38
+ ):
39
+ """
40
+ Initialize query tracking middleware.
41
+
42
+ Args:
43
+ app: FastAPI application
44
+ tracked_paths: List of paths to track (None = all)
45
+ sample_rate: Sampling rate (0.0 to 1.0)
46
+ """
47
+ super().__init__(app)
48
+ self.tracked_paths = tracked_paths or [
49
+ "/api/v1/investigations",
50
+ "/api/v1/contracts",
51
+ "/api/v1/analysis",
52
+ "/api/v1/reports",
53
+ "/api/v1/chat"
54
+ ]
55
+ self.sample_rate = sample_rate
56
+
57
+ async def dispatch(self, request: Request, call_next):
58
+ """Process request with query tracking."""
59
+ # Check if we should track this request
60
+ if not self._should_track(request):
61
+ return await call_next(request)
62
+
63
+ # Extract query information
64
+ query_info = self._extract_query_info(request)
65
+
66
+ # Process request
67
+ response = await call_next(request)
68
+
69
+ # Track query in background
70
+ try:
71
+ cache_warming_service.track_query(query_info)
72
+ except Exception as e:
73
+ logger.error(
74
+ "query_tracking_error",
75
+ error=str(e),
76
+ query_info=query_info
77
+ )
78
+
79
+ return response
80
+
81
+ def _should_track(self, request: Request) -> bool:
82
+ """Check if request should be tracked."""
83
+ # Check sampling rate
84
+ import random
85
+ if random.random() > self.sample_rate:
86
+ return False
87
+
88
+ # Check path
89
+ path = request.url.path
90
+ for tracked_path in self.tracked_paths:
91
+ if path.startswith(tracked_path):
92
+ return True
93
+
94
+ return False
95
+
96
+ def _extract_query_info(self, request: Request) -> Dict[str, Any]:
97
+ """Extract query information from request."""
98
+ query_info = {
99
+ "path": request.url.path,
100
+ "method": request.method,
101
+ "query_params": dict(request.query_params),
102
+ "timestamp": None # Will be set by service
103
+ }
104
+
105
+ # Add path parameters if available
106
+ if hasattr(request, "path_params"):
107
+ query_info["path_params"] = request.path_params
108
+
109
+ # Generate query hash for deduplication
110
+ query_str = json.dumps(query_info, sort_keys=True)
111
+ query_info["hash"] = hashlib.md5(query_str.encode()).hexdigest()
112
+
113
+ return query_info
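The deduplication key in _extract_query_info is an MD5 over the JSON-serialized request descriptor. A standalone sketch of the same scheme (no FastAPI required), useful for checking that identical requests collapse to one hash regardless of parameter order:

    import hashlib
    import json

    def query_hash(path: str, method: str, query_params: dict) -> str:
        descriptor = {"path": path, "method": method,
                      "query_params": query_params, "timestamp": None}
        payload = json.dumps(descriptor, sort_keys=True)
        return hashlib.md5(payload.encode()).hexdigest()

    a = query_hash("/api/v1/contracts", "GET", {"year": "2024", "page": "1"})
    b = query_hash("/api/v1/contracts", "GET", {"page": "1", "year": "2024"})
    assert a == b  # key order does not matter thanks to sort_keys=True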
src/api/routes/admin/cache_warming.py ADDED
@@ -0,0 +1,201 @@
1
+ """
2
+ Module: api.routes.admin.cache_warming
3
+ Description: Admin routes for cache warming management
4
+ Author: Anderson H. Silva
5
+ Date: 2025-01-25
6
+ License: Proprietary - All rights reserved
7
+ """
8
+
9
+ from typing import List, Optional, Dict, Any
10
+ from datetime import datetime
11
+
12
+ from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks
13
+ from pydantic import BaseModel, Field
14
+
15
+ from src.core import get_logger
16
+ from src.api.dependencies import require_admin
17
+ from src.services.cache_warming_service import (
18
+ cache_warming_service,
19
+ CacheWarmingStrategy
20
+ )
21
+
22
+ logger = get_logger(__name__)
23
+
24
+ router = APIRouter(prefix="/cache-warming", tags=["Admin - Cache Warming"])
25
+
26
+
27
+ class CacheWarmingRequest(BaseModel):
28
+ """Request to warm specific cache data."""
29
+ strategies: Optional[List[CacheWarmingStrategy]] = Field(
30
+ None,
31
+ description="Specific strategies to execute (None = all)"
32
+ )
33
+
34
+
35
+ class SpecificDataWarmingRequest(BaseModel):
36
+ """Request to warm specific data items."""
37
+ data_type: str = Field(..., description="Type of data to warm")
38
+ identifiers: List[str] = Field(..., description="List of identifiers")
39
+ ttl: Optional[int] = Field(None, description="Cache TTL in seconds")
40
+
41
+
42
+ class CacheWarmingStatusResponse(BaseModel):
43
+ """Cache warming status response."""
44
+ last_warming: Optional[datetime]
45
+ query_frequency_tracked: int
46
+ top_queries: List[tuple]
47
+ config: Dict[str, Any]
48
+
49
+
50
+ @router.post("/trigger")
51
+ async def trigger_cache_warming(
52
+ request: CacheWarmingRequest,
53
+ background_tasks: BackgroundTasks,
54
+ admin_user=Depends(require_admin)
55
+ ):
56
+ """
57
+ Manually trigger cache warming.
58
+
59
+ Requires admin privileges.
60
+ """
61
+ logger.info(
62
+ "admin_cache_warming_triggered",
63
+ admin=admin_user.get("email"),
64
+ strategies=request.strategies
65
+ )
66
+
67
+ # Execute in background
68
+ background_tasks.add_task(
69
+ cache_warming_service.trigger_manual_warming,
70
+ request.strategies
71
+ )
72
+
73
+ return {
74
+ "status": "warming_started",
75
+ "strategies": request.strategies or "all",
76
+ "message": "Cache warming started in background"
77
+ }
78
+
79
+
80
+ @router.post("/warm-specific")
81
+ async def warm_specific_data(
82
+ request: SpecificDataWarmingRequest,
83
+ admin_user=Depends(require_admin)
84
+ ):
85
+ """
86
+ Warm cache with specific data items.
87
+
88
+ Requires admin privileges.
89
+ """
90
+ try:
91
+ results = await cache_warming_service.warm_specific_data(
92
+ data_type=request.data_type,
93
+ identifiers=request.identifiers,
94
+ ttl=request.ttl
95
+ )
96
+
97
+ logger.info(
98
+ "admin_specific_data_warmed",
99
+ admin=admin_user.get("email"),
100
+ data_type=request.data_type,
101
+ warmed_count=len(results["warmed"]),
102
+ failed_count=len(results["failed"])
103
+ )
104
+
105
+ return results
106
+
107
+ except Exception as e:
108
+ logger.error(
109
+ "admin_specific_data_warming_error",
110
+ error=str(e),
111
+ exc_info=True
112
+ )
113
+ raise HTTPException(
114
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
115
+ detail="Failed to warm specific data"
116
+ )
117
+
118
+
119
+ @router.get("/status", response_model=CacheWarmingStatusResponse)
120
+ async def get_warming_status(
121
+ admin_user=Depends(require_admin)
122
+ ):
123
+ """
124
+ Get cache warming status.
125
+
126
+ Requires admin privileges.
127
+ """
128
+ warming_status = await cache_warming_service.get_warming_status()
129
+
130
+ return CacheWarmingStatusResponse(
+     last_warming=warming_status["last_warming"],
+     query_frequency_tracked=warming_status["query_frequency_tracked"],
+     top_queries=warming_status["top_queries"],
+     config=warming_status["config"]
+ )
136
+
137
+
138
+ @router.post("/strategies/{strategy}")
139
+ async def execute_single_strategy(
140
+ strategy: CacheWarmingStrategy,
141
+ background_tasks: BackgroundTasks,
142
+ admin_user=Depends(require_admin)
143
+ ):
144
+ """
145
+ Execute a single cache warming strategy.
146
+
147
+ Requires admin privileges.
148
+ """
149
+ logger.info(
150
+ "admin_single_strategy_warming",
151
+ admin=admin_user.get("email"),
152
+ strategy=strategy
153
+ )
154
+
155
+ # Execute in background
156
+ background_tasks.add_task(
157
+ cache_warming_service.trigger_manual_warming,
158
+ [strategy]
159
+ )
160
+
161
+ return {
162
+ "status": "strategy_started",
163
+ "strategy": strategy,
164
+ "message": f"Cache warming strategy '{strategy}' started"
165
+ }
166
+
167
+
168
+ @router.get("/strategies")
169
+ async def list_available_strategies(
170
+ admin_user=Depends(require_admin)
171
+ ):
172
+ """
173
+ List available cache warming strategies.
174
+
175
+ Requires admin privileges.
176
+ """
177
+ strategies = [
178
+ {
179
+ "name": strategy.value,
180
+ "description": get_strategy_description(strategy)
181
+ }
182
+ for strategy in CacheWarmingStrategy
183
+ ]
184
+
185
+ return {
186
+ "strategies": strategies,
187
+ "total": len(strategies)
188
+ }
189
+
190
+
191
+ def get_strategy_description(strategy: CacheWarmingStrategy) -> str:
192
+ """Get human-readable description for strategy."""
193
+ descriptions = {
194
+ CacheWarmingStrategy.POPULAR_DATA: "Warm cache with frequently accessed contracts and data",
195
+ CacheWarmingStrategy.RECENT_INVESTIGATIONS: "Cache recent investigation results",
196
+ CacheWarmingStrategy.FREQUENT_QUERIES: "Cache results of frequently executed queries",
197
+ CacheWarmingStrategy.AGENT_POOLS: "Pre-initialize agent pool connections",
198
+ CacheWarmingStrategy.STATIC_RESOURCES: "Cache static configuration and reference data",
199
+ CacheWarmingStrategy.PREDICTIVE: "Use ML to predict and cache likely needed data"
200
+ }
201
+ return descriptions.get(strategy, "No description available")
src/api/routes/admin/database_optimization.py ADDED
@@ -0,0 +1,267 @@
1
+ """
2
+ Module: api.routes.admin.database_optimization
3
+ Description: Admin routes for database optimization
4
+ Author: Anderson H. Silva
5
+ Date: 2025-01-25
6
+ License: Proprietary - All rights reserved
7
+ """
8
+
9
+ from typing import List, Optional, Dict, Any
10
+ from fastapi import APIRouter, Depends, HTTPException, status, Query
11
+
12
+ from src.core import get_logger
13
+ from src.api.dependencies import require_admin, get_db
14
+ from src.services.database_optimization_service import database_optimization_service
15
+
16
+ logger = get_logger(__name__)
17
+
18
+ router = APIRouter(prefix="/database-optimization", tags=["Admin - Database Optimization"])
19
+
20
+
21
+ @router.get("/analyze-slow-queries")
22
+ async def analyze_slow_queries(
23
+ limit: int = Query(default=20, ge=1, le=100),
24
+ admin_user=Depends(require_admin),
25
+ db=Depends(get_db)
26
+ ):
27
+ """
28
+ Analyze slow queries and get optimization suggestions.
29
+
30
+ Requires admin privileges.
31
+ """
32
+ try:
33
+ analyses = await database_optimization_service.analyze_slow_queries(
34
+ session=db,
35
+ limit=limit
36
+ )
37
+
38
+ # Format response
39
+ results = []
40
+ for analysis in analyses:
41
+ results.append({
42
+ "query": analysis.query[:200] + "..." if len(analysis.query) > 200 else analysis.query,
43
+ "execution_time": analysis.execution_time,
44
+ "calls": analysis.plan.get("calls", 0),
45
+ "total_time": analysis.plan.get("total_time", 0),
46
+ "suggestions": analysis.suggestions,
47
+ "estimated_improvement": analysis.estimated_improvement
48
+ })
49
+
50
+ logger.info(
51
+ "admin_slow_queries_analyzed",
52
+ admin=admin_user.get("email"),
53
+ queries_count=len(results)
54
+ )
55
+
56
+ return {
57
+ "slow_queries": results,
58
+ "total": len(results),
59
+ "threshold_seconds": database_optimization_service._slow_query_threshold
60
+ }
61
+
62
+ except Exception as e:
63
+ logger.error(
64
+ "analyze_slow_queries_error",
65
+ error=str(e),
66
+ exc_info=True
67
+ )
68
+ raise HTTPException(
69
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
70
+ detail="Failed to analyze slow queries"
71
+ )
72
+
73
+
74
+ @router.get("/missing-indexes")
75
+ async def get_missing_indexes(
76
+ admin_user=Depends(require_admin),
77
+ db=Depends(get_db)
78
+ ):
79
+ """
80
+ Get suggestions for missing indexes.
81
+
82
+ Requires admin privileges.
83
+ """
84
+ try:
85
+ index_suggestions = await database_optimization_service.create_missing_indexes(
86
+ session=db,
87
+ dry_run=True # Don't create, just suggest
88
+ )
89
+
90
+ logger.info(
91
+ "admin_missing_indexes_analyzed",
92
+ admin=admin_user.get("email"),
93
+ suggestions_count=len(index_suggestions)
94
+ )
95
+
96
+ return {
97
+ "missing_indexes": index_suggestions,
98
+ "total": len(index_suggestions)
99
+ }
100
+
101
+ except Exception as e:
102
+ logger.error(
103
+ "missing_indexes_error",
104
+ error=str(e),
105
+ exc_info=True
106
+ )
107
+ raise HTTPException(
108
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
109
+ detail="Failed to analyze missing indexes"
110
+ )
111
+
112
+
113
+ @router.post("/create-indexes")
114
+ async def create_missing_indexes(
115
+ dry_run: bool = Query(default=True, description="If true, only show what would be created"),
116
+ admin_user=Depends(require_admin),
117
+ db=Depends(get_db)
118
+ ):
119
+ """
120
+ Create missing indexes based on analysis.
121
+
122
+ Requires admin privileges.
123
+ """
124
+ try:
125
+ results = await database_optimization_service.create_missing_indexes(
126
+ session=db,
127
+ dry_run=dry_run
128
+ )
129
+
130
+ created_count = sum(1 for r in results if r.get("status") == "created")
131
+
132
+ logger.info(
133
+ "admin_indexes_created",
134
+ admin=admin_user.get("email"),
135
+ dry_run=dry_run,
136
+ created=created_count,
137
+ total=len(results)
138
+ )
139
+
140
+ return {
141
+ "dry_run": dry_run,
142
+ "indexes": results,
143
+ "created": created_count,
144
+ "total": len(results)
145
+ }
146
+
147
+ except Exception as e:
148
+ logger.error(
149
+ "create_indexes_error",
150
+ error=str(e),
151
+ exc_info=True
152
+ )
153
+ raise HTTPException(
154
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
155
+ detail="Failed to create indexes"
156
+ )
157
+
158
+
159
+ @router.post("/optimize-statistics")
160
+ async def optimize_table_statistics(
161
+ tables: Optional[List[str]] = None,
162
+ admin_user=Depends(require_admin),
163
+ db=Depends(get_db)
164
+ ):
165
+ """
166
+ Update table statistics for query planner optimization.
167
+
168
+ Requires admin privileges.
169
+ """
170
+ try:
171
+ results = await database_optimization_service.optimize_table_statistics(
172
+ session=db,
173
+ tables=tables
174
+ )
175
+
176
+ logger.info(
177
+ "admin_statistics_optimized",
178
+ admin=admin_user.get("email"),
179
+ analyzed=len(results["analyzed"]),
180
+ vacuumed=len(results["vacuumed"])
181
+ )
182
+
183
+ return results
184
+
185
+ except Exception as e:
186
+ logger.error(
187
+ "optimize_statistics_error",
188
+ error=str(e),
189
+ exc_info=True
190
+ )
191
+ raise HTTPException(
192
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
193
+ detail="Failed to optimize statistics"
194
+ )
195
+
196
+
197
+ @router.get("/database-stats")
198
+ async def get_database_statistics(
199
+ admin_user=Depends(require_admin),
200
+ db=Depends(get_db)
201
+ ):
202
+ """
203
+ Get comprehensive database statistics.
204
+
205
+ Requires admin privileges.
206
+ """
207
+ try:
208
+ stats = await database_optimization_service.get_database_stats(db)
209
+
210
+ return {
211
+ "database_size": stats.get("database_size"),
212
+ "largest_tables": stats.get("largest_tables", []),
213
+ "least_used_indexes": stats.get("least_used_indexes", []),
214
+ "cache_hit_ratio": stats.get("cache_hit_ratio"),
215
+ "connections": stats.get("connections"),
216
+ "recommendations": generate_recommendations(stats)
217
+ }
218
+
219
+ except Exception as e:
220
+ logger.error(
221
+ "database_stats_error",
222
+ error=str(e),
223
+ exc_info=True
224
+ )
225
+ raise HTTPException(
226
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
227
+ detail="Failed to get database statistics"
228
+ )
229
+
230
+
231
+ def generate_recommendations(stats: Dict[str, Any]) -> List[str]:
232
+ """Generate recommendations based on statistics."""
233
+ recommendations = []
234
+
235
+ # Cache hit ratio
236
+ cache_ratio = stats.get("cache_hit_ratio", {}).get("ratio", 0)
237
+ if cache_ratio < 0.90:
238
+ recommendations.append(
239
+ f"Cache hit ratio is {cache_ratio:.1%}. Consider increasing shared_buffers."
240
+ )
241
+
242
+ # Unused indexes
243
+ unused_indexes = [
244
+ idx for idx in stats.get("least_used_indexes", [])
245
+ if idx["scans"] == 0
246
+ ]
247
+ if unused_indexes:
248
+ recommendations.append(
249
+ f"Found {len(unused_indexes)} unused indexes consuming space. Consider dropping them."
250
+ )
251
+
252
+ # Connection pooling
253
+ connections = stats.get("connections", {})
254
+ idle_ratio = connections.get("idle", 0) / max(connections.get("total", 1), 1)
255
+ if idle_ratio > 0.8:
256
+ recommendations.append(
257
+ "High idle connection ratio. Consider adjusting connection pool settings."
258
+ )
259
+
260
+ # Large tables
261
+ large_tables = stats.get("largest_tables", [])
262
+ if large_tables and large_tables[0]["row_count"] > 10000000:
263
+ recommendations.append(
264
+ "Very large tables detected. Consider partitioning for better performance."
265
+ )
266
+
267
+ return recommendations
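generate_recommendations is a pure function of the stats dict, so it is easy to exercise in isolation. A sketch with hand-made values (the numbers are invented for illustration; the real dict comes from get_database_stats):

    stats = {
        "cache_hit_ratio": {"ratio": 0.82},
        "least_used_indexes": [{"index": "ix_old_reports_legacy", "scans": 0}],
        "connections": {"idle": 45, "total": 50},
        "largest_tables": [{"table": "investigations", "row_count": 25_000_000}],
    }
    for rec in generate_recommendations(stats):
        print("-", rec)
    # Expect four recommendations: low cache hit ratio, one unused index,
    # a high idle-connection ratio, and partitioning for very large tables.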
src/services/cache_warming_service.py ADDED
@@ -0,0 +1,454 @@
1
+ """
2
+ Module: services.cache_warming_service
3
+ Description: Cache warming strategies for improved performance
4
+ Author: Anderson H. Silva
5
+ Date: 2025-01-25
6
+ License: Proprietary - All rights reserved
7
+ """
8
+
9
+ import asyncio
10
+ from typing import List, Dict, Any, Optional, Set
11
+ from datetime import datetime, timedelta, timezone
12
+ from enum import Enum
13
+ import hashlib
14
+
15
+ from src.core import get_logger
16
+ from src.infrastructure.cache import cache_service
17
+ from src.services.data_service import data_service
18
+ from src.services.investigation_service import investigation_service
19
+ from src.core.config import settings
20
+ from src.db.session import get_session
21
+ from src.models import Investigation
22
+
23
+ logger = get_logger(__name__)
24
+
25
+
26
+ class CacheWarmingStrategy(str, Enum):
27
+ """Cache warming strategies."""
28
+ POPULAR_DATA = "popular_data"
29
+ RECENT_INVESTIGATIONS = "recent_investigations"
30
+ FREQUENT_QUERIES = "frequent_queries"
31
+ AGENT_POOLS = "agent_pools"
32
+ STATIC_RESOURCES = "static_resources"
33
+ PREDICTIVE = "predictive"
34
+
35
+
36
+ class CacheWarmingConfig:
37
+ """Configuration for cache warming."""
38
+
39
+ # TTLs per data type
+ TTL_CONFIG = {
+     "contracts": 3600,        # 1 hour
+     "investigations": 1800,   # 30 minutes
+     "agent_pools": 7200,      # 2 hours
+     "static_data": 86400,     # 24 hours
+     "frequent_queries": 600,  # 10 minutes
+     "analytics": 3600         # 1 hour
+ }
48
+
49
+ # Warming limits per data type
50
+ MAX_ITEMS_PER_TYPE = {
51
+ "contracts": 100,
52
+ "investigations": 50,
53
+ "queries": 200,
54
+ "agents": 20
55
+ }
56
+
57
+ # Priority weight configuration
58
+ PRIORITY_WEIGHTS = {
59
+ "recency": 0.3,
60
+ "frequency": 0.4,
61
+ "importance": 0.3
62
+ }
63
+
64
+
65
+ class CacheWarmingService:
66
+ """Service for cache warming operations."""
67
+
68
+ def __init__(self):
69
+ """Initialize cache warming service."""
70
+ self._config = CacheWarmingConfig()
71
+ self._warming_tasks: Set[asyncio.Task] = set()
72
+ self._last_warming: Dict[str, datetime] = {}
73
+ self._query_frequency: Dict[str, int] = {}
74
+ self._warming_interval = 300  # 5 minutes
75
+
76
+ async def start_warming_scheduler(self):
77
+ """Start the cache warming scheduler."""
78
+ logger.info("cache_warming_scheduler_started")
79
+
80
+ while True:
81
+ try:
82
+ # Execute warming strategies
83
+ await self.warm_all_caches()
84
+
85
+ # Wait for next interval
86
+ await asyncio.sleep(self._warming_interval)
87
+
88
+ except asyncio.CancelledError:
89
+ logger.info("cache_warming_scheduler_stopped")
90
+ break
91
+ except Exception as e:
92
+ logger.error(
93
+ "cache_warming_scheduler_error",
94
+ error=str(e),
95
+ exc_info=True
96
+ )
97
+ await asyncio.sleep(60) # Wait 1 minute on error
98
+
99
+ async def warm_all_caches(self):
100
+ """Execute all cache warming strategies."""
101
+ start_time = datetime.now(timezone.utc)
102
+
103
+ strategies = [
104
+ self._warm_popular_data(),
105
+ self._warm_recent_investigations(),
106
+ self._warm_frequent_queries(),
107
+ self._warm_agent_pools(),
108
+ self._warm_static_resources()
109
+ ]
110
+
111
+ # Execute strategies in parallel
112
+ results = await asyncio.gather(*strategies, return_exceptions=True)
113
+
114
+ # Log results
115
+ duration = (datetime.now(timezone.utc) - start_time).total_seconds()
116
+ successful = sum(1 for r in results if not isinstance(r, Exception))
117
+
118
+ logger.info(
119
+ "cache_warming_completed",
120
+ duration_seconds=duration,
121
+ strategies_total=len(strategies),
122
+ strategies_successful=successful
123
+ )
124
+
125
+ # Update last warming time
126
+ self._last_warming["all"] = datetime.now(timezone.utc)
127
+
128
+ async def _warm_popular_data(self) -> Dict[str, Any]:
129
+ """Warm cache with popular/frequently accessed data."""
130
+ warmed_count = 0
131
+
132
+ try:
133
+ # Get most accessed contracts
134
+ popular_contracts = await self._get_popular_contracts()
135
+
136
+ for contract_id in popular_contracts[:self._config.MAX_ITEMS_PER_TYPE["contracts"]]:
137
+ cache_key = f"contract:{contract_id}"
138
+
139
+ # Check if already cached
140
+ if await cache_service.get(cache_key):
141
+ continue
142
+
143
+ # Fetch and cache
144
+ try:
145
+ contract_data = await data_service.get_contract(contract_id)
146
+ if contract_data:
147
+ await cache_service.set(
148
+ cache_key,
149
+ contract_data,
150
+ ttl=self._config.TTL_CONFIG["contracts"]
151
+ )
152
+ warmed_count += 1
153
+ except Exception as e:
154
+ logger.error(f"Failed to warm contract {contract_id}: {e}")
155
+
156
+ logger.info(
157
+ "popular_data_warmed",
158
+ contracts_warmed=warmed_count
159
+ )
160
+
161
+ return {"contracts": warmed_count}
162
+
163
+ except Exception as e:
164
+ logger.error("popular_data_warming_failed", error=str(e))
165
+ raise
166
+
167
+ async def _warm_recent_investigations(self) -> Dict[str, Any]:
168
+ """Warm cache with recent investigations."""
169
+ warmed_count = 0
170
+
171
+ try:
172
+ async with get_session() as session:
173
+ # Get recent investigations
174
+ from sqlalchemy import select, desc
175
+ query = select(Investigation).order_by(
176
+ desc(Investigation.created_at)
177
+ ).limit(self._config.MAX_ITEMS_PER_TYPE["investigations"])
178
+
179
+ result = await session.execute(query)
180
+ investigations = result.scalars().all()
181
+
182
+ for investigation in investigations:
183
+ cache_key = f"investigation:{investigation.id}"
184
+
185
+ # Cache investigation data
186
+ await cache_service.set(
187
+ cache_key,
188
+ {
189
+ "id": investigation.id,
190
+ "status": investigation.status,
191
+ "contract_id": investigation.contract_id,
192
+ "results": investigation.results,
193
+ "created_at": investigation.created_at.isoformat()
194
+ },
195
+ ttl=self._config.TTL_CONFIG["investigations"]
196
+ )
197
+ warmed_count += 1
198
+
199
+ logger.info(
200
+ "recent_investigations_warmed",
201
+ count=warmed_count
202
+ )
203
+
204
+ return {"investigations": warmed_count}
205
+
206
+ except Exception as e:
207
+ logger.error("recent_investigations_warming_failed", error=str(e))
208
+ raise
209
+
210
+ async def _warm_frequent_queries(self) -> Dict[str, Any]:
211
+ """Warm cache with results of frequent queries."""
212
+ warmed_count = 0
213
+
214
+ try:
215
+ # Sort queries by frequency
216
+ frequent_queries = sorted(
217
+ self._query_frequency.items(),
218
+ key=lambda x: x[1],
219
+ reverse=True
220
+ )[:self._config.MAX_ITEMS_PER_TYPE["queries"]]
221
+
222
+ for query_hash, frequency in frequent_queries:
223
+ cache_key = f"query_result:{query_hash}"
224
+
225
+ # Skip if already cached
226
+ if await cache_service.get(cache_key):
227
+ continue
228
+
229
+ # Note: In a real implementation, you would store and replay
230
+ # the actual query parameters to regenerate results
231
+ warmed_count += 1
232
+
233
+ logger.info(
234
+ "frequent_queries_warmed",
235
+ count=warmed_count
236
+ )
237
+
238
+ return {"queries": warmed_count}
239
+
240
+ except Exception as e:
241
+ logger.error("frequent_queries_warming_failed", error=str(e))
242
+ raise
243
+
244
+ async def _warm_agent_pools(self) -> Dict[str, Any]:
245
+ """Warm agent pool connections."""
246
+ warmed_count = 0
247
+
248
+ try:
249
+ # Pre-initialize agent pools
250
+ agent_types = [
251
+ "zumbi",
252
+ "anita",
253
+ "tiradentes",
254
+ "machado",
255
+ "dandara"
256
+ ]
257
+
258
+ for agent_type in agent_types[:self._config.MAX_ITEMS_PER_TYPE["agents"]]:
259
+ cache_key = f"agent_pool:{agent_type}:status"
260
+
261
+ # Cache agent pool status
262
+ await cache_service.set(
263
+ cache_key,
264
+ {
265
+ "type": agent_type,
266
+ "initialized": True,
267
+ "last_used": datetime.now(timezone.utc).isoformat()
268
+ },
269
+ ttl=self._config.TTL_CONFIG["agent_pools"]
270
+ )
271
+ warmed_count += 1
272
+
273
+ logger.info(
274
+ "agent_pools_warmed",
275
+ count=warmed_count
276
+ )
277
+
278
+ return {"agents": warmed_count}
279
+
280
+ except Exception as e:
281
+ logger.error("agent_pools_warming_failed", error=str(e))
282
+ raise
283
+
284
+ async def _warm_static_resources(self) -> Dict[str, Any]:
285
+ """Warm cache with static resources."""
286
+ warmed_count = 0
287
+
288
+ try:
289
+ # Static data to cache
290
+ static_data = {
291
+ "system_config": {
292
+ "version": "1.0.0",
293
+ "features": ["investigations", "reports", "analysis"],
294
+ "agents": ["zumbi", "anita", "tiradentes"]
295
+ },
296
+ "contract_types": [
297
+ "licitacao",
298
+ "contrato",
299
+ "convenio",
300
+ "termo_aditivo"
301
+ ],
302
+ "anomaly_types": [
303
+ "valor_atipico",
304
+ "padrao_temporal",
305
+ "fornecedor_suspeito",
306
+ "fragmentacao"
307
+ ]
308
+ }
309
+
310
+ for key, data in static_data.items():
311
+ cache_key = f"static:{key}"
312
+ await cache_service.set(
313
+ cache_key,
314
+ data,
315
+ ttl=self._config.TTL_CONFIG["static_data"]
316
+ )
317
+ warmed_count += 1
318
+
319
+ logger.info(
320
+ "static_resources_warmed",
321
+ count=warmed_count
322
+ )
323
+
324
+ return {"static": warmed_count}
325
+
326
+ except Exception as e:
327
+ logger.error("static_resources_warming_failed", error=str(e))
328
+ raise
329
+
330
+ async def _get_popular_contracts(self) -> List[str]:
331
+ """Get list of popular contract IDs."""
332
+ # In a real implementation, this would query analytics
333
+ # or access logs to find most accessed contracts
334
+ return [
335
+ "CONT-2024-001",
336
+ "CONT-2024-002",
337
+ "CONT-2024-003",
338
+ "CONT-2024-004",
339
+ "CONT-2024-005"
340
+ ]
341
+
342
+ def track_query(self, query_params: Dict[str, Any]):
343
+ """Track query frequency for cache warming."""
344
+ # Generate query hash
345
+ query_str = str(sorted(query_params.items()))
346
+ query_hash = hashlib.md5(query_str.encode()).hexdigest()
347
+
348
+ # Update frequency
349
+ self._query_frequency[query_hash] = self._query_frequency.get(query_hash, 0) + 1
350
+
351
+ # Limit stored queries
352
+ if len(self._query_frequency) > 1000:
353
+ # Remove least frequent queries
354
+ sorted_queries = sorted(
355
+ self._query_frequency.items(),
356
+ key=lambda x: x[1]
357
+ )
358
+ for query, _ in sorted_queries[:100]:
359
+ del self._query_frequency[query]
360
+
361
+ async def warm_specific_data(
362
+ self,
363
+ data_type: str,
364
+ identifiers: List[str],
365
+ ttl: Optional[int] = None
366
+ ) -> Dict[str, Any]:
367
+ """Warm cache with specific data."""
368
+ if ttl is None:
369
+ ttl = self._config.TTL_CONFIG.get(data_type, 3600)
370
+
371
+ warmed = []
372
+ failed = []
373
+
374
+ for identifier in identifiers:
375
+ try:
376
+ cache_key = f"{data_type}:{identifier}"
377
+
378
+ # Skip if already cached
379
+ if await cache_service.get(cache_key):
380
+ continue
381
+
382
+ # Fetch data based on type
383
+ data = None
384
+ if data_type == "contract":
385
+ data = await data_service.get_contract(identifier)
386
+ elif data_type == "investigation":
387
+ data = await investigation_service.get_investigation(identifier)
388
+
389
+ if data:
390
+ await cache_service.set(cache_key, data, ttl=ttl)
391
+ warmed.append(identifier)
392
+ else:
393
+ failed.append(identifier)
394
+
395
+ except Exception as e:
396
+ logger.error(
397
+ f"Failed to warm {data_type}:{identifier}: {e}"
398
+ )
399
+ failed.append(identifier)
400
+
401
+ return {
402
+ "warmed": warmed,
403
+ "failed": failed,
404
+ "total": len(identifiers)
405
+ }
406
+
407
+ async def get_warming_status(self) -> Dict[str, Any]:
408
+ """Get current cache warming status."""
409
+ status = {
410
+ "last_warming": self._last_warming.get("all"),
411
+ "query_frequency_tracked": len(self._query_frequency),
412
+ "top_queries": sorted(
413
+ self._query_frequency.items(),
414
+ key=lambda x: x[1],
415
+ reverse=True
416
+ )[:10],
417
+ "config": {
418
+ "interval_seconds": self._warming_interval,
419
+ "ttls": self._config.TTL_CONFIG,
420
+ "limits": self._config.MAX_ITEMS_PER_TYPE
421
+ }
422
+ }
423
+
424
+ return status
425
+
426
+ async def trigger_manual_warming(
427
+ self,
428
+ strategies: Optional[List[CacheWarmingStrategy]] = None
429
+ ) -> Dict[str, Any]:
430
+ """Manually trigger cache warming."""
431
+ if strategies is None:
432
+ return await self.warm_all_caches()
433
+
434
+ results = {}
435
+ for strategy in strategies:
436
+ try:
437
+ if strategy == CacheWarmingStrategy.POPULAR_DATA:
438
+ results[strategy] = await self._warm_popular_data()
439
+ elif strategy == CacheWarmingStrategy.RECENT_INVESTIGATIONS:
440
+ results[strategy] = await self._warm_recent_investigations()
441
+ elif strategy == CacheWarmingStrategy.FREQUENT_QUERIES:
442
+ results[strategy] = await self._warm_frequent_queries()
443
+ elif strategy == CacheWarmingStrategy.AGENT_POOLS:
444
+ results[strategy] = await self._warm_agent_pools()
445
+ elif strategy == CacheWarmingStrategy.STATIC_RESOURCES:
446
+ results[strategy] = await self._warm_static_resources()
447
+ except Exception as e:
448
+ results[strategy] = {"error": str(e)}
449
+
450
+ return results
451
+
452
+
453
+ # Global instance
454
+ cache_warming_service = CacheWarmingService()
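Nothing in this diff actually starts start_warming_scheduler(); one plausible wiring is a FastAPI lifespan hook like the sketch below. This is an assumption about integration, not code from the commit:

    from contextlib import asynccontextmanager
    import asyncio

    from fastapi import FastAPI
    from src.services.cache_warming_service import cache_warming_service

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        task = asyncio.create_task(cache_warming_service.start_warming_scheduler())
        try:
            yield
        finally:
            task.cancel()  # start_warming_scheduler handles CancelledError and exits
            await asyncio.gather(task, return_exceptions=True)

    app = FastAPI(lifespan=lifespan)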
src/services/database_optimization_service.py ADDED
@@ -0,0 +1,576 @@
1
+ """
2
+ Module: services.database_optimization_service
3
+ Description: Database query optimization and index management
4
+ Author: Anderson H. Silva
5
+ Date: 2025-01-25
6
+ License: Proprietary - All rights reserved
7
+ """
8
+
9
+ from typing import List, Dict, Any, Optional, Tuple
10
+ from datetime import datetime, timedelta, timezone
11
+ from sqlalchemy import text, create_engine, inspect
12
+ from sqlalchemy.ext.asyncio import AsyncSession
13
+ import asyncio
14
+ import time
15
+
16
+ from src.core import get_logger
17
+ from src.db.session import get_session
18
+ from src.core.config import settings
19
+
20
+ logger = get_logger(__name__)
21
+
22
+
23
+ class QueryAnalysis:
24
+ """Analysis result for a database query."""
25
+
26
+ def __init__(self, query: str, execution_time: float, plan: Dict[str, Any]):
27
+ self.query = query
28
+ self.execution_time = execution_time
29
+ self.plan = plan
30
+ self.suggestions = []
31
+ self.estimated_improvement = 0.0
32
+
33
+ def add_suggestion(self, suggestion: str, improvement: float = 0.0):
34
+ """Add optimization suggestion."""
35
+ self.suggestions.append(suggestion)
36
+ self.estimated_improvement += improvement
37
+
38
+
39
+ class DatabaseOptimizationService:
40
+ """Service for database performance optimization."""
41
+
42
+ def __init__(self):
43
+ """Initialize database optimization service."""
44
+ self._slow_query_threshold = 1.0 # seconds
45
+ self._index_suggestions = {}
46
+ self._query_stats = {}
47
+
48
+ async def analyze_slow_queries(
49
+ self,
50
+ session: AsyncSession,
51
+ limit: int = 20
52
+ ) -> List[QueryAnalysis]:
53
+ """Analyze slow queries from PostgreSQL."""
54
+ analyses = []
55
+
56
+ try:
57
+ # Get slow queries from pg_stat_statements
58
+ slow_queries_sql = """
59
+ SELECT
60
+ query,
61
+ mean_exec_time / 1000.0 as mean_exec_seconds,
62
+ calls,
63
+ total_exec_time / 1000.0 as total_exec_seconds,
64
+ min_exec_time / 1000.0 as min_exec_seconds,
65
+ max_exec_time / 1000.0 as max_exec_seconds,
66
+ rows
67
+ FROM pg_stat_statements
68
+ WHERE mean_exec_time > :threshold_ms
69
+ AND query NOT LIKE '%pg_stat%'
70
+ AND query NOT LIKE '%information_schema%'
71
+ ORDER BY mean_exec_time DESC
72
+ LIMIT :limit
73
+ """
74
+
75
+ result = await session.execute(
76
+ text(slow_queries_sql),
77
+ {
78
+ "threshold_ms": self._slow_query_threshold * 1000,
79
+ "limit": limit
80
+ }
81
+ )
82
+
83
+ rows = result.fetchall()
84
+
85
+ for row in rows:
86
+ # Analyze each slow query
87
+ analysis = QueryAnalysis(
88
+ query=row.query,
89
+ execution_time=row.mean_exec_seconds,
90
+ plan={
91
+ "calls": row.calls,
92
+ "total_time": row.total_exec_seconds,
93
+ "min_time": row.min_exec_seconds,
94
+ "max_time": row.max_exec_seconds,
95
+ "rows": row.rows
96
+ }
97
+ )
98
+
99
+ # Get query plan
100
+ await self._analyze_query_plan(session, analysis)
101
+
102
+ # Generate suggestions
103
+ self._generate_suggestions(analysis)
104
+
105
+ analyses.append(analysis)
106
+
107
+ logger.info(
108
+ "slow_query_analysis_completed",
109
+ queries_analyzed=len(analyses)
110
+ )
111
+
112
+ except Exception as e:
113
+ logger.error(
114
+ "slow_query_analysis_error",
115
+ error=str(e),
116
+ exc_info=True
117
+ )
118
+
119
+ return analyses
120
+
121
+ async def _analyze_query_plan(
122
+ self,
123
+ session: AsyncSession,
124
+ analysis: QueryAnalysis
125
+ ):
126
+ """Analyze query execution plan."""
127
+ try:
128
+ # Get EXPLAIN ANALYZE for the query.
+ # Note: pg_stat_statements stores normalized queries ($1, $2 placeholders), so
+ # EXPLAIN fails for parameterized statements and those are skipped by the
+ # handler below; EXPLAIN ANALYZE also executes the statement.
+ explain_sql = f"EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) {analysis.query}"
130
+
131
+ result = await session.execute(text(explain_sql))
132
+ plan_data = result.scalar()
133
+
134
+ if plan_data:
135
+ analysis.plan["execution_plan"] = plan_data[0]["Plan"]
136
+
137
+ # Extract key metrics
138
+ plan = plan_data[0]["Plan"]
139
+ analysis.plan["total_cost"] = plan.get("Total Cost", 0)
140
+ analysis.plan["actual_time"] = plan.get("Actual Total Time", 0)
141
+
142
+ # Look for problematic patterns
143
+ self._check_plan_issues(plan, analysis)
144
+
145
+ except Exception as e:
146
+ logger.debug(f"Could not analyze plan for query: {e}")
147
+
148
+ def _check_plan_issues(self, plan: Dict[str, Any], analysis: QueryAnalysis):
149
+ """Check for common plan issues."""
150
+ # Sequential scan on large tables
151
+ if plan.get("Node Type") == "Seq Scan":
152
+ rows = plan.get("Actual Rows", 0)
153
+ if rows > 1000:
154
+ analysis.add_suggestion(
155
+ f"Sequential scan on {rows} rows. Consider adding an index.",
156
+ improvement=0.5
157
+ )
158
+
159
+ # Nested loops with high iterations
160
+ if plan.get("Node Type") == "Nested Loop":
161
+ loops = plan.get("Actual Loops", 0)
162
+ if loops > 100:
163
+ analysis.add_suggestion(
164
+ f"Nested loop with {loops} iterations. Consider query restructuring.",
165
+ improvement=0.3
166
+ )
167
+
168
+ # Check child nodes recursively
169
+ if "Plans" in plan:
170
+ for child_plan in plan["Plans"]:
171
+ self._check_plan_issues(child_plan, analysis)
172
+
173
+ def _generate_suggestions(self, analysis: QueryAnalysis):
174
+ """Generate optimization suggestions for a query."""
175
+ query_lower = analysis.query.lower()
176
+
177
+ # Check for missing LIMIT
178
+ if "select" in query_lower and "limit" not in query_lower:
179
+ if analysis.plan.get("rows", 0) > 1000:
180
+ analysis.add_suggestion(
181
+ "Query returns many rows. Consider adding LIMIT clause.",
182
+ improvement=0.2
183
+ )
184
+
185
+ # Check for SELECT *
186
+ if "select *" in query_lower:
187
+ analysis.add_suggestion(
188
+ "Avoid SELECT *. Specify only needed columns.",
189
+ improvement=0.1
190
+ )
191
+
192
+ # Check for missing WHERE on large tables
193
+ if "where" not in query_lower and analysis.plan.get("rows", 0) > 10000:
194
+ analysis.add_suggestion(
195
+ "No WHERE clause on large result set. Add filtering.",
196
+ improvement=0.4
197
+ )
198
+
199
+ # Check for IN with many values
200
+ import re
201
+ in_matches = re.findall(r'IN\s*\([^)]+\)', query_lower)
202
+ for match in in_matches:
203
+ values_count = match.count(',') + 1
204
+ if values_count > 10:
205
+ analysis.add_suggestion(
206
+ f"IN clause with {values_count} values. Consider using JOIN or temp table.",
207
+ improvement=0.2
208
+ )
209
+
210
+ async def create_missing_indexes(
211
+ self,
212
+ session: AsyncSession,
213
+ dry_run: bool = True
214
+ ) -> List[Dict[str, Any]]:
215
+ """Create missing indexes based on analysis."""
216
+ index_commands = []
217
+
218
+ try:
219
+ # Analyze foreign key columns without indexes
220
+ fk_index_sql = """
221
+ SELECT
222
+ tc.table_name,
223
+ kcu.column_name,
224
+ ccu.table_name AS foreign_table_name
225
+ FROM information_schema.table_constraints AS tc
226
+ JOIN information_schema.key_column_usage AS kcu
227
+ ON tc.constraint_name = kcu.constraint_name
228
+ JOIN information_schema.constraint_column_usage AS ccu
229
+ ON ccu.constraint_name = tc.constraint_name
230
+ WHERE tc.constraint_type = 'FOREIGN KEY'
231
+ AND NOT EXISTS (
232
+ SELECT 1
233
+ FROM pg_indexes
234
+ WHERE schemaname = 'public'
235
+ AND tablename = tc.table_name
236
+ AND indexdef LIKE '%' || kcu.column_name || '%'
237
+ )
238
+ """
239
+
240
+ result = await session.execute(text(fk_index_sql))
241
+ fk_without_index = result.fetchall()
242
+
243
+ for row in fk_without_index:
244
+ index_name = f"idx_{row.table_name}_{row.column_name}"
245
+ index_cmd = f"CREATE INDEX {index_name} ON {row.table_name} ({row.column_name})"
246
+
247
+ index_commands.append({
248
+ "type": "foreign_key",
249
+ "table": row.table_name,
250
+ "column": row.column_name,
251
+ "command": index_cmd,
252
+ "reason": f"Foreign key to {row.foreign_table_name}"
253
+ })
254
+
255
+ # Analyze frequently filtered columns
256
+ filter_columns = await self._analyze_filter_columns(session)
257
+
258
+ for table, column, frequency in filter_columns:
259
+ # Check if index already exists
260
+ check_sql = """
261
+ SELECT 1 FROM pg_indexes
262
+ WHERE schemaname = 'public'
263
+ AND tablename = :table
264
+ AND indexdef LIKE :pattern
265
+ """
266
+
267
+ exists = await session.execute(
268
+ text(check_sql),
269
+ {"table": table, "pattern": f"%{column}%"}
270
+ )
271
+
272
+ if not exists.scalar():
273
+ index_name = f"idx_{table}_{column}_filter"
274
+ index_cmd = f"CREATE INDEX {index_name} ON {table} ({column})"
275
+
276
+ index_commands.append({
277
+ "type": "frequent_filter",
278
+ "table": table,
279
+ "column": column,
280
+ "command": index_cmd,
281
+ "reason": f"Frequently used in WHERE clause ({frequency} times)"
282
+ })
283
+
284
+ # Execute or return commands
285
+ if not dry_run and index_commands:
286
+ for idx_info in index_commands:
287
+ try:
288
+ await session.execute(text(idx_info["command"]))
289
+ idx_info["status"] = "created"
290
+ logger.info(
291
+ "index_created",
292
+ table=idx_info["table"],
293
+ column=idx_info["column"]
294
+ )
295
+ except Exception as e:
296
+ idx_info["status"] = "failed"
297
+ idx_info["error"] = str(e)
298
+ logger.error(
299
+ "index_creation_failed",
300
+ table=idx_info["table"],
301
+ error=str(e)
302
+ )
303
+
304
+ await session.commit()
305
+
306
+ except Exception as e:
307
+ logger.error(
308
+ "create_indexes_error",
309
+ error=str(e),
310
+ exc_info=True
311
+ )
312
+
313
+ return index_commands
314
+
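A dry run is the safe way to exercise this method: it only reports the CREATE INDEX statements it would issue. The sketch below assumes SQLAlchemy 2.x; the DSN, session factory name and service import path are placeholders rather than the project's real ones.

from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

from src.services.database_optimization_service import database_optimization_service  # assumed path

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/dbname")  # placeholder DSN
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)

async def preview_missing_indexes() -> None:
    async with SessionLocal() as session:
        commands = await database_optimization_service.create_missing_indexes(session, dry_run=True)
        for cmd in commands:
            print(f'{cmd["reason"]}: {cmd["command"]}')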
315
+ async def _analyze_filter_columns(
316
+ self,
317
+ session: AsyncSession
318
+ ) -> List[Tuple[str, str, int]]:
319
+ """Analyze frequently filtered columns from query patterns."""
320
+ filter_columns = []
321
+
322
+ try:
323
+ # Parse WHERE clauses from pg_stat_statements
324
+ filter_analysis_sql = """
325
+ SELECT
326
+ query,
327
+ calls
328
+ FROM pg_stat_statements
329
+ WHERE query LIKE '%WHERE%'
330
+ AND query NOT LIKE '%pg_stat%'
331
+ AND calls > 10
332
+ ORDER BY calls DESC
333
+ LIMIT 100
334
+ """
335
+
336
+ result = await session.execute(text(filter_analysis_sql))
337
+ queries = result.fetchall()
338
+
339
+ # Simple pattern matching for WHERE conditions
340
+ import re
341
+ column_frequency = {}
342
+
343
+ for query, calls in queries:
344
+ # Extract table.column or column patterns after WHERE
345
+ where_match = re.search(r'WHERE\s+(.+?)(?:ORDER|GROUP|LIMIT|$)', query, re.IGNORECASE)
346
+ if where_match:
347
+ conditions = where_match.group(1)
348
+
349
+ # Find column references
350
+ column_patterns = re.findall(r'(\w+)\.(\w+)\s*[=<>]|(\w+)\s*[=<>]', conditions)
351
+
352
+ for pattern in column_patterns:
353
+ if pattern[0] and pattern[1]: # table.column format
354
+ key = (pattern[0], pattern[1])
355
+ elif pattern[2]: # column only format
356
+ # Try to infer table from FROM clause
357
+ from_match = re.search(r'FROM\s+(\w+)', query, re.IGNORECASE)
358
+ if from_match:
359
+ key = (from_match.group(1), pattern[2])
360
+ else:
361
+ continue
362
+ else:
363
+ continue
364
+
365
+ column_frequency[key] = column_frequency.get(key, 0) + calls
366
+
367
+ # Sort by frequency
368
+ for (table, column), frequency in sorted(
369
+ column_frequency.items(),
370
+ key=lambda x: x[1],
371
+ reverse=True
372
+ )[:20]:
373
+ filter_columns.append((table, column, frequency))
374
+
375
+ except Exception as e:
376
+ logger.error(
377
+ "filter_column_analysis_error",
378
+ error=str(e),
379
+ exc_info=True
380
+ )
381
+
382
+ return filter_columns
383
+
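This analysis depends on the pg_stat_statements extension, which is not available by default (it requires shared_preload_libraries plus CREATE EXTENSION pg_stat_statements). A small guard like the sketch below, offered as a suggestion rather than part of this commit, keeps the error log quiet when the extension is missing:

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

async def pg_stat_statements_available(session: AsyncSession) -> bool:
    # True only if the extension is installed in the current database.
    result = await session.execute(
        text("SELECT 1 FROM pg_extension WHERE extname = 'pg_stat_statements'")
    )
    return result.scalar() is not None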
384
+ async def optimize_table_statistics(
385
+ self,
386
+ session: AsyncSession,
387
+ tables: Optional[List[str]] = None
388
+ ) -> Dict[str, Any]:
389
+ """Update table statistics for query planner."""
390
+ results = {
391
+ "analyzed": [],
392
+ "vacuumed": [],
393
+ "errors": []
394
+ }
395
+
396
+ try:
397
+ # Get all tables if not specified
398
+ if not tables:
399
+ tables_sql = """
400
+ SELECT tablename
401
+ FROM pg_tables
402
+ WHERE schemaname = 'public'
403
+ """
404
+ result = await session.execute(text(tables_sql))
405
+ tables = [row[0] for row in result.fetchall()]
406
+
407
+ for table in tables:
408
+ try:
409
+ # ANALYZE table
410
+ await session.execute(text(f"ANALYZE {table}"))
411
+ results["analyzed"].append(table)
412
+
413
+ # Check if VACUUM needed
414
+ vacuum_check_sql = """
415
+ SELECT
416
+ n_dead_tup,
417
+ n_live_tup
418
+ FROM pg_stat_user_tables
419
+ WHERE relname = :table
420
+ """
421
+
422
+ result = await session.execute(
423
+ text(vacuum_check_sql),
424
+ {"table": table}
425
+ )
426
+ row = result.fetchone()
427
+
428
+ if row and row.n_dead_tup > row.n_live_tup * 0.2:
429
+ # More than 20% dead tuples, vacuum needed
430
+ await session.execute(text(f"VACUUM ANALYZE {table}"))
431
+ results["vacuumed"].append(table)
432
+ logger.info(
433
+ "table_vacuumed",
434
+ table=table,
435
+ dead_tuples=row.n_dead_tup
436
+ )
437
+
438
+ except Exception as e:
439
+ results["errors"].append({
440
+ "table": table,
441
+ "error": str(e)
442
+ })
443
+ logger.error(
444
+ f"Failed to optimize table {table}: {e}"
445
+ )
446
+
447
+ await session.commit()
448
+
449
+ except Exception as e:
450
+ logger.error(
451
+ "table_optimization_error",
452
+ error=str(e),
453
+ exc_info=True
454
+ )
455
+
456
+ return results
457
+
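One operational caveat: PostgreSQL refuses to run VACUUM inside a transaction block, and an AsyncSession begins one implicitly, so the VACUUM ANALYZE branch above will typically fail unless it is issued on an autocommit connection. A sketch of that workaround, assuming an AsyncEngine is importable (the import path is hypothetical):

from sqlalchemy import text

from src.infrastructure.database import engine  # hypothetical location of the AsyncEngine

async def vacuum_analyze(table: str) -> None:
    # VACUUM cannot run inside a transaction, so use an AUTOCOMMIT connection.
    autocommit_engine = engine.execution_options(isolation_level="AUTOCOMMIT")
    async with autocommit_engine.connect() as conn:
        await conn.execute(text(f'VACUUM ANALYZE "{table}"'))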
458
+ async def get_database_stats(
459
+ self,
460
+ session: AsyncSession
461
+ ) -> Dict[str, Any]:
462
+ """Get comprehensive database statistics."""
463
+ stats = {}
464
+
465
+ try:
466
+ # Database size
467
+ size_sql = """
468
+ SELECT
469
+ pg_database_size(current_database()) as db_size,
470
+ pg_size_pretty(pg_database_size(current_database())) as db_size_pretty
471
+ """
472
+ result = await session.execute(text(size_sql))
473
+ size_info = result.fetchone()
474
+ stats["database_size"] = {
475
+ "bytes": size_info.db_size,
476
+ "pretty": size_info.db_size_pretty
477
+ }
478
+
479
+ # Table sizes
480
+ table_sizes_sql = """
481
+ SELECT
482
+ schemaname,
483
+ relname AS tablename,
484
+ pg_total_relation_size(relid) AS total_size,
485
+ pg_size_pretty(pg_total_relation_size(relid)) AS size_pretty,
486
+ n_live_tup AS row_count
487
+ FROM pg_stat_user_tables
488
+ -- pg_stat_user_tables already exposes relname, relid and n_live_tup, so no join with pg_tables is needed
489
+ WHERE schemaname = 'public'
490
+ ORDER BY pg_total_relation_size(relid) DESC
491
+ LIMIT 10
492
+ """
493
+ result = await session.execute(text(table_sizes_sql))
494
+ stats["largest_tables"] = [
495
+ {
496
+ "table": row.tablename,
497
+ "size_bytes": row.total_size,
498
+ "size_pretty": row.size_pretty,
499
+ "row_count": row.row_count
500
+ }
501
+ for row in result.fetchall()
502
+ ]
503
+
504
+ # Index usage
505
+ index_usage_sql = """
506
+ SELECT
507
+ schemaname,
508
+ tablename,
509
+ indexname,
510
+ idx_scan,
511
+ idx_tup_read,
512
+ idx_tup_fetch,
513
+ pg_size_pretty(pg_relation_size(indexrelid)) as index_size
514
+ FROM pg_stat_user_indexes
515
+ WHERE schemaname = 'public'
516
+ ORDER BY idx_scan
517
+ LIMIT 20
518
+ """
519
+ result = await session.execute(text(index_usage_sql))
520
+ stats["least_used_indexes"] = [
521
+ {
522
+ "table": row.tablename,
523
+ "index": row.indexname,
524
+ "scans": row.idx_scan,
525
+ "size": row.index_size
526
+ }
527
+ for row in result.fetchall()
528
+ ]
529
+
530
+ # Cache hit ratio
531
+ cache_sql = """
532
+ SELECT
533
+ sum(heap_blks_read) as heap_read,
534
+ sum(heap_blks_hit) as heap_hit,
535
+ sum(heap_blks_hit) / NULLIF(sum(heap_blks_hit) + sum(heap_blks_read), 0) as cache_hit_ratio
536
+ FROM pg_statio_user_tables
537
+ """
538
+ result = await session.execute(text(cache_sql))
539
+ cache_info = result.fetchone()
540
+ stats["cache_hit_ratio"] = {
541
+ "ratio": float(cache_info.cache_hit_ratio or 0),
542
+ "heap_read": cache_info.heap_read,
543
+ "heap_hit": cache_info.heap_hit
544
+ }
545
+
546
+ # Connection stats
547
+ conn_sql = """
548
+ SELECT
549
+ count(*) as total_connections,
550
+ count(*) FILTER (WHERE state = 'active') as active_connections,
551
+ count(*) FILTER (WHERE state = 'idle') as idle_connections,
552
+ count(*) FILTER (WHERE state = 'idle in transaction') as idle_in_transaction
553
+ FROM pg_stat_activity
554
+ WHERE datname = current_database()
555
+ """
556
+ result = await session.execute(text(conn_sql))
557
+ conn_info = result.fetchone()
558
+ stats["connections"] = {
559
+ "total": conn_info.total_connections,
560
+ "active": conn_info.active_connections,
561
+ "idle": conn_info.idle_connections,
562
+ "idle_in_transaction": conn_info.idle_in_transaction
563
+ }
564
+
565
+ except Exception as e:
566
+ logger.error(
567
+ "database_stats_error",
568
+ error=str(e),
569
+ exc_info=True
570
+ )
571
+
572
+ return stats
573
+
574
+
575
+ # Global instance
576
+ database_optimization_service = DatabaseOptimizationService()
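Typical read-only use of the singleton, for example from the admin API or the optimization script mentioned in the commit message; session_factory is a placeholder for whatever async session factory the project exposes.

async def print_db_overview(session_factory) -> None:
    async with session_factory() as session:
        stats = await database_optimization_service.get_database_stats(session)
        print("Database size:", stats.get("database_size", {}).get("pretty"))
        print("Cache hit ratio:", stats.get("cache_hit_ratio", {}).get("ratio"))
        for tbl in stats.get("largest_tables", []):
            print(f'{tbl["table"]}: {tbl["size_pretty"]} ({tbl["row_count"]} rows)')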
src/tasks/cache_warming_tasks.py ADDED
@@ -0,0 +1,328 @@
1
+ """
2
+ Module: tasks.cache_warming_tasks
3
+ Description: Celery tasks for cache warming
4
+ Author: Anderson H. Silva
5
+ Date: 2025-01-25
6
+ License: Proprietary - All rights reserved
7
+ """
8
+
9
+ from datetime import datetime, timezone
10
+ from typing import Dict, Any, List, Optional
11
+
12
+ from celery import shared_task
13
+ from celery.utils.log import get_task_logger
14
+
15
+ from src.services.cache_warming_service import (
16
+ cache_warming_service,
17
+ CacheWarmingStrategy
18
+ )
19
+
20
+ logger = get_task_logger(__name__)
21
+
22
+
23
+ @shared_task(
24
+ name="cache_warming.warm_all",
25
+ max_retries=3,
26
+ default_retry_delay=300,
27
+ time_limit=600,
28
+ soft_time_limit=540
29
+ )
30
+ def warm_all_caches() -> Dict[str, Any]:
31
+ """
32
+ Warm all caches using all strategies.
33
+
34
+ This task is scheduled to run periodically.
35
+ """
36
+ try:
37
+ logger.info("Starting scheduled cache warming")
38
+
39
+ # Execute warming synchronously
40
+ import asyncio
41
+ loop = asyncio.new_event_loop()
42
+ asyncio.set_event_loop(loop)
43
+
44
+ try:
45
+ result = loop.run_until_complete(
46
+ cache_warming_service.warm_all_caches()
47
+ )
48
+
49
+ logger.info(
50
+ "Cache warming completed",
51
+ result=result
52
+ )
53
+
54
+ return {
55
+ "status": "completed",
56
+ "timestamp": datetime.now(timezone.utc).isoformat(),
57
+ "result": result
58
+ }
59
+
60
+ finally:
61
+ loop.close()
62
+
63
+ except Exception as e:
64
+ logger.error(
65
+ f"Cache warming failed: {str(e)}",
66
+ exc_info=True
67
+ )
68
+ raise
69
+
70
+
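Every task in this module repeats the new-event-loop/close dance. Because a Celery worker process has no running loop of its own, asyncio.run() does the same job with less bookkeeping; a possible shared helper (not part of this commit) would be:

import asyncio
from typing import Any, Coroutine

def run_async(coro: Coroutine[Any, Any, Any]) -> Any:
    # Creates a fresh event loop, runs the coroutine to completion and closes
    # the loop, which is exactly what each task above does by hand.
    return asyncio.run(coro)

# e.g. inside a task body:
# result = run_async(cache_warming_service.warm_all_caches())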
71
+ @shared_task(
72
+ name="cache_warming.warm_strategy",
73
+ max_retries=3,
74
+ default_retry_delay=60
75
+ )
76
+ def warm_specific_strategy(strategy: str) -> Dict[str, Any]:
77
+ """
78
+ Warm cache using a specific strategy.
79
+
80
+ Args:
81
+ strategy: Name of the warming strategy
82
+ """
83
+ try:
84
+ logger.info(f"Starting cache warming for strategy: {strategy}")
85
+
86
+ # Convert string to enum
87
+ strategy_enum = CacheWarmingStrategy(strategy)
88
+
89
+ # Execute warming
90
+ import asyncio
91
+ loop = asyncio.new_event_loop()
92
+ asyncio.set_event_loop(loop)
93
+
94
+ try:
95
+ result = loop.run_until_complete(
96
+ cache_warming_service.trigger_manual_warming([strategy_enum])
97
+ )
98
+
99
+ return {
100
+ "status": "completed",
101
+ "strategy": strategy,
102
+ "timestamp": datetime.now(timezone.utc).isoformat(),
103
+ "result": result
104
+ }
105
+
106
+ finally:
107
+ loop.close()
108
+
109
+ except Exception as e:
110
+ logger.error(
111
+ f"Strategy warming failed: {str(e)}",
112
+ exc_info=True
113
+ )
114
+ raise
115
+
116
+
117
+ @shared_task(
118
+ name="cache_warming.warm_contracts",
119
+ max_retries=2,
120
+ default_retry_delay=120
121
+ )
122
+ def warm_contract_cache(contract_ids: List[str]) -> Dict[str, Any]:
123
+ """
124
+ Warm cache for specific contracts.
125
+
126
+ Args:
127
+ contract_ids: List of contract IDs to cache
128
+ """
129
+ try:
130
+ logger.info(
131
+ f"Warming cache for {len(contract_ids)} contracts"
132
+ )
133
+
134
+ import asyncio
135
+ loop = asyncio.new_event_loop()
136
+ asyncio.set_event_loop(loop)
137
+
138
+ try:
139
+ result = loop.run_until_complete(
140
+ cache_warming_service.warm_specific_data(
141
+ data_type="contract",
142
+ identifiers=contract_ids,
143
+ ttl=3600 # 1 hour
144
+ )
145
+ )
146
+
147
+ logger.info(
148
+ f"Contract cache warming completed: "
149
+ f"{len(result['warmed'])} warmed, "
150
+ f"{len(result['failed'])} failed"
151
+ )
152
+
153
+ return result
154
+
155
+ finally:
156
+ loop.close()
157
+
158
+ except Exception as e:
159
+ logger.error(
160
+ f"Contract cache warming failed: {str(e)}",
161
+ exc_info=True
162
+ )
163
+ raise
164
+
165
+
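Enqueueing from application code is the usual entry point for this task; the contract IDs below are dummies, and the queue name matches the 'cache' queue used in the beat schedule at the end of this module.

# Fire-and-forget from an API handler or another task:
warm_contract_cache.apply_async(
    args=[["contract-123", "contract-456"]],  # dummy IDs for illustration
    queue="cache",
)

# or, with default routing:
# warm_contract_cache.delay(["contract-123", "contract-456"])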
166
+ @shared_task(
167
+ name="cache_warming.warm_investigations",
168
+ max_retries=2,
169
+ default_retry_delay=120
170
+ )
171
+ def warm_investigation_cache(
172
+ investigation_ids: Optional[List[str]] = None,
173
+ limit: int = 50
174
+ ) -> Dict[str, Any]:
175
+ """
176
+ Warm cache for investigations.
177
+
178
+ Args:
179
+ investigation_ids: Specific IDs or None for recent
180
+ limit: Maximum number to warm if no IDs provided
181
+ """
182
+ try:
183
+ import asyncio
184
+ loop = asyncio.new_event_loop()
185
+ asyncio.set_event_loop(loop)
186
+
187
+ try:
188
+ if investigation_ids:
189
+ # Warm specific investigations
190
+ result = loop.run_until_complete(
191
+ cache_warming_service.warm_specific_data(
192
+ data_type="investigation",
193
+ identifiers=investigation_ids,
194
+ ttl=1800 # 30 minutes
195
+ )
196
+ )
197
+ else:
198
+ # Warm recent investigations
199
+ result = loop.run_until_complete(
200
+ cache_warming_service.trigger_manual_warming(
201
+ [CacheWarmingStrategy.RECENT_INVESTIGATIONS]
202
+ )
203
+ )
204
+
205
+ return {
206
+ "status": "completed",
207
+ "timestamp": datetime.now(timezone.utc).isoformat(),
208
+ "result": result
209
+ }
210
+
211
+ finally:
212
+ loop.close()
213
+
214
+ except Exception as e:
215
+ logger.error(
216
+ f"Investigation cache warming failed: {str(e)}",
217
+ exc_info=True
218
+ )
219
+ raise
220
+
221
+
222
+ @shared_task(
223
+ name="cache_warming.analyze_patterns",
224
+ max_retries=1,
225
+ time_limit=300
226
+ )
227
+ def analyze_cache_patterns() -> Dict[str, Any]:
228
+ """
229
+ Analyze cache access patterns for optimization.
230
+
231
+ This task collects metrics about cache usage to improve
232
+ warming strategies.
233
+ """
234
+ try:
235
+ logger.info("Analyzing cache access patterns")
236
+
237
+ import asyncio
238
+ from src.infrastructure.cache import cache_service
239
+
240
+ loop = asyncio.new_event_loop()
241
+ asyncio.set_event_loop(loop)
242
+
243
+ try:
244
+ # Get cache statistics
245
+ stats = loop.run_until_complete(
246
+ cache_service.get_stats()
247
+ )
248
+
249
+ # Get warming status
250
+ warming_status = loop.run_until_complete(
251
+ cache_warming_service.get_warming_status()
252
+ )
253
+
254
+ # Analyze patterns
255
+ analysis = {
256
+ "cache_stats": stats,
257
+ "warming_status": warming_status,
258
+ "recommendations": []
259
+ }
260
+
261
+ # Generate recommendations
262
+ if stats.get("hit_rate", 0) < 0.7:
263
+ analysis["recommendations"].append(
264
+ "Low hit rate detected. Consider warming more frequently accessed data."
265
+ )
266
+
267
+ if warming_status["query_frequency_tracked"] > 500:
268
+ analysis["recommendations"].append(
269
+ "High query diversity. Consider implementing predictive warming."
270
+ )
271
+
272
+ logger.info(
273
+ "Cache pattern analysis completed",
274
+ recommendations=len(analysis["recommendations"])
275
+ )
276
+
277
+ return analysis
278
+
279
+ finally:
280
+ loop.close()
281
+
282
+ except Exception as e:
283
+ logger.error(
284
+ f"Cache pattern analysis failed: {str(e)}",
285
+ exc_info=True
286
+ )
287
+ raise
288
+
289
+
290
+ # Celery Beat schedule configuration
291
+ from celery.schedules import crontab
292
+
293
+ beat_schedule = {
294
+ 'warm-all-caches': {
295
+ 'task': 'cache_warming.warm_all',
296
+ 'schedule': crontab(minute='*/10'), # Every 10 minutes
297
+ 'options': {
298
+ 'queue': 'cache',
299
+ 'priority': 3
300
+ }
301
+ },
302
+ 'warm-popular-data': {
303
+ 'task': 'cache_warming.warm_strategy',
304
+ 'schedule': crontab(minute='*/5'), # Every 5 minutes
305
+ 'args': ['popular_data'],
306
+ 'options': {
307
+ 'queue': 'cache',
308
+ 'priority': 5
309
+ }
310
+ },
311
+ 'warm-static-resources': {
312
+ 'task': 'cache_warming.warm_strategy',
313
+ 'schedule': crontab(hour='*/6'), # Every 6 hours
314
+ 'args': ['static_resources'],
315
+ 'options': {
316
+ 'queue': 'cache',
317
+ 'priority': 2
318
+ }
319
+ },
320
+ 'analyze-cache-patterns': {
321
+ 'task': 'cache_warming.analyze_patterns',
322
+ 'schedule': crontab(hour=2, minute=0), # Daily at 2 AM
323
+ 'options': {
324
+ 'queue': 'analytics',
325
+ 'priority': 1
326
+ }
327
+ }
328
+ }