anderson-ufrj committed on
Commit
36a83c3
·
1 Parent(s): b77bffb

feat(database): implement Supabase PostgreSQL service integration

Browse files

Add SupabaseService class with async connection pooling via asyncpg.
Provides CRUD operations for investigations table with JSONB support.
Includes health check, connection management, and proper error handling.

Technical details:
- Async connection pool (min: 5, max: 20 connections)
- JSONB field serialization for filters, results, anomaly_types
- PostgreSQL 17.6 compatible
- Row Level Security (RLS) support

Files changed (1) hide show
  1. src/services/supabase_service.py +456 -0
src/services/supabase_service.py ADDED
@@ -0,0 +1,456 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Supabase integration service for direct database access.
3
+
4
+ This service provides a bridge between the backend and Supabase PostgreSQL,
5
+ allowing investigations to be stored centrally for frontend consumption.
6
+ """
7
+
8
+ import asyncio
+ import json
+ import os
+ from contextlib import asynccontextmanager
+ from datetime import datetime
+ from typing import Optional, List, Dict, Any
+
+ from asyncpg import Pool, create_pool, Connection
+ from pydantic import BaseModel, Field
+
+ from src.core import get_logger, settings
+ from src.core.exceptions import CidadaoAIError
18
+
19
+ logger = get_logger(__name__)
20
+
21
+
22
class SupabaseConfig(BaseModel):
    """Connection settings for the Supabase PostgreSQL instance."""

    url: str = Field(..., description="Supabase PostgreSQL connection URL")
    anon_key: Optional[str] = Field(None, description="Supabase anon key (for Row Level Security)")
    service_role_key: Optional[str] = Field(None, description="Supabase service role key (bypasses RLS)")
    min_connections: int = Field(default=5, description="Minimum pool connections")
    max_connections: int = Field(default=20, description="Maximum pool connections")

    @classmethod
    def from_env(cls) -> "SupabaseConfig":
        """Build a configuration from environment variables.

        Reads SUPABASE_DB_URL (falling back to DATABASE_URL) for the DSN,
        plus optional key and pool-size overrides.

        Raises:
            ValueError: if no database URL is present in the environment.
        """
        dsn = os.getenv("SUPABASE_DB_URL") or os.getenv("DATABASE_URL")

        # Guard clause: an unset (or empty) DSN is a configuration error.
        if not dsn:
            raise ValueError(
                "SUPABASE_DB_URL or DATABASE_URL environment variable required. "
                "Get it from: Supabase Dashboard > Settings > Database > Connection string (URI)"
            )

        return cls(
            url=dsn,
            anon_key=os.getenv("SUPABASE_ANON_KEY"),
            service_role_key=os.getenv("SUPABASE_SERVICE_ROLE_KEY"),
            min_connections=int(os.getenv("SUPABASE_MIN_CONNECTIONS", "5")),
            max_connections=int(os.getenv("SUPABASE_MAX_CONNECTIONS", "20")),
        )
49
+
50
+
51
class SupabaseService:
    """
    Service for interacting with Supabase PostgreSQL.

    Provides connection pooling and CRUD operations for investigations.
    """

    # Columns that update_investigation() is allowed to modify. Column names
    # cannot be bound as query parameters, so they are interpolated into the
    # UPDATE statement; restricting them to this whitelist prevents SQL
    # identifier injection via attacker-controlled keyword names.
    _UPDATABLE_COLUMNS = frozenset({
        "status",
        "progress",
        "current_phase",
        "results",
        "summary",
        "confidence_score",
        "total_records_analyzed",
        "anomalies_found",
        "error_message",
        "completed_at",
        "filters",
        "anomaly_types",
    })

    # JSONB columns whose dict/list values must be serialized before binding.
    _JSONB_COLUMNS = frozenset({"results", "filters", "anomaly_types"})

    def __init__(self, config: Optional[SupabaseConfig] = None):
        """
        Initialize Supabase service.

        Args:
            config: Supabase configuration (loads from env if None)
        """
        self.config = config or SupabaseConfig.from_env()
        self._pool: Optional[Pool] = None
        self._initialized = False
        # Serializes concurrent initialize() calls so only one pool is created.
        self._init_lock = asyncio.Lock()

    async def initialize(self) -> None:
        """Initialize the connection pool (idempotent, concurrency-safe).

        Raises:
            CidadaoAIError: if pool creation or the test query fails.
        """
        if self._initialized:
            logger.warning("Supabase service already initialized")
            return

        async with self._init_lock:
            # Re-check inside the lock: another task may have finished
            # initialization while this one was waiting.
            if self._initialized:
                return

            try:
                logger.info("Initializing Supabase connection pool")

                self._pool = await create_pool(
                    dsn=self.config.url,
                    min_size=self.config.min_connections,
                    max_size=self.config.max_connections,
                    command_timeout=30,
                    server_settings={
                        'application_name': 'cidadao-ai-backend',
                        'timezone': 'UTC',
                    }
                )

                # Test connection before declaring the service ready.
                async with self._pool.acquire() as conn:
                    version = await conn.fetchval("SELECT version()")
                    logger.info(f"Connected to Supabase PostgreSQL: {version[:50]}...")

                self._initialized = True
                logger.info("Supabase service initialized successfully")

            except Exception as e:
                logger.error(f"Failed to initialize Supabase service: {e}", exc_info=True)
                # Chain the original exception so the root cause is preserved.
                raise CidadaoAIError(f"Supabase initialization failed: {e}") from e

    async def close(self) -> None:
        """Close the connection pool and reset initialization state."""
        if self._pool:
            await self._pool.close()
            # Drop the closed pool so a later initialize() builds a fresh one
            # instead of leaving a stale, unusable Pool object around.
            self._pool = None
            self._initialized = False
            logger.info("Supabase connection pool closed")

    @asynccontextmanager
    async def get_connection(self):
        """
        Get a database connection from the pool.

        Lazily initializes the pool on first use.

        Yields:
            Connection instance
        """
        if not self._initialized:
            await self.initialize()

        async with self._pool.acquire() as conn:
            yield conn

    async def create_investigation(
        self,
        user_id: str,
        query: str,
        data_source: str,
        filters: Optional[Dict[str, Any]] = None,
        anomaly_types: Optional[List[str]] = None,
        session_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Create a new investigation in Supabase.

        Args:
            user_id: User ID
            query: Investigation query
            data_source: Data source to investigate
            filters: Query filters
            anomaly_types: Types of anomalies to detect
            session_id: Optional session ID

        Returns:
            Created investigation as dict
        """
        # Single timestamp so created_at and updated_at match exactly.
        # NOTE(review): naive UTC via utcnow() kept for compatibility —
        # confirm the column type (timestamp vs timestamptz) before switching
        # to timezone-aware datetimes.
        now = datetime.utcnow()

        async with self.get_connection() as conn:
            row = await conn.fetchrow(
                """
                INSERT INTO investigations (
                    user_id, session_id, query, data_source,
                    status, filters, anomaly_types, progress,
                    created_at, updated_at
                )
                VALUES ($1, $2, $3, $4, $5, $6::jsonb, $7::jsonb, $8, $9, $10)
                RETURNING *
                """,
                user_id,
                session_id,
                query,
                data_source,
                "pending",
                json.dumps(filters or {}),
                json.dumps(anomaly_types or []),
                0.0,
                now,
                now,
            )

            logger.info(f"Created investigation {row['id']} in Supabase")
            return dict(row)

    async def get_investigation(self, investigation_id: str) -> Optional[Dict[str, Any]]:
        """
        Get investigation by ID.

        Args:
            investigation_id: Investigation UUID

        Returns:
            Investigation dict or None
        """
        async with self.get_connection() as conn:
            row = await conn.fetchrow(
                "SELECT * FROM investigations WHERE id = $1",
                investigation_id
            )

            return dict(row) if row else None

    async def update_investigation(
        self,
        investigation_id: str,
        **updates
    ) -> Dict[str, Any]:
        """
        Update investigation fields.

        Args:
            investigation_id: Investigation UUID
            **updates: Fields to update; keys must be known columns

        Returns:
            Updated investigation dict

        Raises:
            ValueError: if an unknown column is requested or the
                investigation does not exist.
        """
        # Column names are interpolated into the SQL below (they cannot be
        # bound as parameters), so reject anything outside the whitelist to
        # prevent SQL identifier injection.
        unknown = set(updates) - self._UPDATABLE_COLUMNS
        if unknown:
            raise ValueError(f"Invalid investigation fields: {sorted(unknown)}")

        # Build dynamic UPDATE query
        set_clauses = []
        values = []
        param_index = 1

        for key, value in updates.items():
            if key in self._JSONB_COLUMNS and isinstance(value, (dict, list)):
                set_clauses.append(f"{key} = ${param_index}::jsonb")
                values.append(json.dumps(value))
            else:
                set_clauses.append(f"{key} = ${param_index}")
                values.append(value)
            param_index += 1

        # Always update updated_at
        set_clauses.append(f"updated_at = ${param_index}")
        values.append(datetime.utcnow())
        param_index += 1

        # Add investigation_id as last parameter
        values.append(investigation_id)

        query = f"""
            UPDATE investigations
            SET {', '.join(set_clauses)}
            WHERE id = ${param_index}
            RETURNING *
        """

        async with self.get_connection() as conn:
            row = await conn.fetchrow(query, *values)

            if not row:
                raise ValueError(f"Investigation {investigation_id} not found")

            logger.debug(f"Updated investigation {investigation_id}")
            return dict(row)

    async def update_progress(
        self,
        investigation_id: str,
        progress: float,
        current_phase: str,
        records_processed: Optional[int] = None,
        anomalies_found: Optional[int] = None,
    ) -> Dict[str, Any]:
        """
        Update investigation progress.

        Args:
            investigation_id: Investigation UUID
            progress: Progress percentage (0.0 to 1.0)
            current_phase: Current processing phase
            records_processed: Number of records processed
            anomalies_found: Number of anomalies detected

        Returns:
            Updated investigation dict
        """
        updates: Dict[str, Any] = {
            "progress": progress,
            "current_phase": current_phase,
        }

        if records_processed is not None:
            updates["total_records_analyzed"] = records_processed

        if anomalies_found is not None:
            updates["anomalies_found"] = anomalies_found

        return await self.update_investigation(investigation_id, **updates)

    async def complete_investigation(
        self,
        investigation_id: str,
        results: List[Dict[str, Any]],
        summary: str,
        confidence_score: float,
        total_records: int,
        anomalies_found: int,
    ) -> Dict[str, Any]:
        """
        Mark investigation as completed with results.

        Args:
            investigation_id: Investigation UUID
            results: List of anomaly results
            summary: Investigation summary
            confidence_score: Overall confidence
            total_records: Total records analyzed
            anomalies_found: Total anomalies found

        Returns:
            Updated investigation dict
        """
        return await self.update_investigation(
            investigation_id,
            status="completed",
            progress=1.0,
            current_phase="completed",
            results=results,
            summary=summary,
            confidence_score=confidence_score,
            total_records_analyzed=total_records,
            anomalies_found=anomalies_found,
            completed_at=datetime.utcnow(),
        )

    async def fail_investigation(
        self,
        investigation_id: str,
        error_message: str,
    ) -> Dict[str, Any]:
        """
        Mark investigation as failed.

        Args:
            investigation_id: Investigation UUID
            error_message: Error description

        Returns:
            Updated investigation dict
        """
        return await self.update_investigation(
            investigation_id,
            status="failed",
            current_phase="failed",
            error_message=error_message,
            completed_at=datetime.utcnow(),
        )

    async def list_user_investigations(
        self,
        user_id: str,
        limit: int = 20,
        offset: int = 0,
        status: Optional[str] = None,
    ) -> List[Dict[str, Any]]:
        """
        List investigations for a user.

        Args:
            user_id: User ID
            limit: Maximum results
            offset: Pagination offset
            status: Filter by status

        Returns:
            List of investigation dicts, newest first
        """
        async with self.get_connection() as conn:
            query = """
                SELECT * FROM investigations
                WHERE user_id = $1
            """
            params: List[Any] = [user_id]

            if status:
                query += " AND status = $2"
                params.append(status)

            # Parameter positions shift depending on whether the status
            # filter was added, so compute them from the current length.
            query += " ORDER BY created_at DESC LIMIT $" + str(len(params) + 1)
            params.append(limit)

            query += " OFFSET $" + str(len(params) + 1)
            params.append(offset)

            rows = await conn.fetch(query, *params)

            return [dict(row) for row in rows]

    async def delete_investigation(
        self,
        investigation_id: str,
        user_id: str,
    ) -> bool:
        """
        Delete an investigation (soft delete by marking as cancelled).

        Args:
            investigation_id: Investigation UUID
            user_id: User ID (for authorization)

        Returns:
            True if deleted, False if not found
        """
        async with self.get_connection() as conn:
            result = await conn.execute(
                """
                UPDATE investigations
                SET status = 'cancelled', completed_at = $1
                WHERE id = $2 AND user_id = $3
                """,
                datetime.utcnow(),
                investigation_id,
                user_id,
            )

            # asyncpg returns the command tag, e.g. "UPDATE 1"; the last
            # token is the number of rows affected.
            rows_affected = int(result.split()[-1])

            if rows_affected > 0:
                logger.info(f"Cancelled investigation {investigation_id}")
                return True

            return False

    async def health_check(self) -> Dict[str, Any]:
        """
        Check Supabase connection health.

        Returns:
            Health status dict (never raises; failures are reported in-band)
        """
        try:
            async with self.get_connection() as conn:
                # Simple query to test connection
                result = await conn.fetchval("SELECT 1")
                pool_size = self._pool.get_size()
                pool_free = self._pool.get_idle_size()

                return {
                    "status": "healthy",
                    "connected": True,
                    "pool_size": pool_size,
                    "pool_free": pool_free,
                    "pool_used": pool_size - pool_free,
                }
        except Exception as e:
            logger.error(f"Supabase health check failed: {e}")
            return {
                "status": "unhealthy",
                "connected": False,
                "error": str(e),
            }
446
+
447
+
448
# Global service instance (shared across the process).
# NOTE(review): constructing SupabaseService here invokes
# SupabaseConfig.from_env() at import time, so importing this module raises
# ValueError when neither SUPABASE_DB_URL nor DATABASE_URL is set — confirm
# this eager failure is intended.
supabase_service = SupabaseService()
450
+
451
+
452
async def get_supabase_service() -> SupabaseService:
    """Return the shared SupabaseService, initializing it on first use."""
    service = supabase_service
    if not service._initialized:
        await service.initialize()
    return service