anderson-ufrj committed
Commit 616f37f · 1 Parent(s): c76ac1d

feat(investigations): persist all investigation data to Supabase


Critical enhancement: investigations are now saved to the database instead of
only being kept in in-memory storage.

Changes:
- Create investigation in database when started (via REST API on HuggingFace)
- Update progress during execution (data_retrieval, anomaly_detection, analysis)
- Save final results including anomalies, summary, and confidence scores
- Persist failures with error messages for debugging
- Graceful fallback to in-memory if database unavailable
- Backward compatible with existing in-memory cache for fast access

This ensures all investigation data persists across app restarts and
is accessible via the Supabase dashboard for monitoring and analysis.

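Note: the service behind the new import, src/services/investigation_service_selector.py, is not part of this diff; the route code only relies on investigation_service exposing async create() and update_status() methods. Below is a minimal sketch of what such a Supabase-backed service might look like, assuming an "investigations" table, SUPABASE_URL / SUPABASE_SERVICE_KEY environment variables, and an httpx-based client. These names are illustrative assumptions, not the project's actual implementation.

# Hypothetical sketch of the investigation_service interface used by the route.
# Table name, column names, and environment variables are assumptions only.
import os
import uuid
from typing import Any, Optional

import httpx


class SupabaseInvestigationService:
    """Persists investigations via the Supabase REST API (PostgREST)."""

    def __init__(self) -> None:
        self._base_url = os.environ["SUPABASE_URL"].rstrip("/") + "/rest/v1"
        key = os.environ["SUPABASE_SERVICE_KEY"]
        self._headers = {
            "apikey": key,
            "Authorization": f"Bearer {key}",
            "Content-Type": "application/json",
            "Prefer": "return=representation",
        }

    async def create(self, user_id: str, query: str, data_source: str,
                     filters: dict, anomaly_types: Optional[list] = None) -> dict:
        """Insert a new investigation row and return it (including its id)."""
        record = {
            "id": str(uuid.uuid4()),
            "user_id": user_id,
            "query": query,
            "data_source": data_source,
            "filters": filters,
            "anomaly_types": anomaly_types or [],
            "status": "started",
            "progress": 0.0,
        }
        async with httpx.AsyncClient() as client:
            resp = await client.post(f"{self._base_url}/investigations",
                                     headers=self._headers, json=record)
            resp.raise_for_status()
            return resp.json()[0]

    async def update_status(self, investigation_id: str, status: str,
                            progress: float, current_phase: str,
                            **fields: Any) -> None:
        """Patch status/progress plus any extra columns (summary, results, error, ...)."""
        payload = {"status": status, "progress": progress,
                   "current_phase": current_phase, **fields}
        async with httpx.AsyncClient() as client:
            resp = await client.patch(
                f"{self._base_url}/investigations?id=eq.{investigation_id}",
                headers=self._headers, json=payload)
            resp.raise_for_status()


investigation_service = SupabaseInvestigationService()

Returning the inserted row as a dict keeps the route's db_investigation['id'] branch working; the selector module is presumably what swaps this implementation for an in-memory stub when database credentials are missing, with the route's own try/except acting as a second safety net.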
Files changed (1)
  1. src/api/routes/investigations.py +105 -16
src/api/routes/investigations.py CHANGED
@@ -20,6 +20,7 @@ from src.agents import InvestigatorAgent, AgentContext
 from src.api.middleware.authentication import get_current_user
 from src.tools import TransparencyAPIFilter
 from src.infrastructure.observability.metrics import track_time, count_calls, BusinessMetrics
+from src.services.investigation_service_selector import investigation_service
 
 
 logger = get_logger(__name__)
@@ -115,13 +116,39 @@ async def start_investigation(
 ):
     """
     Start a new investigation for anomaly detection.
-
+
     Creates and queues an investigation task that will analyze government data
     for irregularities and suspicious patterns.
     """
-    investigation_id = str(uuid4())
-
-    # Store investigation metadata
+    try:
+        # Create investigation in database (Supabase via REST API on HuggingFace)
+        db_investigation = await investigation_service.create(
+            user_id=current_user.get("user_id"),
+            query=request.query,
+            data_source=request.data_source,
+            filters=request.filters,
+            anomaly_types=request.anomaly_types
+        )
+
+        investigation_id = db_investigation.id if hasattr(db_investigation, 'id') else db_investigation['id']
+
+        logger.info(
+            "investigation_created_in_database",
+            investigation_id=investigation_id,
+            query=request.query,
+            data_source=request.data_source,
+            user_id=current_user.get("user_id"),
+        )
+
+    except Exception as e:
+        # Fallback to in-memory if database fails
+        logger.warning(
+            "Failed to save investigation to database, using in-memory fallback",
+            error=str(e)
+        )
+        investigation_id = str(uuid4())
+
+    # Keep in-memory copy for backward compatibility and fast access
     _active_investigations[investigation_id] = {
         "id": investigation_id,
         "status": "started",
@@ -137,14 +164,14 @@ async def start_investigation(
         "anomalies_detected": 0,
         "results": [],
     }
-
+
     # Start investigation in background
     background_tasks.add_task(
         _run_investigation,
         investigation_id,
         request
     )
-
+
     logger.info(
         "investigation_started",
         investigation_id=investigation_id,
@@ -152,14 +179,14 @@ async def start_investigation(
         data_source=request.data_source,
         user_id=current_user.get("user_id"),
     )
-
+
     # Track business metrics
     BusinessMetrics.record_investigation_created(
         priority="medium",
         user_type="authenticated"
     )
     BusinessMetrics.update_active_investigations(len(_active_investigations))
-
+
     return {
         "investigation_id": investigation_id,
         "status": "started",
@@ -411,22 +438,44 @@ async def _run_investigation(investigation_id: str, request: InvestigationReques
         investigation["status"] = "running"
         investigation["current_phase"] = "data_retrieval"
         investigation["progress"] = 0.1
-
+
+        # Update in database
+        try:
+            await investigation_service.update_status(
+                investigation_id=investigation_id,
+                status="running",
+                progress=0.1,
+                current_phase="data_retrieval"
+            )
+        except Exception as e:
+            logger.warning(f"Failed to update investigation status in database: {e}")
+
         # Create agent context
         context = AgentContext(
             conversation_id=investigation_id,
             user_id=investigation["user_id"],
             session_data={"investigation_query": request.query}
         )
-
+
         # Initialize InvestigatorAgent
         investigator = InvestigatorAgent()
-
+
         # Prepare filters for data retrieval
         filters = TransparencyAPIFilter(**request.filters)
-
+
         investigation["current_phase"] = "anomaly_detection"
         investigation["progress"] = 0.3
+
+        # Update progress in database
+        try:
+            await investigation_service.update_status(
+                investigation_id=investigation_id,
+                status="running",
+                progress=0.3,
+                current_phase="anomaly_detection"
+            )
+        except Exception as e:
+            logger.warning(f"Failed to update investigation progress in database: {e}")
 
         # Execute investigation
         results = await investigator.investigate_anomalies(
@@ -472,10 +521,34 @@ async def _run_investigation(investigation_id: str, request: InvestigationReques
         investigation["completed_at"] = datetime.utcnow()
         investigation["progress"] = 1.0
         investigation["current_phase"] = "completed"
-
+
+        # Save final results to database
+        try:
+            await investigation_service.update_status(
+                investigation_id=investigation_id,
+                status="completed",
+                progress=1.0,
+                current_phase="completed",
+                total_records_analyzed=investigation["records_processed"],
+                anomalies_found=investigation["anomalies_detected"],
+                summary=summary,
+                confidence_score=investigation["confidence_score"],
+                results=investigation["results"]
+            )
+            logger.info(
+                "investigation_saved_to_database",
+                investigation_id=investigation_id
+            )
+        except Exception as e:
+            logger.error(
+                "Failed to save investigation results to database",
+                investigation_id=investigation_id,
+                error=str(e)
+            )
+
         # Calculate duration
         duration = (datetime.utcnow() - start_time).total_seconds()
-
+
         logger.info(
             "investigation_completed",
             investigation_id=investigation_id,
@@ -506,8 +579,24 @@ async def _run_investigation(investigation_id: str, request: InvestigationReques
             investigation_id=investigation_id,
             error=str(e),
         )
-
+
         investigation["status"] = "failed"
         investigation["completed_at"] = datetime.utcnow()
         investigation["current_phase"] = "failed"
-        investigation["error"] = str(e)
+        investigation["error"] = str(e)
+
+        # Save failure to database
+        try:
+            await investigation_service.update_status(
+                investigation_id=investigation_id,
+                status="failed",
+                progress=investigation.get("progress", 0.0),
+                current_phase="failed",
+                error=str(e)
+            )
+        except Exception as db_error:
+            logger.error(
+                "Failed to save investigation failure to database",
+                investigation_id=investigation_id,
+                error=str(db_error)
+            )