anderson-ufrj
fix(supabase): auto-detect environment and use REST API on HuggingFace
3177117
"""
Module: api.routes.export
Description: Export endpoints for downloading investigations, reports and data
Author: Anderson H. Silva
Date: 2025-01-25
License: Proprietary - All rights reserved
"""
import io
from datetime import datetime
from typing import Dict, List, Optional, Any, Union
from uuid import uuid4
from fastapi import APIRouter, HTTPException, Depends, Query, Response
from fastapi.responses import StreamingResponse
import pandas as pd
from pydantic import BaseModel, Field as PydanticField, validator
from src.core import json_utils
from src.core import get_logger
from src.api.middleware.authentication import get_current_user
from src.services.export_service import export_service
from src.services.investigation_service_selector import investigation_service
from src.services.data_service import data_service
logger = get_logger(__name__)
router = APIRouter()
class ExportRequest(BaseModel):
"""Request model for data export."""
export_type: str = PydanticField(description="Type of data to export")
format: str = PydanticField(description="Export format")
filters: Optional[Dict[str, Any]] = PydanticField(default={}, description="Filters to apply")
include_metadata: bool = PydanticField(default=True, description="Include metadata")
compress: bool = PydanticField(default=False, description="Compress output")
@validator('export_type')
def validate_export_type(cls, v):
"""Validate export type."""
allowed_types = [
'investigations', 'contracts', 'anomalies',
'reports', 'analytics', 'full_data',
'visualization', 'regional_analysis', 'time_series'
]
if v not in allowed_types:
raise ValueError(f'Export type must be one of: {allowed_types}')
return v
@validator('format')
def validate_format(cls, v):
"""Validate export format."""
allowed_formats = ['excel', 'csv', 'json', 'pdf']
if v not in allowed_formats:
raise ValueError(f'Format must be one of: {allowed_formats}')
return v
class BulkExportRequest(BaseModel):
"""Request model for bulk export."""
exports: List[Dict[str, Any]] = PydanticField(description="List of exports to generate")
compress: bool = PydanticField(default=True, description="Compress all exports")
@validator('exports')
def validate_exports(cls, v):
"""Validate exports list."""
if not v:
raise ValueError('At least one export must be specified')
if len(v) > 50:
raise ValueError('Maximum 50 exports allowed per request')
return v
@router.post("/investigations/{investigation_id}/download")
async def export_investigation(
investigation_id: str,
format: str = Query("excel", description="Export format: excel, csv, pdf, json"),
current_user: Dict[str, Any] = Depends(get_current_user)
):
"""
Export investigation data in various formats.
Exports complete investigation data including anomalies,
contracts, and analysis results.
"""
# Get investigation data
investigation = await investigation_service.get_investigation(
investigation_id,
user_id=current_user.get("user_id")
)
if not investigation:
raise HTTPException(status_code=404, detail="Investigation not found")
filename = f"investigation_{investigation_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
if format == "excel":
# Convert to Excel
file_bytes = await export_service.convert_investigation_to_excel(investigation)
return Response(
content=file_bytes,
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={
"Content-Disposition": f"attachment; filename={filename}.xlsx"
}
)
elif format == "csv":
# Create CSV with main data
main_data = {
'investigation_id': investigation['id'],
'type': investigation['type'],
'status': investigation['status'],
'created_at': investigation['created_at'],
'anomalies_count': len(investigation.get('anomalies', [])),
'total_value': investigation.get('total_value', 0),
}
df = pd.DataFrame([main_data])
csv_bytes = await export_service.generate_csv(df)
return Response(
content=csv_bytes,
media_type="text/csv",
headers={
"Content-Disposition": f"attachment; filename={filename}.csv"
}
)
elif format == "pdf":
# Generate PDF report
content = _format_investigation_as_markdown(investigation)
pdf_bytes = await export_service.generate_pdf(
content=content,
title=f"Investigação {investigation_id}",
metadata={
'investigation_id': investigation_id,
'generated_at': datetime.now().isoformat(),
'user': current_user.get('email', 'Unknown')
}
)
return Response(
content=pdf_bytes,
media_type="application/pdf",
headers={
"Content-Disposition": f"attachment; filename={filename}.pdf"
}
)
elif format == "json":
return Response(
content=json_utils.dumps(investigation, indent=2, ensure_ascii=False),
media_type="application/json",
headers={
"Content-Disposition": f"attachment; filename={filename}.json"
}
)
else:
raise HTTPException(status_code=400, detail="Unsupported format")
@router.post("/contracts/export")
async def export_contracts(
request: ExportRequest,
current_user: Dict[str, Any] = Depends(get_current_user)
):
"""
Export contract data with filters.
Allows exporting filtered contract data in various formats.
"""
# Apply filters
filters = request.filters or {}
# Get contracts data
contracts = await data_service.search_contracts(
**filters,
limit=10000 # Reasonable limit for exports
)
if not contracts:
raise HTTPException(status_code=404, detail="No contracts found with given filters")
filename = f"contracts_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
if request.format == "excel":
# Convert to DataFrame
df = pd.DataFrame(contracts)
# Generate Excel with formatting
excel_bytes = await export_service.generate_excel(
data=df,
title="Contratos - Portal da Transparência",
metadata={
'exported_at': datetime.now().isoformat(),
'total_records': len(contracts),
'filters': filters
}
)
return Response(
content=excel_bytes,
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={
"Content-Disposition": f"attachment; filename={filename}.xlsx"
}
)
elif request.format == "csv":
df = pd.DataFrame(contracts)
csv_bytes = await export_service.generate_csv(df)
return Response(
content=csv_bytes,
media_type="text/csv",
headers={
"Content-Disposition": f"attachment; filename={filename}.csv"
}
)
else:
raise HTTPException(status_code=400, detail="Format not supported for contracts export")
@router.post("/anomalies/export")
async def export_anomalies(
request: ExportRequest,
current_user: Dict[str, Any] = Depends(get_current_user)
):
"""
Export anomaly data with filters.
Exports detected anomalies in various formats.
"""
# Get anomalies from investigations
filters = request.filters or {}
# For now, get anomalies from recent investigations
# In production, this would query a dedicated anomalies table
investigations = await investigation_service.list_investigations(
user_id=current_user.get("user_id"),
status="completed",
limit=100
)
all_anomalies = []
for inv in investigations:
anomalies = inv.get('anomalies', [])
for anomaly in anomalies:
anomaly['investigation_id'] = inv['id']
all_anomalies.append(anomaly)
if not all_anomalies:
raise HTTPException(status_code=404, detail="No anomalies found")
filename = f"anomalies_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
if request.format == "excel":
df = pd.DataFrame(all_anomalies)
# Separate by severity
high_severity = df[df['severity'] >= 0.7]
medium_severity = df[(df['severity'] >= 0.4) & (df['severity'] < 0.7)]
low_severity = df[df['severity'] < 0.4]
dataframes = {
'Alta Severidade': high_severity,
'Média Severidade': medium_severity,
'Baixa Severidade': low_severity,
'Todas Anomalias': df
}
excel_bytes = await export_service.generate_excel(
data=dataframes,
title="Anomalias Detectadas - Cidadão.AI",
metadata={
'exported_at': datetime.now().isoformat(),
'total_anomalies': len(all_anomalies),
'high_severity_count': len(high_severity),
'medium_severity_count': len(medium_severity),
'low_severity_count': len(low_severity),
}
)
return Response(
content=excel_bytes,
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={
"Content-Disposition": f"attachment; filename={filename}.xlsx"
}
)
else:
raise HTTPException(status_code=400, detail="Format not supported for anomalies export")
@router.post("/bulk")
async def bulk_export(
request: BulkExportRequest,
current_user: Dict[str, Any] = Depends(get_current_user)
):
"""
Create bulk export with multiple files.
Generates a ZIP file containing multiple exports.
"""
exports_config = []
for export in request.exports:
export_type = export.get('type')
export_format = export.get('format', 'json')
if export_type == 'investigation':
investigation = await investigation_service.get_investigation(
export['id'],
user_id=current_user.get("user_id")
)
if investigation:
if export_format == 'pdf':
content = _format_investigation_as_markdown(investigation)
exports_config.append({
'filename': f"investigation_{export['id']}.pdf",
'content': content,
'format': 'pdf',
'title': f"Investigação {export['id']}",
'metadata': {'investigation_id': export['id']}
})
else:
exports_config.append({
'filename': f"investigation_{export['id']}.json",
'content': json_utils.dumps(investigation, indent=2),
'format': 'json'
})
if not exports_config:
raise HTTPException(status_code=404, detail="No data found for bulk export")
# Generate ZIP
zip_bytes = await export_service.generate_bulk_export(exports_config)
return Response(
content=zip_bytes,
media_type="application/zip",
headers={
"Content-Disposition": f"attachment; filename=bulk_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip"
}
)
def _format_investigation_as_markdown(investigation: Dict[str, Any]) -> str:
"""Format investigation data as markdown for PDF generation."""
lines = []
lines.append(f"# Investigação {investigation['id']}")
lines.append("")
lines.append(f"**Tipo**: {investigation.get('type', 'N/A')}")
lines.append(f"**Status**: {investigation.get('status', 'N/A')}")
lines.append(f"**Data de Criação**: {investigation.get('created_at', 'N/A')}")
lines.append("")
if investigation.get('summary'):
lines.append("## Resumo")
lines.append(investigation['summary'])
lines.append("")
anomalies = investigation.get('anomalies', [])
if anomalies:
lines.append("## Anomalias Detectadas")
lines.append("")
lines.append(f"Total de anomalias: {len(anomalies)}")
lines.append("")
for i, anomaly in enumerate(anomalies, 1):
lines.append(f"### Anomalia {i}")
lines.append(f"**Tipo**: {anomaly.get('type', 'N/A')}")
lines.append(f"**Severidade**: {anomaly.get('severity', 0):.2f}")
lines.append(f"**Descrição**: {anomaly.get('description', 'N/A')}")
lines.append(f"**Explicação**: {anomaly.get('explanation', 'N/A')}")
lines.append("")
return "\n".join(lines)
@router.post("/visualization/export")
async def export_visualization_data(
request: ExportRequest,
current_user: Dict[str, Any] = Depends(get_current_user)
):
"""
Export visualization data in optimized formats.
Uses the Oscar Niemeyer agent to format data for charts and dashboards.
Supports Excel with multiple sheets, CSV, and JSON formats.
"""
from src.services.agent_lazy_loader import AgentLazyLoader
from src.agents.oscar_niemeyer import OscarNiemeyerAgent
from src.agents.deodoro import AgentContext
agent_loader = AgentLazyLoader()
# Get Oscar Niemeyer agent
oscar_agent = await agent_loader.get_agent("oscar_niemeyer")
if not oscar_agent:
oscar_agent = OscarNiemeyerAgent()
await oscar_agent.initialize()
filename = f"visualization_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
# Get data based on filters
filters = request.filters or {}
dataset_type = filters.get("dataset_type", "contracts")
time_range = filters.get("time_range", "30d")
dimensions = filters.get("dimensions", ["category"])
metrics = filters.get("metrics", ["total_value", "count"])
# Create context for Oscar agent
context = AgentContext(
investigation_id=f"export_{uuid4()}",
user_id=current_user.get("user_id"),
session_id="export_session",
metadata={"export_format": request.format}
)
# Get visualization data from Oscar
from src.agents.deodoro import AgentMessage
message = AgentMessage(
role="user",
content=f"Generate visualization data for export",
type="visualization_metadata",
data={
"data_type": dataset_type,
"dimensions": dimensions,
"metrics": metrics,
"time_range": time_range,
"export": True
}
)
response = await oscar_agent.process(message, context)
if not response.success:
raise HTTPException(status_code=500, detail="Failed to generate visualization data")
viz_data = response.data
if request.format == "excel":
# Create multiple sheets for different visualizations
dataframes = {}
# Summary sheet
summary_data = {
"Metric": metrics,
"Total": [1000000, 150], # Placeholder values
"Average": [50000, 7.5],
"Min": [1000, 1],
"Max": [500000, 50]
}
dataframes["Summary"] = pd.DataFrame(summary_data)
# Time series data if applicable
if hasattr(viz_data, "series"):
series_data = []
for series in viz_data.series:
series_data.append({
"Series": series["name"],
"Field": series["field"],
"Type": series.get("type", "line")
})
dataframes["Series Configuration"] = pd.DataFrame(series_data)
# Dimensional breakdown
if dimensions:
dim_data = {
"Dimension": dimensions,
"Unique Values": [10, 5, 20], # Placeholder
"Coverage": ["100%", "95%", "100%"]
}
dataframes["Dimensions"] = pd.DataFrame(dim_data)
excel_bytes = await export_service.generate_excel(
data=dataframes,
title=f"Visualization Data - {dataset_type}",
metadata={
'exported_at': datetime.now().isoformat(),
'dataset_type': dataset_type,
'time_range': time_range,
'dimensions': dimensions,
'metrics': metrics,
'visualization_type': viz_data.visualization_type.value if hasattr(viz_data, 'visualization_type') else 'unknown'
}
)
return Response(
content=excel_bytes,
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={
"Content-Disposition": f"attachment; filename={filename}.xlsx"
}
)
elif request.format == "csv":
# For CSV, export a simplified flat structure
export_data = await oscar_agent.create_export_format(
data=[], # Would contain actual data
format_type="csv",
options={"delimiter": ","}
)
return Response(
content=export_data,
media_type="text/csv",
headers={
"Content-Disposition": f"attachment; filename={filename}.csv"
}
)
elif request.format == "json":
# For JSON, provide the full visualization specification
export_data = await oscar_agent.create_export_format(
data={
"visualization": {
"type": viz_data.visualization_type.value if hasattr(viz_data, 'visualization_type') else 'unknown',
"title": viz_data.title if hasattr(viz_data, 'title') else "Data Export",
"config": {
"x_axis": viz_data.x_axis if hasattr(viz_data, 'x_axis') else {},
"y_axis": viz_data.y_axis if hasattr(viz_data, 'y_axis') else {},
"series": viz_data.series if hasattr(viz_data, 'series') else []
}
},
"metadata": {
"exported_at": datetime.now().isoformat(),
"filters": filters
}
},
format_type="json",
options={"pretty": True}
)
return Response(
content=export_data,
media_type="application/json",
headers={
"Content-Disposition": f"attachment; filename={filename}.json"
}
)
else:
raise HTTPException(status_code=400, detail=f"Format {request.format} not supported for visualization export")
@router.post("/regional-analysis/export")
async def export_regional_analysis(
request: ExportRequest,
current_user: Dict[str, Any] = Depends(get_current_user)
):
"""
Export regional analysis data with geographic insights.
Uses the Lampião agent to export regional disparities and clustering analysis.
Includes inequality indices, regional rankings, and policy recommendations.
"""
from src.services.agent_lazy_loader import AgentLazyLoader
from src.agents.lampiao import LampiaoAgent, RegionType
from src.agents.deodoro import AgentContext, AgentMessage
agent_loader = AgentLazyLoader()
# Get Lampião agent
lampiao_agent = await agent_loader.get_agent("lampiao")
if not lampiao_agent:
lampiao_agent = LampiaoAgent()
await lampiao_agent.initialize()
filename = f"regional_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
# Get parameters from filters
filters = request.filters or {}
metric = filters.get("metric", "government_spending")
region_type = filters.get("region_type", "state")
# Create context
context = AgentContext(
investigation_id=f"export_regional_{uuid4()}",
user_id=current_user.get("user_id"),
session_id="export_session",
metadata={"export_format": request.format}
)
# Get regional analysis
message = AgentMessage(
role="user",
content=f"Analyze regional distribution of {metric}",
data={
"metric": metric,
"region_type": region_type,
"export": True
}
)
response = await lampiao_agent.process(message, context)
if not response.success:
raise HTTPException(status_code=500, detail="Failed to generate regional analysis")
regional_data = response.data
if request.format == "excel":
dataframes = {}
# Regional metrics sheet
metrics_data = []
for metric in regional_data.metrics:
metrics_data.append({
"Region Code": metric.region_id,
"Region Name": metric.region_name,
"Value": metric.value,
"Normalized Value": metric.normalized_value,
"Rank": metric.rank,
"Percentile": metric.percentile,
"Population": metric.metadata.get("population", "N/A")
})
dataframes["Regional Data"] = pd.DataFrame(metrics_data)
# Inequality indices
inequality_data = {
"Index": list(regional_data.inequalities.keys()),
"Value": list(regional_data.inequalities.values()),
"Interpretation": [
"High inequality" if v > 0.4 else "Moderate inequality" if v > 0.2 else "Low inequality"
for v in regional_data.inequalities.values()
]
}
dataframes["Inequality Analysis"] = pd.DataFrame(inequality_data)
# Clusters
if regional_data.clusters:
cluster_data = []
for cluster in regional_data.clusters:
for region in cluster["regions"]:
cluster_data.append({
"Cluster": cluster["cluster_id"],
"Region": region,
"Avg Value": cluster["characteristics"].get("avg_value", "N/A")
})
dataframes["Regional Clusters"] = pd.DataFrame(cluster_data)
# Recommendations
rec_data = {
"Recommendation": regional_data.recommendations,
"Priority": ["High"] * min(2, len(regional_data.recommendations)) +
["Medium"] * (len(regional_data.recommendations) - 2)
}
dataframes["Policy Recommendations"] = pd.DataFrame(rec_data)
excel_bytes = await export_service.generate_excel(
data=dataframes,
title=f"Regional Analysis - {metric}",
metadata={
'exported_at': datetime.now().isoformat(),
'metric': metric,
'region_type': region_type,
'regions_analyzed': regional_data.regions_analyzed,
'analysis_type': regional_data.analysis_type.value
}
)
return Response(
content=excel_bytes,
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={
"Content-Disposition": f"attachment; filename={filename}.xlsx"
}
)
elif request.format == "csv":
# For CSV, create a flat structure with all regional data
csv_data = []
for metric_data in regional_data.metrics:
csv_data.append({
"region_id": metric_data.region_id,
"region_name": metric_data.region_name,
"metric": metric,
"value": metric_data.value,
"normalized_value": metric_data.normalized_value,
"rank": metric_data.rank,
"percentile": metric_data.percentile,
"population": metric_data.metadata.get("population", 0),
"area_km2": metric_data.metadata.get("area", 0),
"gini_index": regional_data.inequalities.get("gini", 0),
"theil_index": regional_data.inequalities.get("theil_l", 0),
"analysis_type": regional_data.analysis_type.value,
"timestamp": datetime.now().isoformat()
})
df = pd.DataFrame(csv_data)
csv_bytes = await export_service.generate_csv(df)
return Response(
content=csv_bytes,
media_type="text/csv",
headers={
"Content-Disposition": f"attachment; filename={filename}.csv"
}
)
elif request.format == "json":
# For JSON, provide complete structured data optimized for visualization
export_data = {
"metadata": {
"exported_at": datetime.now().isoformat(),
"metric": metric,
"region_type": region_type,
"analysis_type": regional_data.analysis_type.value,
"regions_analyzed": regional_data.regions_analyzed
},
"data": {
"regions": [
{
"id": m.region_id,
"name": m.region_name,
"value": m.value,
"normalized_value": m.normalized_value,
"rank": m.rank,
"percentile": m.percentile,
"metadata": m.metadata
}
for m in regional_data.metrics
],
"statistics": regional_data.statistics,
"inequalities": regional_data.inequalities,
"clusters": regional_data.clusters,
"recommendations": [
{
"text": rec,
"priority": "high" if i < 2 else "medium"
}
for i, rec in enumerate(regional_data.recommendations)
]
},
"visualization_hints": {
"chart_type": "choropleth_map",
"color_scale": "viridis",
"value_field": "value",
"region_id_field": "id",
"tooltip_fields": ["name", "value", "rank", "percentile"]
}
}
return Response(
content=json_utils.dumps(export_data, indent=2, ensure_ascii=False),
media_type="application/json",
headers={
"Content-Disposition": f"attachment; filename={filename}.json"
}
)
else:
raise HTTPException(status_code=400, detail=f"Format {request.format} not supported for regional analysis export")
@router.post("/time-series/export")
async def export_time_series_data(
request: ExportRequest,
current_user: Dict[str, Any] = Depends(get_current_user)
):
"""
Export time series data optimized for visualization.
Formats historical data, trends, and optionally forecasts
in formats suitable for charting libraries.
"""
from src.services.agent_lazy_loader import AgentLazyLoader
from src.agents.oscar_niemeyer import OscarNiemeyerAgent, TimeGranularity
from src.agents.deodoro import AgentContext
agent_loader = AgentLazyLoader()
# Get Oscar Niemeyer agent
oscar_agent = await agent_loader.get_agent("oscar_niemeyer")
if not oscar_agent:
oscar_agent = OscarNiemeyerAgent()
await oscar_agent.initialize()
filename = f"time_series_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
# Get parameters from filters
filters = request.filters or {}
metric = filters.get("metric", "total_value")
start_date = filters.get("start_date")
end_date = filters.get("end_date")
granularity = filters.get("granularity", "day")
include_forecast = filters.get("include_forecast", False)
# Create context
context = AgentContext(
investigation_id=f"export_ts_{uuid4()}",
user_id=current_user.get("user_id"),
session_id="export_session",
metadata={"export_format": request.format}
)
# Get time series data
granularity_enum = TimeGranularity[granularity.upper()] if granularity.upper() in TimeGranularity.__members__ else TimeGranularity.DAY
time_series_data = await oscar_agent.generate_time_series(
metric,
start_date,
end_date,
granularity_enum,
context
)
if request.format == "excel":
# Create Excel with time series data
dataframes = {}
# Main time series data
ts_data = []
for i, (time_point, value) in enumerate(zip(time_series_data.time_points, time_series_data.values)):
ts_data.append({
"Date": time_point,
"Value": value,
"Metric": metric,
"Index": i
})
dataframes["Time Series"] = pd.DataFrame(ts_data)
# Summary statistics
summary_data = {
"Metric": metric,
"Start Date": time_series_data.time_points[0] if time_series_data.time_points else None,
"End Date": time_series_data.time_points[-1] if time_series_data.time_points else None,
"Data Points": len(time_series_data.values),
"Min Value": min(time_series_data.values) if time_series_data.values else 0,
"Max Value": max(time_series_data.values) if time_series_data.values else 0,
"Mean Value": sum(time_series_data.values) / len(time_series_data.values) if time_series_data.values else 0,
"Granularity": granularity
}
dataframes["Summary"] = pd.DataFrame([summary_data])
excel_bytes = await export_service.generate_excel(
data=dataframes,
title=f"Time Series - {metric}",
metadata={
'exported_at': datetime.now().isoformat(),
'metric': metric,
'granularity': granularity,
'data_points': len(ts_data)
}
)
return Response(
content=excel_bytes,
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={
"Content-Disposition": f"attachment; filename={filename}.xlsx"
}
)
elif request.format == "csv":
# Create CSV with time series data
ts_data = []
for time_point, value in zip(time_series_data.time_points, time_series_data.values):
ts_data.append({
"timestamp": time_point.isoformat(),
"value": value,
"metric": metric,
"granularity": granularity
})
df = pd.DataFrame(ts_data)
csv_bytes = await export_service.generate_csv(df)
return Response(
content=csv_bytes,
media_type="text/csv",
headers={
"Content-Disposition": f"attachment; filename={filename}.csv"
}
)
elif request.format == "json":
# JSON format optimized for charting libraries (Chart.js, D3.js, etc.)
export_data = {
"metadata": {
"exported_at": datetime.now().isoformat(),
"metric": metric,
"granularity": granularity,
"aggregation_type": time_series_data.aggregation_type.value,
"data_points": len(time_series_data.values)
},
"data": {
"labels": [tp.isoformat() for tp in time_series_data.time_points],
"datasets": [{
"label": metric.replace("_", " ").title(),
"data": time_series_data.values,
"borderColor": "rgb(75, 192, 192)",
"backgroundColor": "rgba(75, 192, 192, 0.2)"
}]
},
"options": {
"responsive": True,
"plugins": {
"title": {
"display": True,
"text": f"{metric.replace('_', ' ').title()} - Time Series"
},
"tooltip": {
"mode": "index",
"intersect": False
}
},
"scales": {
"x": {
"display": True,
"title": {
"display": True,
"text": "Date"
}
},
"y": {
"display": True,
"title": {
"display": True,
"text": metric.replace("_", " ").title()
}
}
}
}
}
# Add forecast data if requested
if include_forecast:
# Simple forecast placeholder
forecast_points = 7
last_value = time_series_data.values[-1] if time_series_data.values else 0
last_time = time_series_data.time_points[-1] if time_series_data.time_points else datetime.utcnow()
forecast_labels = []
forecast_values = []
for i in range(forecast_points):
if granularity == "day":
next_time = last_time + timedelta(days=i+1)
else:
next_time = last_time + timedelta(days=(i+1)*30)
forecast_labels.append(next_time.isoformat())
forecast_values.append(last_value * (1 + 0.02 * (i+1)))
export_data["data"]["datasets"].append({
"label": "Forecast",
"data": forecast_values,
"borderColor": "rgb(255, 99, 132)",
"backgroundColor": "rgba(255, 99, 132, 0.2)",
"borderDash": [5, 5]
})
# Extend labels for forecast
export_data["data"]["labels"].extend(forecast_labels)
return Response(
content=json_utils.dumps(export_data, indent=2, ensure_ascii=False),
media_type="application/json",
headers={
"Content-Disposition": f"attachment; filename={filename}.json"
}
)
else:
raise HTTPException(status_code=400, detail=f"Format {request.format} not supported for time series export")