"""
Module: ml.spectral_analyzer
Description: Spectral analysis using Fourier transforms for government transparency data
Author: Anderson H. Silva
Date: 2025-07-19
License: Proprietary - All rights reserved
"""

import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass
from datetime import datetime, timedelta
from scipy.fft import fft, fftfreq, ifft, rfft, rfftfreq
from scipy.signal import find_peaks, welch, periodogram, spectrogram
from scipy.stats import zscore
import warnings

warnings.filterwarnings('ignore')

from src.core import get_logger

logger = get_logger(__name__)


@dataclass
class SpectralFeatures:
    """Spectral characteristics of a time series."""

    dominant_frequencies: List[float]
    dominant_periods: List[float]
    spectral_entropy: float
    power_spectrum: np.ndarray
    frequencies: np.ndarray
    peak_frequencies: List[float]
    seasonal_components: Dict[str, float]
    anomaly_score: float
    trend_component: np.ndarray
    residual_component: np.ndarray


@dataclass
class SpectralAnomaly:
    """Spectral anomaly detection result."""

    timestamp: datetime
    anomaly_type: str
    severity: str
    frequency_band: Tuple[float, float]
    anomaly_score: float
    description: str
    evidence: Dict[str, Any]
    recommendations: List[str]


@dataclass
class PeriodicPattern:
    """Detected periodic pattern in spending data."""

    period_days: float
    frequency_hz: float
    amplitude: float
    confidence: float
    pattern_type: str
    business_interpretation: str
    statistical_significance: float


class SpectralAnalyzer:
    """
    Advanced spectral analysis for government transparency data using Fourier transforms.

    Capabilities:
    - Seasonal pattern detection in public spending
    - Cyclical anomaly identification
    - Frequency-domain correlation analysis
    - Spectral anomaly detection
    - Periodic pattern classification
    - Cross-spectral analysis between entities
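
    Example (illustrative only; assumes `spending` is a daily pd.Series with a
    DatetimeIndex; the variable name is a placeholder):

        >>> analyzer = SpectralAnalyzer()
        >>> features = analyzer.analyze_time_series(spending)
        >>> patterns = analyzer.find_periodic_patterns(spending, spending.index)
        >>> anomalies = analyzer.detect_anomalies(spending, spending.index)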
    """

    def __init__(
        self,
        sampling_frequency: float = 1.0,
        anomaly_threshold: float = 2.5,
        min_period_days: int = 7,
        max_period_days: int = 365,
    ):
        """
        Initialize the Spectral Analyzer.

        Args:
            sampling_frequency: Sampling frequency in samples per day (1.0 = daily data)
            anomaly_threshold: Z-score threshold for anomaly detection
            min_period_days: Minimum period in days for pattern detection
            max_period_days: Maximum period in days for pattern detection
        """
        self.fs = sampling_frequency
        self.anomaly_threshold = anomaly_threshold
        self.min_period = min_period_days
        self.max_period = max_period_days
        self.logger = logger

        # Frequency bands of interest, expressed as (min_freq, max_freq) in
        # cycles per day, i.e. (1/longest_period, 1/shortest_period).
        self.frequency_bands = {
            "daily": (1/3, 1/1),
            "weekly": (1/10, 1/7),
            "biweekly": (1/21, 1/14),
            "monthly": (1/45, 1/30),
            "quarterly": (1/120, 1/90),
            "semester": (1/200, 1/180),
            "annual": (1/400, 1/365),
            "suspicious": (1/5, 1/2),
        }

    def analyze_time_series(
        self,
        data: pd.Series,
        timestamps: Optional[pd.DatetimeIndex] = None
    ) -> SpectralFeatures:
        """
        Perform comprehensive spectral analysis of a time series.

        Args:
            data: Time series data (spending amounts, contract counts, etc.)
            timestamps: Optional datetime index

        Returns:
            SpectralFeatures object with complete spectral characteristics
        """
        try:
            if timestamps is None:
                timestamps = pd.date_range(start='2020-01-01', periods=len(data), freq='D')

            # Clean, detrend and window the series
            data_clean = self._preprocess_data(data)

            # Real FFT and corresponding frequency grid
            fft_values = rfft(data_clean)
            frequencies = rfftfreq(len(data_clean), d=1/self.fs)

            # Power spectrum
            power_spectrum = np.abs(fft_values) ** 2

            # Dominant frequencies and their periods
            dominant_freqs, dominant_periods = self._find_dominant_frequencies(
                frequencies, power_spectrum
            )

            # Spectral entropy (complexity of the spectrum)
            spectral_entropy = self._calculate_spectral_entropy(power_spectrum)

            # Significant spectral peaks
            peak_frequencies = self._find_peak_frequencies(frequencies, power_spectrum)

            # Seasonal components (weekly, monthly, quarterly, ...)
            seasonal_components = self._detect_seasonal_components(
                frequencies, power_spectrum
            )

            # Trend / residual decomposition
            trend, residual = self._decompose_signal(data_clean)

            # Overall spectral anomaly score
            anomaly_score = self._calculate_spectral_anomaly_score(
                power_spectrum, frequencies
            )

            return SpectralFeatures(
                dominant_frequencies=dominant_freqs,
                dominant_periods=dominant_periods,
                spectral_entropy=spectral_entropy,
                power_spectrum=power_spectrum,
                frequencies=frequencies,
                peak_frequencies=peak_frequencies,
                seasonal_components=seasonal_components,
                anomaly_score=anomaly_score,
                trend_component=trend,
                residual_component=residual
            )

        except Exception as e:
            self.logger.error(f"Error in spectral analysis: {str(e)}")
            raise

    def detect_anomalies(
        self,
        data: pd.Series,
        timestamps: pd.DatetimeIndex,
        context: Optional[Dict[str, Any]] = None
    ) -> List[SpectralAnomaly]:
        """
        Detect anomalies using spectral analysis techniques.

        Args:
            data: Time series data
            timestamps: Datetime index
            context: Additional context (entity name, spending category, etc.)

        Returns:
            List of detected spectral anomalies
        """
        anomalies = []

        try:
            features = self.analyze_time_series(data, timestamps)

            # 1. Anomalous dominant frequencies
            freq_anomalies = self._detect_frequency_anomalies(features)
            anomalies.extend(freq_anomalies)

            # 2. Sudden changes in spectral characteristics over time
            spectral_change_anomalies = self._detect_spectral_changes(data, timestamps)
            anomalies.extend(spectral_change_anomalies)

            # 3. Patterns that may indicate irregular activity
            suspicious_patterns = self._detect_suspicious_patterns(features, context)
            anomalies.extend(suspicious_patterns)

            # 4. High-frequency noise
            noise_anomalies = self._detect_high_frequency_noise(features)
            anomalies.extend(noise_anomalies)

            # Sort by severity, then by timestamp
            anomalies.sort(key=lambda x: (
                {"critical": 4, "high": 3, "medium": 2, "low": 1}[x.severity],
                x.timestamp
            ), reverse=True)

            return anomalies

        except Exception as e:
            self.logger.error(f"Error detecting spectral anomalies: {str(e)}")
            return []

    def find_periodic_patterns(
        self,
        data: pd.Series,
        timestamps: pd.DatetimeIndex,
        entity_name: Optional[str] = None
    ) -> List[PeriodicPattern]:
        """
        Find and classify periodic patterns in spending data.

        Args:
            data: Time series data
            timestamps: Datetime index
            entity_name: Name of the entity being analyzed

        Returns:
            List of detected periodic patterns
        """
        patterns = []

        try:
            features = self.analyze_time_series(data, timestamps)

            # Evaluate each frequency band of interest
            for band_name, (min_freq, max_freq) in self.frequency_bands.items():
                pattern = self._analyze_frequency_band(
                    features, band_name, min_freq, max_freq, entity_name
                )
                if pattern:
                    patterns.append(pattern)

            # Strongest patterns first
            patterns.sort(key=lambda x: x.amplitude, reverse=True)

            return patterns

        except Exception as e:
            self.logger.error(f"Error finding periodic patterns: {str(e)}")
            return []

    def cross_spectral_analysis(
        self,
        data1: pd.Series,
        data2: pd.Series,
        entity1_name: str,
        entity2_name: str,
        timestamps: Optional[pd.DatetimeIndex] = None
    ) -> Dict[str, Any]:
        """
        Perform cross-spectral analysis between two entities.

        Args:
            data1: First time series
            data2: Second time series
            entity1_name: Name of first entity
            entity2_name: Name of second entity
            timestamps: Datetime index

        Returns:
            Cross-spectral analysis results
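
        Example (illustrative; the two series and entity names are placeholders):

            >>> result = analyzer.cross_spectral_analysis(
            ...     spend_a, spend_b, "Ministry A", "Ministry B"
            ... )
            >>> result["correlation_coefficient"], result["max_coherence"]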
        """
        try:
            # Align lengths and preprocess both series
            min_len = min(len(data1), len(data2))
            data1_clean = self._preprocess_data(data1[:min_len])
            data2_clean = self._preprocess_data(data2[:min_len])

            # Cross-spectrum from the real FFTs
            fft1 = rfft(data1_clean)
            fft2 = rfft(data2_clean)
            cross_spectrum = fft1 * np.conj(fft2)

            frequencies = rfftfreq(min_len, d=1/self.fs)

            # Coherence: the raw single-segment estimate |X Y*|^2 / (|X|^2 |Y|^2)
            # is identically 1, so smooth the auto- and cross-spectra over
            # neighbouring frequencies before forming the ratio.
            smoother = np.ones(5) / 5
            sxy = np.convolve(cross_spectrum, smoother, mode='same')
            sxx = np.convolve(np.abs(fft1) ** 2, smoother, mode='same')
            syy = np.convolve(np.abs(fft2) ** 2, smoother, mode='same')
            coherence = np.abs(sxy) ** 2 / (sxx * syy + 1e-12)

            # Phase difference between the two series
            phase_diff = np.angle(cross_spectrum)

            # Frequencies where the two series are strongly coherent
            high_coherence_indices = np.where(coherence > 0.7)[0]
            correlated_frequencies = frequencies[high_coherence_indices]
            correlated_periods = 1 / correlated_frequencies[correlated_frequencies > 0]

            # Time-domain correlation for reference
            correlation_coeff = np.corrcoef(data1_clean, data2_clean)[0, 1]

            return {
                "entities": [entity1_name, entity2_name],
                "correlation_coefficient": correlation_coeff,
                "coherence_spectrum": coherence,
                "phase_spectrum": phase_diff,
                "frequencies": frequencies,
                "correlated_frequencies": correlated_frequencies.tolist(),
                "correlated_periods_days": correlated_periods.tolist(),
                "max_coherence": np.max(coherence),
                "mean_coherence": np.mean(coherence),
                "synchronization_score": self._calculate_synchronization_score(coherence),
                "business_interpretation": self._interpret_cross_spectral_results(
                    correlation_coeff, coherence, correlated_periods,
                    entity1_name, entity2_name
                )
            }

        except Exception as e:
            self.logger.error(f"Error in cross-spectral analysis: {str(e)}")
            return {}

    def _preprocess_data(self, data: pd.Series) -> np.ndarray:
        """Preprocess time series data for spectral analysis."""
        # Coerce to numeric, turning unparsable values into NaN
        data_numeric = pd.to_numeric(data, errors='coerce')

        # Fill gaps by linear interpolation, then by the median
        data_filled = data_numeric.interpolate(method='linear')
        data_filled = data_filled.fillna(data_filled.median())

        # Remove the slow trend with a centred 30-sample moving average
        data_detrended = data_filled - data_filled.rolling(window=30, center=True).mean().fillna(data_filled.mean())

        # Apply a Hanning window to reduce spectral leakage
        window = np.hanning(len(data_detrended))
        data_windowed = data_detrended * window

        return data_windowed.values

    def _find_dominant_frequencies(
        self,
        frequencies: np.ndarray,
        power_spectrum: np.ndarray
    ) -> Tuple[List[float], List[float]]:
        """Find dominant frequencies in the power spectrum."""
        # Peaks well above the average spectral power
        peaks, properties = find_peaks(
            power_spectrum,
            height=np.mean(power_spectrum) + 2*np.std(power_spectrum),
            distance=5
        )

        dominant_freqs = frequencies[peaks].tolist()
        dominant_periods = [1/f if f > 0 else np.inf for f in dominant_freqs]

        # Order peaks by power, strongest first
        peak_powers = power_spectrum[peaks]
        sorted_indices = np.argsort(peak_powers)[::-1]

        dominant_freqs = [dominant_freqs[i] for i in sorted_indices]
        dominant_periods = [dominant_periods[i] for i in sorted_indices]

        return dominant_freqs[:10], dominant_periods[:10]

    def _calculate_spectral_entropy(self, power_spectrum: np.ndarray) -> float:
        """Calculate spectral entropy as a measure of spectral complexity."""
        # Normalize the spectrum so it behaves like a probability distribution
        normalized_spectrum = power_spectrum / np.sum(power_spectrum)

        # Drop zero bins to avoid log(0)
        normalized_spectrum = normalized_spectrum[normalized_spectrum > 0]

        # Shannon entropy of the normalized spectrum
        entropy = -np.sum(normalized_spectrum * np.log2(normalized_spectrum))

        # Normalize by the maximum attainable entropy
        max_entropy = np.log2(len(normalized_spectrum))

        return entropy / max_entropy if max_entropy > 0 else 0

    def _find_peak_frequencies(
        self,
        frequencies: np.ndarray,
        power_spectrum: np.ndarray
    ) -> List[float]:
        """Find significant peak frequencies."""
        threshold = np.mean(power_spectrum) + np.std(power_spectrum)

        peaks, _ = find_peaks(power_spectrum, height=threshold)
        peak_frequencies = frequencies[peaks]

        # Keep only peaks whose periods fall within the configured range
        relevant_peaks = peak_frequencies[
            (peak_frequencies >= 1/self.max_period) &
            (peak_frequencies <= 1/self.min_period)
        ]

        return relevant_peaks.tolist()

    def _detect_seasonal_components(
        self,
        frequencies: np.ndarray,
        power_spectrum: np.ndarray
    ) -> Dict[str, float]:
        """Detect seasonal components in the spectrum."""
        seasonal_components = {}

        # Target frequencies (cycles per day) for common seasonal cycles
        seasonal_freqs = {
            "weekly": 1/7,
            "monthly": 1/30,
            "quarterly": 1/91,
            "biannual": 1/182,
            "annual": 1/365
        }

        for component, target_freq in seasonal_freqs.items():
            # Closest frequency bin to the target
            freq_idx = np.argmin(np.abs(frequencies - target_freq))

            if freq_idx < len(power_spectrum):
                # Average power in a small window around the target frequency
                window_size = max(1, len(frequencies) // 50)
                start_idx = max(0, freq_idx - window_size//2)
                end_idx = min(len(power_spectrum), freq_idx + window_size//2 + 1)

                component_power = np.mean(power_spectrum[start_idx:end_idx])
                total_power = np.mean(power_spectrum)

                seasonal_components[component] = component_power / total_power if total_power > 0 else 0

        return seasonal_components

    def _decompose_signal(self, data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Decompose signal into trend and residual components."""
        # Trend: simple moving average (at least one sample wide)
        window_size = max(1, min(30, len(data) // 4))
        trend = np.convolve(data, np.ones(window_size)/window_size, mode='same')

        # Residual: what remains after removing the trend
        residual = data - trend

        return trend, residual

    def _calculate_spectral_anomaly_score(
        self,
        power_spectrum: np.ndarray,
        frequencies: np.ndarray
    ) -> float:
        """Calculate overall anomaly score based on spectral characteristics."""
        # 1. Low entropy (highly concentrated spectrum) is suspicious
        entropy = self._calculate_spectral_entropy(power_spectrum)
        entropy_score = 1 - entropy

        # 2. Share of power above the configured high-frequency cutoff
        high_freq_mask = frequencies > 1/self.min_period
        high_freq_power = np.sum(power_spectrum[high_freq_mask])
        total_power = np.sum(power_spectrum)
        high_freq_ratio = high_freq_power / total_power if total_power > 0 else 0

        # 3. How much of the power is concentrated in spectral peaks
        peak_indices, _ = find_peaks(power_spectrum)
        if len(peak_indices) > 0:
            peak_concentration = np.sum(power_spectrum[peak_indices]) / total_power
        else:
            peak_concentration = 0

        # Weighted combination of the three indicators
        anomaly_score = (
            0.4 * entropy_score +
            0.3 * high_freq_ratio +
            0.3 * peak_concentration
        )

        return min(anomaly_score, 1.0)

    def _detect_frequency_anomalies(self, features: SpectralFeatures) -> List[SpectralAnomaly]:
        """Detect anomalies in frequency domain."""
        anomalies = []

        # Flag dominant cycles that are implausibly fast for public spending
        for freq in features.dominant_frequencies:
            if freq > 0:
                period_days = 1 / freq

                if period_days < 3:
                    anomalies.append(SpectralAnomaly(
                        timestamp=datetime.now(),
                        anomaly_type="high_frequency_pattern",
                        severity="high",
                        frequency_band=(freq * 0.9, freq * 1.1),
                        anomaly_score=0.8,
                        description=f"Suspicious high-frequency pattern detected (period: {period_days:.1f} days)",
                        evidence={"frequency_hz": freq, "period_days": period_days},
                        recommendations=[
                            "Investigate potential data manipulation",
                            "Check for automated/systematic processes",
                            "Verify data source integrity"
                        ]
                    ))

        return anomalies

    def _detect_spectral_changes(
        self,
        data: pd.Series,
        timestamps: pd.DatetimeIndex
    ) -> List[SpectralAnomaly]:
        """Detect sudden changes in spectral characteristics."""
        anomalies = []

        if len(data) < 60:
            return anomalies

        # Split the series into consecutive segments
        segment_size = len(data) // 4
        segments = [data[i:i+segment_size] for i in range(0, len(data)-segment_size, segment_size)]

        # Spectral entropy of each segment
        entropies = []
        for segment in segments:
            if len(segment) > 10:
                features = self.analyze_time_series(segment)
                entropies.append(features.spectral_entropy)

        if len(entropies) > 1:
            entropy_changes = np.diff(entropies)

            # Large jumps in entropy indicate a change in spending regime
            for i, change in enumerate(entropy_changes):
                if abs(change) > 0.3:
                    timestamp = timestamps[i * segment_size] if i * segment_size < len(timestamps) else datetime.now()

                    anomalies.append(SpectralAnomaly(
                        timestamp=timestamp,
                        anomaly_type="spectral_regime_change",
                        severity="medium",
                        frequency_band=(0, 0.5),
                        anomaly_score=abs(change),
                        description="Significant change in spending pattern complexity detected",
                        evidence={"entropy_change": change, "segment": i},
                        recommendations=[
                            "Investigate policy or procedural changes",
                            "Check for organizational restructuring",
                            "Verify data consistency"
                        ]
                    ))

        return anomalies

    def _detect_suspicious_patterns(
        self,
        features: SpectralFeatures,
        context: Optional[Dict[str, Any]]
    ) -> List[SpectralAnomaly]:
        """Detect patterns that might indicate irregular activities."""
        anomalies = []

        seasonal = features.seasonal_components

        # Very strong quarterly concentration suggests end-of-quarter rushing
        if seasonal.get("quarterly", 0) > 0.4:
            anomalies.append(SpectralAnomaly(
                timestamp=datetime.now(),
                anomaly_type="excessive_quarterly_pattern",
                severity="medium",
                frequency_band=(1/120, 1/60),
                anomaly_score=seasonal["quarterly"],
                description="Excessive quarterly spending pattern detected",
                evidence={"quarterly_component": seasonal["quarterly"]},
                recommendations=[
                    "Investigate budget execution practices",
                    "Check for end-of-quarter rushing",
                    "Review budget planning processes"
                ]
            ))

        # Unusually regular weekly cycles may reflect automated payments
        if seasonal.get("weekly", 0) > 0.3:
            anomalies.append(SpectralAnomaly(
                timestamp=datetime.now(),
                anomaly_type="unusual_weekly_regularity",
                severity="low",
                frequency_band=(1/10, 1/5),
                anomaly_score=seasonal["weekly"],
                description="Unusually regular weekly spending pattern",
                evidence={"weekly_component": seasonal["weekly"]},
                recommendations=[
                    "Verify if pattern matches business processes",
                    "Check for automated payments",
                    "Review spending authorization patterns"
                ]
            ))

        return anomalies

    def _detect_high_frequency_noise(self, features: SpectralFeatures) -> List[SpectralAnomaly]:
        """Detect high-frequency noise that might indicate data manipulation."""
        anomalies = []

        # Share of spectral power above 0.2 cycles/day (periods shorter than 5 days)
        high_freq_mask = features.frequencies > 0.2
        high_freq_power = np.sum(features.power_spectrum[high_freq_mask])
        total_power = np.sum(features.power_spectrum)

        high_freq_ratio = high_freq_power / total_power if total_power > 0 else 0

        if high_freq_ratio > 0.3:
            anomalies.append(SpectralAnomaly(
                timestamp=datetime.now(),
                anomaly_type="high_frequency_noise",
                severity="medium",
                frequency_band=(0.2, np.max(features.frequencies)),
                anomaly_score=high_freq_ratio,
                description="High-frequency noise detected in spending data",
                evidence={"high_freq_ratio": high_freq_ratio},
                recommendations=[
                    "Check data collection processes",
                    "Investigate potential data manipulation",
                    "Verify data source reliability"
                ]
            ))

        return anomalies

    def _analyze_frequency_band(
        self,
        features: SpectralFeatures,
        band_name: str,
        min_freq: float,
        max_freq: float,
        entity_name: Optional[str]
    ) -> Optional[PeriodicPattern]:
        """Analyze specific frequency band for patterns."""
        mask = (features.frequencies >= min_freq) & (features.frequencies <= max_freq)

        if not np.any(mask):
            return None

        band_power = features.power_spectrum[mask]
        band_frequencies = features.frequencies[mask]

        if len(band_power) == 0:
            return None

        # Strongest frequency within the band
        max_idx = np.argmax(band_power)
        peak_frequency = band_frequencies[max_idx]
        peak_power = band_power[max_idx]

        # Amplitude relative to the total spectral power
        total_power = np.sum(features.power_spectrum)
        relative_amplitude = peak_power / total_power if total_power > 0 else 0

        # Ignore weak patterns
        if relative_amplitude < 0.05:
            return None

        # Confidence: how much the peak stands out from the band average
        mean_power = np.mean(band_power)
        confidence = (peak_power - mean_power) / mean_power if mean_power > 0 else 0
        confidence = min(confidence / 3, 1.0)

        # Classify and interpret the pattern
        period_days = 1 / peak_frequency if peak_frequency > 0 else 0
        pattern_type = self._classify_pattern_type(band_name, period_days, relative_amplitude)
        business_interpretation = self._interpret_pattern(
            band_name, period_days, relative_amplitude, entity_name
        )

        return PeriodicPattern(
            period_days=period_days,
            frequency_hz=peak_frequency,
            amplitude=relative_amplitude,
            confidence=confidence,
            pattern_type=pattern_type,
            business_interpretation=business_interpretation,
            statistical_significance=confidence
        )

    def _classify_pattern_type(
        self,
        band_name: str,
        period_days: float,
        amplitude: float
    ) -> str:
        """Classify the type of periodic pattern."""
        if band_name in ["weekly", "monthly", "quarterly", "annual"]:
            if amplitude > 0.2:
                return "seasonal"
            else:
                return "cyclical"
        elif band_name == "suspicious" or period_days < 3:
            return "suspicious"
        else:
            return "irregular"

    def _interpret_pattern(
        self,
        band_name: str,
        period_days: float,
        amplitude: float,
        entity_name: Optional[str]
    ) -> str:
        """Provide business interpretation of detected pattern."""
        entity_str = f" for {entity_name}" if entity_name else ""

        interpretations = {
            "weekly": f"Weekly spending cycle detected{entity_str} (period: {period_days:.1f} days, strength: {amplitude:.1%})",
            "monthly": f"Monthly budget cycle identified{entity_str} (period: {period_days:.1f} days, strength: {amplitude:.1%})",
            "quarterly": f"Quarterly spending pattern found{entity_str} (period: {period_days:.1f} days, strength: {amplitude:.1%})",
            "annual": f"Annual budget cycle detected{entity_str} (period: {period_days:.1f} days, strength: {amplitude:.1%})",
            "suspicious": f"Potentially suspicious high-frequency pattern{entity_str} (period: {period_days:.1f} days)"
        }

        return interpretations.get(band_name, f"Periodic pattern detected{entity_str} (period: {period_days:.1f} days)")

    def _calculate_synchronization_score(self, coherence: np.ndarray) -> float:
        """Calculate synchronization score between two entities."""
        # Weight low frequencies (long periods) more heavily
        weights = np.exp(-np.linspace(0, 5, len(coherence)))
        weighted_coherence = coherence * weights

        return np.mean(weighted_coherence)

    def _interpret_cross_spectral_results(
        self,
        correlation: float,
        coherence: np.ndarray,
        correlated_periods: List[float],
        entity1: str,
        entity2: str
    ) -> str:
        """Interpret cross-spectral analysis results."""
        if correlation > 0.7:
            correlation_strength = "strong"
        elif correlation > 0.4:
            correlation_strength = "moderate"
        else:
            correlation_strength = "weak"

        interpretation = f"{correlation_strength.capitalize()} correlation detected between {entity1} and {entity2} (r={correlation:.3f}). "

        if len(correlated_periods) > 0:
            main_periods = [p for p in correlated_periods if 7 <= p <= 365]
            if main_periods:
                interpretation += f"Synchronized patterns found at periods: {', '.join([f'{p:.0f} days' for p in main_periods[:3]])}."

        max_coherence = np.max(coherence)
        if max_coherence > 0.8:
            interpretation += " High spectral coherence suggests systematic coordination or shared external factors."
        elif max_coherence > 0.6:
            interpretation += " Moderate spectral coherence indicates some shared patterns or influences."

        return interpretation
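

if __name__ == "__main__":
    # Illustrative sketch only: exercises the analyzer on synthetic daily data.
    # The synthetic series, entity name and printed fields are assumptions for
    # demonstration, not part of the production pipeline; running this requires
    # the module's own imports (e.g. src.core) to be resolvable.
    rng = np.random.default_rng(42)
    days = pd.date_range("2022-01-01", periods=730, freq="D")
    t = np.arange(len(days))
    # Weekly and quarterly cycles plus noise, mimicking public spending rhythms
    values = (
        100
        + 20 * np.sin(2 * np.pi * t / 7)
        + 35 * np.sin(2 * np.pi * t / 91)
        + rng.normal(0, 5, len(days))
    )
    series = pd.Series(values, index=days)

    analyzer = SpectralAnalyzer()
    features = analyzer.analyze_time_series(series, days)
    print(f"Spectral entropy: {features.spectral_entropy:.3f}")
    print(f"Anomaly score: {features.anomaly_score:.3f}")

    for pattern in analyzer.find_periodic_patterns(series, days, entity_name="Example Ministry"):
        print(pattern.business_interpretation)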