🧠 Cidadão.AI Machine Learning Pipeline
📋 Overview
The Machine Learning Pipeline powers the analytical core of Cidadão.AI with advanced anomaly detection, pattern recognition, and explainable AI capabilities. Built with scikit-learn, TensorFlow, and statistical analysis tools to provide transparent, interpretable insights into government data.
🏗️ Architecture
src/ml/
├── models.py # Core ML models and algorithms
├── anomaly_detector.py # Anomaly detection engine
├── pattern_analyzer.py # Pattern recognition system
├── spectral_analyzer.py # Frequency domain analysis
├── data_pipeline.py # Data preprocessing pipeline
├── training_pipeline.py # Model training orchestration
├── advanced_pipeline.py # Advanced ML algorithms
├── cidadao_model.py # Custom Cidadão.AI model
├── hf_cidadao_model.py # HuggingFace integration
├── model_api.py # Model serving API
├── hf_integration.py # HuggingFace deployment
└── transparency_benchmark.py # Model evaluation benchmarks
🔬 Core ML Capabilities
1. Anomaly Detection Engine (anomaly_detector.py)
Statistical Anomaly Detection
class AnomalyDetector:
"""
Multi-algorithm anomaly detection for government transparency data
Methods:
- Statistical outliers (Z-score, IQR, Modified Z-score)
- Isolation Forest for high-dimensional data
- One-Class SVM for complex patterns
- Local Outlier Factor for density-based detection
- Time series anomalies with seasonal decomposition
"""
# Price anomaly detection
def detect_price_anomalies(
self,
contracts: List[Contract],
threshold: float = 2.5
) -> List[PriceAnomaly]:
"""
Detect price anomalies using statistical methods
Algorithm:
1. Group contracts by category/type
2. Calculate mean and standard deviation
3. Flag contracts beyond threshold * std_dev
4. Apply contextual filters (contract size, organization type)
"""
# Vendor concentration analysis
def detect_vendor_concentration(
self,
contracts: List[Contract],
concentration_threshold: float = 0.7
) -> List[VendorConcentrationAnomaly]:
"""
Detect monopolistic vendor patterns
Algorithm:
1. Calculate vendor market share by organization
2. Apply Herfindahl-Hirschman Index (HHI)
3. Flag organizations with high vendor concentration
4. Analyze temporal patterns for sudden changes
"""
Advanced Anomaly Types
# Anomaly classification system
class AnomalyType(Enum):
PRICE_OUTLIER = "price_outlier" # Statistical price deviation
VENDOR_CONCENTRATION = "vendor_concentration" # Market concentration
TEMPORAL_SUSPICION = "temporal_suspicion" # Timing irregularities
DUPLICATE_CONTRACT = "duplicate_contract" # Contract similarity
PAYMENT_IRREGULARITY = "payment_irregularity" # Payment pattern anomaly
SEASONAL_DEVIATION = "seasonal_deviation" # Seasonal pattern break
NETWORK_ANOMALY = "network_anomaly" # Graph-based anomalies
# Severity classification
class AnomalySeverity(Enum):
LOW = "low" # Minor deviations, may be normal
MEDIUM = "medium" # Noticeable patterns requiring attention
HIGH = "high" # Strong indicators of irregularities
CRITICAL = "critical" # Severe anomalies requiring immediate action
2. Pattern Analysis System (pattern_analyzer.py)
Time Series Analysis
class PatternAnalyzer:
"""
Advanced pattern recognition for government spending patterns
Capabilities:
- Seasonal decomposition (trend, seasonal, residual)
- Spectral analysis using FFT
- Cross-correlation analysis between organizations
- Regime change detection
- Forecasting with uncertainty quantification
"""
def analyze_spending_trends(
self,
expenses: List[Expense],
decomposition_model: str = "additive"
) -> TrendAnalysis:
"""
Decompose spending into trend, seasonal, and irregular components
Algorithm:
1. Time series preprocessing and gap filling
2. Seasonal-Trend decomposition using LOESS (STL)
3. Trend change point detection
4. Seasonal pattern stability analysis
5. Residual anomaly identification
"""
def detect_spending_regime_changes(
self,
time_series: np.ndarray,
method: str = "cusum"
) -> List[RegimeChange]:
"""
Detect structural breaks in spending patterns
Methods:
- CUSUM (Cumulative Sum) control charts
- Bayesian change point detection
- Structural break tests (Chow test, Quandt-Andrews)
"""
Cross-Organizational Analysis
def analyze_cross_organizational_patterns(
self,
organizations: List[str],
time_window: str = "monthly"
) -> CrossOrgAnalysis:
"""
Identify patterns across government organizations
Features:
- Spending correlation analysis
- Synchronized timing detection
- Resource competition analysis
- Coordination pattern identification
"""
# Calculate cross-correlation matrix
correlation_matrix = np.corrcoef([
org_spending_series for org in organizations
])
# Detect synchronized events
synchronized_events = self._detect_synchronized_spending(
organizations, threshold=0.8
)
return CrossOrgAnalysis(
correlation_matrix=correlation_matrix,
synchronized_events=synchronized_events,
coordination_score=self._calculate_coordination_score(correlation_matrix)
)
3. Spectral Analysis Engine (spectral_analyzer.py)
Frequency Domain Analysis
class SpectralAnalyzer:
"""
Frequency domain analysis for detecting periodic patterns
Applications:
- End-of-year spending rush detection
- Electoral cycle influence analysis
- Budget cycle pattern identification
- Periodic corruption pattern detection
"""
def analyze_spending_spectrum(
self,
spending_series: np.ndarray,
sampling_rate: str = "monthly"
) -> SpectralAnalysis:
"""
Perform FFT analysis on spending time series
Algorithm:
1. Preprocessing: detrending, windowing
2. Fast Fourier Transform (FFT)
3. Power spectral density estimation
4. Peak detection in frequency domain
5. Periodic pattern significance testing
"""
# Remove trend and apply windowing
detrended = signal.detrend(spending_series)
windowed = detrended * signal.windows.hann(len(detrended))
# FFT analysis
frequencies = np.fft.fftfreq(len(windowed))
fft_result = np.fft.fft(windowed)
power_spectrum = np.abs(fft_result) ** 2
# Detect significant peaks
peaks, properties = signal.find_peaks(
power_spectrum,
height=np.mean(power_spectrum) + 2 * np.std(power_spectrum),
distance=10
)
return SpectralAnalysis(
frequencies=frequencies[peaks],
power_spectrum=power_spectrum,
significant_periods=1 / frequencies[peaks],
seasonality_strength=self._calculate_seasonality_strength(power_spectrum)
)
4. Data Processing Pipeline (data_pipeline.py)
Advanced Data Preprocessing
class DataPipeline:
"""
Comprehensive data preprocessing for ML algorithms
Features:
- Missing value imputation with multiple strategies
- Outlier detection and treatment
- Feature engineering for government data
- Text preprocessing for contract descriptions
- Temporal feature extraction
"""
def preprocess_contracts(
self,
contracts: List[Contract]
) -> ProcessedDataset:
"""
Transform raw contract data into ML-ready features
Pipeline:
1. Data cleaning and validation
2. Missing value imputation
3. Categorical encoding
4. Numerical scaling and normalization
5. Feature engineering
6. Dimensionality reduction if needed
"""
# Extract features
features = self._extract_contract_features(contracts)
# Handle missing values
features_imputed = self._impute_missing_values(features)
# Scale numerical features
features_scaled = self._scale_features(features_imputed)
# Engineer domain-specific features
features_engineered = self._engineer_transparency_features(features_scaled)
return ProcessedDataset(
features=features_engineered,
feature_names=self._get_feature_names(),
preprocessing_metadata=self._get_preprocessing_metadata()
)
def _extract_contract_features(self, contracts: List[Contract]) -> np.ndarray:
"""Extract numerical features from contract data"""
features = []
for contract in contracts:
contract_features = [
# Financial features
float(contract.valor_inicial or 0),
float(contract.valor_global or 0),
# Temporal features
self._extract_temporal_features(contract.data_assinatura),
# Categorical features (encoded)
self._encode_modality(contract.modalidade_contratacao),
self._encode_organization(contract.orgao.codigo if contract.orgao else None),
# Text features (TF-IDF of contract object)
*self._extract_text_features(contract.objeto),
# Derived features
self._calculate_contract_duration(contract),
self._calculate_value_per_day(contract),
self._get_vendor_risk_score(contract.fornecedor),
]
features.append(contract_features)
return np.array(features)
5. Custom Cidadão.AI Model (cidadao_model.py)
Specialized Transparency Analysis Model
class CidadaoAIModel:
"""
Custom model specialized for Brazilian government transparency analysis
Architecture:
- Multi-task learning for various anomaly types
- Attention mechanisms for important features
- Interpretability through SHAP values
- Uncertainty quantification
- Brazilian government domain knowledge integration
"""
def __init__(self):
self.anomaly_detector = self._build_anomaly_detector()
self.pattern_classifier = self._build_pattern_classifier()
self.risk_scorer = self._build_risk_scorer()
self.explainer = self._build_explainer()
def _build_anomaly_detector(self) -> tf.keras.Model:
"""Build neural network for anomaly detection"""
inputs = tf.keras.Input(shape=(self.n_features,))
# Encoder
encoded = tf.keras.layers.Dense(128, activation='relu')(inputs)
encoded = tf.keras.layers.Dropout(0.2)(encoded)
encoded = tf.keras.layers.Dense(64, activation='relu')(encoded)
encoded = tf.keras.layers.Dropout(0.2)(encoded)
encoded = tf.keras.layers.Dense(32, activation='relu')(encoded)
# Decoder (autoencoder for anomaly detection)
decoded = tf.keras.layers.Dense(64, activation='relu')(encoded)
decoded = tf.keras.layers.Dense(128, activation='relu')(decoded)
decoded = tf.keras.layers.Dense(self.n_features, activation='linear')(decoded)
# Anomaly score output
anomaly_score = tf.keras.layers.Dense(1, activation='sigmoid', name='anomaly_score')(encoded)
model = tf.keras.Model(inputs=inputs, outputs=[decoded, anomaly_score])
return model
def predict_anomalies(
self,
data: np.ndarray,
return_explanations: bool = True
) -> AnomalyPrediction:
"""
Predict anomalies with explanations
Returns:
- Anomaly scores (0-1)
- Anomaly classifications
- Feature importance (SHAP values)
- Confidence intervals
"""
# Get predictions
reconstructed, anomaly_scores = self.anomaly_detector.predict(data)
# Calculate reconstruction error
reconstruction_error = np.mean((data - reconstructed) ** 2, axis=1)
# Classify anomalies
anomaly_labels = (anomaly_scores > self.anomaly_threshold).astype(int)
# Generate explanations if requested
explanations = None
if return_explanations:
explanations = self.explainer.explain_predictions(data, anomaly_scores)
return AnomalyPrediction(
anomaly_scores=anomaly_scores,
anomaly_labels=anomaly_labels,
reconstruction_error=reconstruction_error,
explanations=explanations,
confidence=self._calculate_confidence(anomaly_scores)
)
6. Model Interpretability (explainer.py)
SHAP-based Explanations
class TransparencyExplainer:
"""
Explainable AI for transparency analysis results
Methods:
- SHAP (SHapley Additive exPlanations) values
- LIME (Local Interpretable Model-agnostic Explanations)
- Feature importance analysis
- Decision boundary visualization
"""
def explain_anomaly_prediction(
self,
model: Any,
data: np.ndarray,
prediction_index: int
) -> AnomalyExplanation:
"""
Generate human-readable explanations for anomaly predictions
Returns:
- Feature contributions to the prediction
- Natural language explanation
- Visualization data for charts
- Confidence intervals
"""
# Calculate SHAP values
explainer = shap.DeepExplainer(model, data[:100]) # Background data
shap_values = explainer.shap_values(data[prediction_index:prediction_index+1])
# Get feature names and values
feature_names = self.get_feature_names()
feature_values = data[prediction_index]
# Sort by importance
importance_indices = np.argsort(np.abs(shap_values[0]))[::-1]
# Generate natural language explanation
explanation_text = self._generate_explanation_text(
shap_values[0],
feature_names,
feature_values,
importance_indices[:5] # Top 5 features
)
return AnomalyExplanation(
shap_values=shap_values[0],
feature_names=feature_names,
feature_values=feature_values,
explanation_text=explanation_text,
top_features=importance_indices[:10]
)
def _generate_explanation_text(
self,
shap_values: np.ndarray,
feature_names: List[str],
feature_values: np.ndarray,
top_indices: List[int]
) -> str:
"""Generate human-readable explanation"""
explanations = []
for idx in top_indices:
feature_name = feature_names[idx]
feature_value = feature_values[idx]
shap_value = shap_values[idx]
if shap_value > 0:
direction = "increases"
else:
direction = "decreases"
explanation = f"The {feature_name} value of {feature_value:.2f} {direction} the anomaly score by {abs(shap_value):.3f}"
explanations.append(explanation)
return ". ".join(explanations) + "."
📊 Model Training & Evaluation
Training Pipeline (training_pipeline.py)
Automated Model Training
class ModelTrainingPipeline:
"""
Automated training pipeline for transparency analysis models
Features:
- Cross-validation with time series splits
- Hyperparameter optimization
- Model selection and ensemble methods
- Performance monitoring and logging
- Automated model deployment
"""
def train_anomaly_detection_model(
self,
training_data: ProcessedDataset,
validation_split: float = 0.2,
hyperparameter_search: bool = True
) -> TrainingResult:
"""
Train anomaly detection model with optimization
Pipeline:
1. Data splitting with temporal considerations
2. Hyperparameter optimization using Optuna
3. Model training with early stopping
4. Cross-validation evaluation
5. Model interpretation and validation
"""
# Split data maintaining temporal order
train_data, val_data = self._temporal_split(training_data, validation_split)
# Hyperparameter optimization
if hyperparameter_search:
best_params = self._optimize_hyperparameters(train_data, val_data)
else:
best_params = self.default_params
# Train final model
model = self._train_model(train_data, best_params)
# Evaluate model
evaluation_results = self._evaluate_model(model, val_data)
# Generate model interpretation
interpretation = self._interpret_model(model, val_data)
return TrainingResult(
model=model,
parameters=best_params,
evaluation=evaluation_results,
interpretation=interpretation,
training_metadata=self._get_training_metadata()
)
Model Evaluation Metrics
class TransparencyMetrics:
"""
Specialized metrics for transparency analysis evaluation
Metrics:
- Precision/Recall for anomaly detection
- F1-score with class imbalance handling
- Area Under ROC Curve (AUC-ROC)
- Area Under Precision-Recall Curve (AUC-PR)
- False Positive Rate at operational thresholds
- Coverage: percentage of true anomalies detected
"""
def calculate_anomaly_detection_metrics(
self,
y_true: np.ndarray,
y_pred_proba: np.ndarray,
threshold: float = 0.5
) -> Dict[str, float]:
"""Calculate comprehensive metrics for anomaly detection"""
y_pred = (y_pred_proba > threshold).astype(int)
# Basic classification metrics
precision = precision_score(y_true, y_pred)
recall = recall_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred)
# ROC metrics
auc_roc = roc_auc_score(y_true, y_pred_proba)
auc_pr = average_precision_score(y_true, y_pred_proba)
# Cost-sensitive metrics
false_positive_rate = self._calculate_fpr(y_true, y_pred)
false_negative_rate = self._calculate_fnr(y_true, y_pred)
# Domain-specific metrics
coverage = self._calculate_coverage(y_true, y_pred)
efficiency = self._calculate_efficiency(y_true, y_pred)
return {
'precision': precision,
'recall': recall,
'f1_score': f1,
'auc_roc': auc_roc,
'auc_pr': auc_pr,
'false_positive_rate': false_positive_rate,
'false_negative_rate': false_negative_rate,
'coverage': coverage,
'efficiency': efficiency
}
🚀 Model Deployment
HuggingFace Integration (hf_integration.py)
Model Publishing to HuggingFace Hub
class HuggingFaceIntegration:
"""
Integration with HuggingFace Hub for model sharing and deployment
Features:
- Model uploading with metadata
- Automatic model card generation
- Version control and model registry
- Inference API integration
- Community model sharing
"""
def upload_model_to_hub(
self,
model: tf.keras.Model,
model_name: str,
description: str,
metrics: Dict[str, float]
) -> str:
"""
Upload trained model to HuggingFace Hub
Process:
1. Convert model to HuggingFace format
2. Generate model card with metrics and description
3. Package preprocessing pipelines
4. Upload to Hub with version tags
5. Set up inference API
"""
# Convert to HuggingFace format
hf_model = self._convert_to_hf_format(model)
# Generate model card
model_card = self._generate_model_card(
model_name, description, metrics
)
# Upload to hub
repo_url = hf_model.push_to_hub(
model_name,
commit_message=f"Upload {model_name} v{self.version}",
model_card=model_card
)
return repo_url
API Serving (model_api.py)
FastAPI Model Serving
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
app = FastAPI(title="Cidadão.AI ML API")
class PredictionRequest(BaseModel):
contracts: List[Dict[str, Any]]
include_explanations: bool = True
anomaly_threshold: float = 0.5
class PredictionResponse(BaseModel):
anomalies: List[AnomalyResult]
model_version: str
processing_time_ms: float
confidence_score: float
@app.post("/predict/anomalies", response_model=PredictionResponse)
async def predict_anomalies(request: PredictionRequest):
"""
Predict anomalies in government contracts
Returns:
- Anomaly predictions with scores
- Explanations for each prediction
- Model metadata and performance metrics
"""
start_time = time.time()
# Load model (cached)
model = await get_cached_model()
# Preprocess data
processed_data = preprocess_contracts(request.contracts)
# Make predictions
predictions = model.predict_anomalies(
processed_data,
threshold=request.anomaly_threshold,
return_explanations=request.include_explanations
)
processing_time = (time.time() - start_time) * 1000
return PredictionResponse(
anomalies=predictions.anomalies,
model_version=model.version,
processing_time_ms=processing_time,
confidence_score=predictions.overall_confidence
)
📊 Performance Benchmarks
Transparency Benchmark Suite (transparency_benchmark.py)
Comprehensive Model Evaluation
class TransparencyBenchmark:
"""
Benchmark suite for transparency analysis models
Tests:
- Synthetic anomaly detection
- Real-world case study validation
- Cross-organization generalization
- Temporal stability assessment
- Interpretability quality metrics
"""
def run_comprehensive_benchmark(
self,
model: Any,
test_datasets: List[str]
) -> BenchmarkResults:
"""
Run complete benchmark suite on model
Benchmarks:
1. Synthetic data with known anomalies
2. Historical case studies with verified outcomes
3. Cross-validation across different organizations
4. Temporal robustness testing
5. Adversarial robustness evaluation
"""
results = {}
for dataset_name in test_datasets:
dataset = self._load_benchmark_dataset(dataset_name)
# Run predictions
predictions = model.predict(dataset.X)
# Calculate metrics
metrics = self._calculate_metrics(dataset.y, predictions)
# Test interpretability
interpretability_score = self._test_interpretability(
model, dataset.X[:10]
)
results[dataset_name] = {
'metrics': metrics,
'interpretability': interpretability_score,
'processing_time': self._measure_processing_time(model, dataset.X)
}
return BenchmarkResults(results)
🧪 Usage Examples
Basic Anomaly Detection
from src.ml.anomaly_detector import AnomalyDetector
from src.ml.data_pipeline import DataPipeline
# Initialize components
detector = AnomalyDetector()
pipeline = DataPipeline()
# Process contract data
contracts = fetch_contracts_from_api()
processed_data = pipeline.preprocess_contracts(contracts)
# Detect anomalies
anomalies = detector.detect_price_anomalies(
contracts,
threshold=2.5
)
for anomaly in anomalies:
print(f"Anomaly: {anomaly.description}")
print(f"Confidence: {anomaly.confidence:.2f}")
print(f"Affected contracts: {len(anomaly.affected_records)}")
Advanced Pattern Analysis
from src.ml.pattern_analyzer import PatternAnalyzer
from src.ml.spectral_analyzer import SpectralAnalyzer
# Initialize analyzers
pattern_analyzer = PatternAnalyzer()
spectral_analyzer = SpectralAnalyzer()
# Analyze spending trends
expenses = fetch_expenses_from_api(organization="20000", year=2024)
trend_analysis = pattern_analyzer.analyze_spending_trends(expenses)
print(f"Trend direction: {trend_analysis.trend_direction}")
print(f"Seasonality strength: {trend_analysis.seasonality_strength:.2f}")
print(f"Anomalous periods: {len(trend_analysis.anomalous_periods)}")
# Spectral analysis
spending_series = extract_monthly_spending(expenses)
spectral_analysis = spectral_analyzer.analyze_spending_spectrum(spending_series)
print(f"Dominant periods: {spectral_analysis.significant_periods}")
print(f"End-of-year effect: {spectral_analysis.eoy_strength:.2f}")
Custom Model Training
from src.ml.training_pipeline import ModelTrainingPipeline
from src.ml.cidadao_model import CidadaoAIModel
# Prepare training data
training_data = prepare_training_dataset()
# Initialize training pipeline
trainer = ModelTrainingPipeline()
# Train model with hyperparameter optimization
training_result = await trainer.train_anomaly_detection_model(
training_data,
hyperparameter_search=True,
cross_validation_folds=5
)
print(f"Best F1 score: {training_result.evaluation.f1_score:.3f}")
print(f"Model size: {training_result.model.count_params()} parameters")
# Deploy to HuggingFace
hf_integration = HuggingFaceIntegration()
model_url = hf_integration.upload_model_to_hub(
training_result.model,
"cidadao-ai/anomaly-detector-v1",
"Government contract anomaly detection model",
training_result.evaluation.metrics
)
print(f"Model deployed: {model_url}")
This ML pipeline provides state-of-the-art anomaly detection and pattern analysis capabilities specifically designed for Brazilian government transparency data, with full interpretability and production-ready deployment options.