anderson-ufrj committed
Commit
f29bf1c
·
1 Parent(s): 5688acd

feat(service): add high-level service for dados.gov.br integration


- Create service layer with business logic for open data portal
- Implement methods for searching transparency datasets
- Add specialized searches for spending and procurement data
- Include data availability analysis functionality
- Integrate caching for performance optimization (see the usage sketch below)
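
A minimal usage sketch of the new service (assumes an asyncio entry point; the wrapper below is illustrative, not part of this commit):

    import asyncio

    from src.services.dados_gov_service import DadosGovService

    async def main() -> None:
        service = DadosGovService()
        try:
            results = await service.search_transparency_datasets(
                keywords=["licitações"], data_format="csv", limit=10
            )
            for dataset in results.results:
                print(dataset.title)
        finally:
            await service.close()

    asyncio.run(main())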

Files changed (1)
  1. src/services/dados_gov_service.py +378 -0
src/services/dados_gov_service.py ADDED
@@ -0,0 +1,378 @@
"""
High-level service for interacting with dados.gov.br API.

This service provides business logic and data transformation
for the Brazilian Open Data Portal integration.
"""

import logging
import re
from typing import Any, Dict, List, Optional

from src.services.cache_service import CacheService, CacheTTL
from src.tools.dados_gov_api import DadosGovAPIClient, DadosGovAPIError
from src.tools.dados_gov_models import (
    Dataset,
    DatasetSearchResult,
    Organization,
    Resource,
)

logger = logging.getLogger(__name__)


class DadosGovService:
    """
    Service for accessing and analyzing data from dados.gov.br.

    This service provides high-level methods for searching datasets,
    analyzing data availability, and retrieving government open data.
    """

    def __init__(self, api_key: Optional[str] = None):
        """
        Initialize the dados.gov.br service.

        Args:
            api_key: Optional API key for authentication
        """
        self.client = DadosGovAPIClient(api_key=api_key)
        self.cache = CacheService()

    async def close(self):
        """Close service connections."""
        await self.client.close()

    async def search_transparency_datasets(
        self,
        keywords: Optional[List[str]] = None,
        organization: Optional[str] = None,
        data_format: Optional[str] = None,
        limit: int = 20,
    ) -> DatasetSearchResult:
        """
        Search for transparency-related datasets.

        Args:
            keywords: Keywords to search for (e.g., ["transparência", "gastos", "contratos"])
            organization: Filter by specific organization
            data_format: Preferred data format (csv, json, xml)
            limit: Maximum number of results

        Returns:
            Search results with relevant datasets
        """
        # Build search query
        query_parts = []
        if keywords:
            query_parts.extend(keywords)
        else:
            # Default transparency-related keywords
            query_parts.extend([
                "transparência",
                "gastos públicos",
                "contratos",
                "licitações",
                "servidores",
            ])

        query = " OR ".join(query_parts)

        # Check cache
        cache_key = f"dados_gov:search:{query}:{organization}:{data_format}:{limit}"
        cached_result = await self.cache.get(cache_key)
        if cached_result:
            return DatasetSearchResult(**cached_result)

        try:
            # Search datasets
            result = await self.client.search_datasets(
                query=query,
                organization=organization,
                format=data_format,
                limit=limit,
            )

            # Parse response
            search_result = DatasetSearchResult(
                count=result.get("count", 0),
                results=[Dataset(**ds) for ds in result.get("results", [])],
                facets=result.get("facets", {}),
                search_facets=result.get("search_facets", {}),
            )

            # Cache result
            await self.cache.set(
                cache_key,
                search_result.model_dump(),
                ttl=CacheTTL.MEDIUM,
            )

            return search_result

        except DadosGovAPIError as e:
            logger.error(f"Error searching datasets: {e}")
            raise
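
    # Illustrative call (hypothetical, not part of this commit): with no
    # keywords the method searches the default transparency terms, i.e.
    # query = "transparência OR gastos públicos OR contratos OR licitações OR servidores",
    # and caches the parsed result under a key embedding query, filters, and limit:
    #
    #   result = await service.search_transparency_datasets(limit=5)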

    async def get_dataset_with_resources(self, dataset_id: str) -> Dataset:
        """
        Get complete dataset information including all resources.

        Args:
            dataset_id: Dataset identifier

        Returns:
            Complete dataset with resources
        """
        # Check cache
        cache_key = f"dados_gov:dataset:{dataset_id}"
        cached_dataset = await self.cache.get(cache_key)
        if cached_dataset:
            return Dataset(**cached_dataset)

        try:
            # Get dataset details
            result = await self.client.get_dataset(dataset_id)
            dataset = Dataset(**result.get("result", {}))

            # Cache result
            await self.cache.set(
                cache_key,
                dataset.model_dump(),
                ttl=CacheTTL.LONG,
            )

            return dataset

        except DadosGovAPIError as e:
            logger.error(f"Error getting dataset {dataset_id}: {e}")
            raise
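
    # Illustrative call (hypothetical dataset id, not part of this commit):
    # details are cached with the LONG TTL, so repeated lookups for the
    # same id skip the API:
    #
    #   dataset = await service.get_dataset_with_resources("orcamento-federal")
    #   csv_urls = [r.url for r in dataset.resources if (r.format or "").upper() == "CSV"]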

    async def find_government_spending_data(
        self,
        year: Optional[int] = None,
        state: Optional[str] = None,
        city: Optional[str] = None,
    ) -> List[Dataset]:
        """
        Find datasets related to government spending.

        Args:
            year: Filter by specific year
            state: Filter by state (e.g., "SP", "RJ")
            city: Filter by city name

        Returns:
            List of relevant datasets
        """
        # Build search query
        query_parts = ["gastos", "despesas", "pagamentos", "execução orçamentária"]

        if year:
            query_parts.append(str(year))
        if state:
            query_parts.append(state)
        if city:
            query_parts.append(city)

        query = " ".join(query_parts)

        # Search for datasets
        result = await self.search_transparency_datasets(
            keywords=[query],
            data_format="csv",  # Prefer CSV for analysis
            limit=50,
        )

        # Filter results by relevance
        relevant_datasets = []
        for dataset in result.results:
            # Check if dataset is relevant based on title and description
            title_lower = dataset.title.lower()
            notes_lower = (dataset.notes or "").lower()

            if any(term in title_lower or term in notes_lower
                   for term in ["gasto", "despesa", "pagamento", "execução"]):
                relevant_datasets.append(dataset)

        return relevant_datasets
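
    # Illustrative call (hypothetical, not part of this commit): year=2023
    # and state="SP" build the single query
    # "gastos despesas pagamentos execução orçamentária 2023 SP", and only
    # results whose title or notes mention a spending term are kept:
    #
    #   datasets = await service.find_government_spending_data(year=2023, state="SP")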

    async def find_procurement_data(
        self,
        organization: Optional[str] = None,
        modality: Optional[str] = None,
    ) -> List[Dataset]:
        """
        Find datasets related to public procurement and contracts.

        Args:
            organization: Filter by organization
            modality: Procurement modality (e.g., "pregão", "concorrência")

        Returns:
            List of procurement-related datasets
        """
        keywords = ["licitação", "contratos", "pregão", "compras públicas"]
        if modality:
            keywords.append(modality)

        result = await self.search_transparency_datasets(
            keywords=keywords,
            organization=organization,
            limit=30,
        )

        return result.results
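
    # Illustrative call (hypothetical, not part of this commit): the modality
    # is appended to the default procurement keywords before the search:
    #
    #   datasets = await service.find_procurement_data(modality="pregão")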

    async def analyze_data_availability(
        self,
        topic: str,
    ) -> Dict[str, Any]:
        """
        Analyze what data is available for a specific topic.

        Args:
            topic: Topic to analyze (e.g., "educação", "saúde", "segurança")

        Returns:
            Analysis of available data including formats, organizations, and coverage
        """
        # Search for topic-related datasets
        result = await self.search_transparency_datasets(
            keywords=[topic],
            limit=100,
        )

        # Analyze results
        analysis = {
            "topic": topic,
            "total_datasets": result.count,
            "analyzed_datasets": len(result.results),
            "organizations": {},
            "formats": {},
            "years_covered": set(),
            "geographic_coverage": {
                "federal": 0,
                "state": 0,
                "municipal": 0,
            },
            "update_frequency": {
                "daily": 0,
                "monthly": 0,
                "yearly": 0,
                "unknown": 0,
            },
        }

        # Process each dataset
        for dataset in result.results:
            # Count by organization
            if dataset.organization:
                org_name = dataset.organization.title
                analysis["organizations"][org_name] = (
                    analysis["organizations"].get(org_name, 0) + 1
                )

            # Count by format
            for resource in dataset.resources:
                if resource.format:
                    fmt = resource.format.upper()
                    analysis["formats"][fmt] = analysis["formats"].get(fmt, 0) + 1

            # Extract years from title/description; the group must be
            # non-capturing, otherwise re.findall returns only "19"/"20"
            text = f"{dataset.title} {dataset.notes or ''}"
            years = re.findall(r"\b(?:19|20)\d{2}\b", text)
            analysis["years_covered"].update(years)

            # Detect geographic coverage
            text_lower = text.lower()
            if any(term in text_lower for term in ["federal", "brasil", "nacional"]):
                analysis["geographic_coverage"]["federal"] += 1
            elif any(term in text_lower for term in ["estado", "estadual", "uf"]):
                analysis["geographic_coverage"]["state"] += 1
            elif any(term in text_lower for term in ["município", "municipal", "cidade"]):
                analysis["geographic_coverage"]["municipal"] += 1

            # Detect update frequency
            if any(term in text_lower for term in ["diário", "diariamente"]):
                analysis["update_frequency"]["daily"] += 1
            elif any(term in text_lower for term in ["mensal", "mensalmente"]):
                analysis["update_frequency"]["monthly"] += 1
            elif any(term in text_lower for term in ["anual", "anualmente"]):
                analysis["update_frequency"]["yearly"] += 1
            else:
                analysis["update_frequency"]["unknown"] += 1

        # Convert years set to sorted list
        analysis["years_covered"] = sorted(analysis["years_covered"])

        # Sort organizations by dataset count
        analysis["organizations"] = dict(
            sorted(
                analysis["organizations"].items(),
                key=lambda x: x[1],
                reverse=True,
            )[:10]  # Top 10 organizations
        )

        return analysis
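
    # Illustrative call (hypothetical counts, not part of this commit):
    #
    #   report = await service.analyze_data_availability("educação")
    #   # report looks like:
    #   # {"topic": "educação", "total_datasets": 312, "analyzed_datasets": 100,
    #   #  "organizations": {"Ministério da Educação": 41, ...},
    #   #  "formats": {"CSV": 87, "JSON": 23, ...},
    #   #  "years_covered": ["2015", ..., "2024"], ...}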

    async def get_resource_download_url(self, resource_id: str) -> str:
        """
        Get the download URL for a specific resource.

        Args:
            resource_id: Resource identifier

        Returns:
            Direct download URL
        """
        try:
            result = await self.client.get_resource(resource_id)
            resource = Resource(**result.get("result", {}))
            return resource.url
        except DadosGovAPIError as e:
            logger.error(f"Error getting resource {resource_id}: {e}")
            raise

    async def list_government_organizations(self) -> List[Organization]:
        """
        List all government organizations that publish open data.

        Returns:
            List of organizations sorted by dataset count
        """
        # Check cache
        cache_key = "dados_gov:organizations"
        cached_orgs = await self.cache.get(cache_key)
        if cached_orgs:
            return [Organization(**org) for org in cached_orgs]

        try:
            # Get organizations
            result = await self.client.list_organizations()
            organizations = [
                Organization(**org)
                for org in result.get("result", [])
            ]

            # Sort by package count
            organizations.sort(
                key=lambda x: x.package_count or 0,
                reverse=True,
            )

            # Cache result
            await self.cache.set(
                cache_key,
                [org.model_dump() for org in organizations],
                ttl=CacheTTL.LONG,
            )

            return organizations

        except DadosGovAPIError as e:
            logger.error(f"Error listing organizations: {e}")
            raise
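
Note on the year-extraction regex in analyze_data_availability: re.findall returns only a group's contents when the pattern contains a capturing group, so (19|20)\d{2} would yield just "19" or "20". The non-capturing form used above returns full years. A quick standalone check (illustrative snippet, not part of the commit):

    import re

    text = "Despesas empenhadas em 2021, 2022 e atualizadas em 2023"
    print(re.findall(r"\b(19|20)\d{2}\b", text))    # capturing: ['20', '20', '20']
    print(re.findall(r"\b(?:19|20)\d{2}\b", text))  # non-capturing: ['2021', '2022', '2023']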