# trade_analysis/enhanced_api.py
"""FastAPI application for the Enhanced Intraday Momentum Engine.

The module wires together the project's data provider, sentiment analyzer,
momentum engine, LLM engine, TFT gap predictor and trading agent, and exposes
them through HTTP endpoints (prediction, agent control, backtest stub, health).
"""
import asyncio
import math
import os
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import httpx
import numpy as np
import pandas as pd
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel

# Import only modules that still exist
from .data import UnifiedDataProvider
from .indicators import enrich_with_indicators, identify_current_setup
from .enhanced_sentiment import EnhancedFinancialSentimentAnalyzer, analyze_momentum_sentiment
from .momentum_trading_engine import IntegratedMomentumEngine
from .enhanced_llm import EnhancedLLMEngine, generate_enhanced_llm_signal
from .tft_model import GapPredictionTFT
from .agent import TradingAgent, analyze_agent_performance

# Minimum number of daily bars required before the TFT model is trained/used;
# also passed as the context length of the module-level predictor.
TFT_CONTEXT_LENGTH = 96

# Symbols for which a per-symbol TFT model instance is created at startup.
TFT_SYMBOLS = ['QQQ', 'SPY', 'MSFT', 'TSLA', 'NVDA', 'META']

# URL the trading agent uses to call this API. Overridable through the
# environment; the default is unchanged from the hard-coded value.
# NOTE(review): the __main__ guard below serves on port 8000, which does not
# match this default - confirm which port the deployment actually uses.
AGENT_API_URL = os.getenv("AGENT_API_URL", "http://localhost:7860")

# Per-timeframe decision parameters used by _generate_master_signal.
# Entries are copied before use, so they are never mutated in place.
_TIMEFRAME_CONFIGS: Dict[str, Dict[str, Any]] = {
    "1m": {
        "threshold": 0.2,
        "min_confidence": 70,
        "hold_time": "1-2 minutes",
        "position_multiplier": 0.5,
    },
    "5m": {
        "threshold": 0.25,
        "min_confidence": 65,
        "hold_time": "2-5 minutes",
        "position_multiplier": 0.7,
    },
    "15m": {
        "threshold": 0.3,
        "min_confidence": 60,
        "hold_time": "10-30 minutes",
        "position_multiplier": 1.0,
    },
    "1h": {
        "threshold": 0.35,
        "min_confidence": 55,
        "hold_time": "30-60 minutes",
        "position_multiplier": 1.2,
    },
}

# Global dictionary to store TFT models (keyed by upper-case symbol)
api_tft_models: Dict[str, Any] = {}
trading_agent = None
# Strong reference to the agent's background task. The event loop only keeps
# weak references to tasks, so a task without one may be garbage-collected
# before it finishes.
_agent_task: Optional["asyncio.Task"] = None


def sanitize_for_json(data: Any) -> Any:
    """Recursively convert numpy and pandas types to JSON-serializable types.

    Args:
        data: Arbitrary (possibly nested) value.

    Returns:
        The same structure with dicts, lists and tuples walked recursively
        (tuples become lists), numpy scalars converted to Python scalars,
        ``pd.Timestamp`` converted to an ISO-8601 string, Series/Index/ndarray
        converted to (sanitized) lists, and non-finite floats and ``pd.NaT``
        replaced by ``None``. Any other value is returned unchanged.
    """
    if isinstance(data, dict):
        return {key: sanitize_for_json(value) for key, value in data.items()}
    if isinstance(data, (list, tuple)):
        return [sanitize_for_json(item) for item in data]
    if isinstance(data, (pd.Series, pd.Index, np.ndarray)):
        # tolist() may still yield Timestamps or NaN (and a bare scalar for a
        # 0-d array), so feed the result back through the sanitizer.
        return sanitize_for_json(data.tolist())
    if isinstance(data, np.bool_):
        return bool(data)
    if isinstance(data, np.integer):
        return int(data)
    if isinstance(data, (float, np.floating)):
        # NaN/Inf are not valid JSON and are rejected by encoders that run
        # with allow_nan=False, so map them to null.
        number = float(data)
        return number if math.isfinite(number) else None
    if data is pd.NaT:
        return None
    if isinstance(data, pd.Timestamp):
        return data.isoformat()
    return data


class EnhancedSignalResponse(BaseModel):
    """Enhanced response model with momentum and LLM analysis"""
    symbol: str
    signal: str
    confidence: float
    reasoning: str
    position_size: float
    status: str
    details: Dict[str, Any]

    # Enhanced fields
    momentum_analysis: Dict[str, Any] = {}
    llm_ensemble: Dict[str, Any] = {}
    options_strategy: Dict[str, Any] = {}
    timeframe_recommendation: str = "15m"
    expected_hold_time: str = "Unknown"


# Enhanced FastAPI App
app = FastAPI(
    title="Enhanced Intraday Momentum Engine",
    version="2.0.0",
    description="SOTA Financial AI with multi-LLM ensemble and momentum analysis"
)

# Initialize enhanced components
data_provider = UnifiedDataProvider()
sentiment_analyzer = EnhancedFinancialSentimentAnalyzer()
momentum_engine = IntegratedMomentumEngine()
llm_engine = EnhancedLLMEngine()
tft_predictor = GapPredictionTFT(context_length=TFT_CONTEXT_LENGTH, prediction_length=1)


def _launch_agent_task() -> bool:
    """Schedule ``trading_agent.run()`` unless a previous run is still active.

    Must be called from within the running event loop, with ``trading_agent``
    already initialized.

    Returns:
        True if a new task was created, False if one was already running.
    """
    global _agent_task
    if _agent_task is not None and not _agent_task.done():
        return False
    _agent_task = asyncio.create_task(trading_agent.run())
    return True


@app.on_event("startup")
async def startup_event():
    """Initialize all AI models on startup and launch the agent."""
    global trading_agent

    print("🚀 Starting Enhanced Trading Engine...")

    # --- This new logic checks the environment before loading models ---
    from .deploy import DeploymentConfig
    config = DeploymentConfig.auto_detect()

    # Load sentiment models regardless of environment
    print("📊 Loading sentiment models...")
    sentiment_analyzer.initialize_models()

    # Only load LLMs if we are NOT on a CPU
    if config.device != "cpu":
        print("🧠 Loading LLM ensemble...")
        llm_engine.initialize_llm_models()
    else:
        print("🚫 CPU environment detected. Skipping LLM loading.")

    # Load TFT models
    print("🤖 Loading TFT models...")
    # TODO: restore pretrained weights here. The intended checkpoint location
    # is /tmp/tft_<symbol>_validated.pth; until that is implemented every
    # symbol gets a fresh GapPredictionTFT instance.
    for symbol in TFT_SYMBOLS:
        api_tft_models[symbol] = GapPredictionTFT()

    # Initialize and run the agent as a background task
    trading_agent = TradingAgent(api_url=AGENT_API_URL)
    print("🤖 Launching Trading Agent as a background task...")
    _launch_agent_task()

    print("✅ Enhanced Trading Engine startup complete!")


@app.get("/")
def read_root():
    """Enhanced root endpoint with system info"""
    import torch

    gpu_info = "CPU only"
    if torch.cuda.is_available():
        gpu_name = torch.cuda.get_device_name(0)
        gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1e9
        gpu_info = f"{gpu_name} ({gpu_memory:.1f} GB)"

    return {
        "status": "operational",
        "engine": "Enhanced Intraday Momentum Engine v2.0.0",
        "gpu_info": gpu_info,
        "features": [
            "Multi-LLM Ensemble Analysis",
            "Advanced Sentiment Analysis (10+ models)",
            "High-Frequency Momentum Engine",
            "Options Strategy Generation",
            "TFT Gap Prediction",
            "Autonomous Trading Agent"
        ],
        "timestamp": datetime.now().isoformat()
    }


def _predict_gap_with_tft(symbol: str, daily_df: Optional[pd.DataFrame]) -> Any:
    """Return a TFT gap prediction for ``symbol`` from its daily bars.

    Uses the per-symbol model from ``api_tft_models`` when one exists and at
    least ``TFT_CONTEXT_LENGTH`` daily rows are available, training it first
    if it is not yet trained. Otherwise returns the model's default
    prediction (from a throwaway instance when no model is registered).
    """
    tft_model = api_tft_models.get(symbol.upper())
    has_history = daily_df is not None and len(daily_df) >= TFT_CONTEXT_LENGTH

    if has_history and tft_model:
        if tft_model.is_trained:
            prediction = tft_model.predict_gap_probability(daily_df)
            print(f"🚀 Using pretrained TFT model for {symbol}")
            return prediction
        # NOTE: training runs synchronously inside the request, so the event
        # loop is blocked until it finishes.
        print(f"🤖 Training TFT model for {symbol}...")
        tft_model.train(daily_df, epochs=20)
        return tft_model.predict_gap_probability(daily_df)

    if tft_model:
        return tft_model._default_prediction()
    return GapPredictionTFT()._default_prediction()


def _build_llm_fallback_conditions(alt_data: Dict, tech_setups: Dict) -> Dict[str, bool]:
    """Derive the boolean market conditions fed to the rule-based LLM signal."""
    setup_15m = tech_setups.get("15m", {})
    setup_hourly = tech_setups.get("hourly", {})
    return {
        "is_vix_high": alt_data.get('vix_level', 0) > 25,
        "is_15m_rsi_bullish": setup_15m.get('rsi', 50) > 65,
        "is_15m_rsi_bearish": setup_15m.get('rsi', 50) < 35,
        "is_15m_volume_spike": setup_15m.get('volume_spike', False),
        "is_hourly_trend_bullish": setup_hourly.get('direction') == 'up',
        "is_hourly_trend_bearish": setup_hourly.get('direction') == 'down'
    }


@app.post("/predict/enhanced/", response_model=EnhancedSignalResponse)
async def predict_enhanced_signal(
    symbol: str = Query(..., description="Stock symbol (e.g., QQQ, SPY)"),
    timeframe: str = Query("5m", description="Trading timeframe: 1m, 5m, 15m, 1h"),
    strategy_mode: str = Query("momentum", description="Strategy: momentum, scalp, gap, swing")
):
    """
    Enhanced prediction endpoint with full AI stack

    Fetches market, news and social data, runs sentiment, momentum, TFT and
    LLM analysis, and combines them into a master signal plus an options
    strategy. Any failure is reported as an HTTP 500 error.
    """
    try:
        start_time = datetime.now()

        # Fetch market data
        async with httpx.AsyncClient() as client:
            print(f"📈 Fetching data for {symbol}...")

            # Multi-timeframe OHLCV data
            ohlcv_data = await data_provider.fetch_multi_timeframe_stock_data(symbol)

            # News and social data
            news_data, _ = await data_provider.fetch_news(symbol, client)
            reddit_data, _ = await data_provider.fetch_reddit_data(symbol)

            # Alternative data
            alt_data = data_provider.get_alternative_data(symbol)

        # Process dataframes
        news_df = pd.DataFrame(news_data) if news_data else pd.DataFrame()
        reddit_df = pd.DataFrame(reddit_data) if reddit_data else pd.DataFrame()

        # Technical analysis for each timeframe
        tech_setups = {}
        for tf, df in ohlcv_data.items():
            if not df.empty:
                enriched_df = enrich_with_indicators(df.copy(), tf)
                tech_setups[tf] = identify_current_setup(enriched_df, tf)

        print("🔄 Running AI analysis...")

        # 1. Enhanced Sentiment Analysis (off-loaded to the default executor)
        loop = asyncio.get_running_loop()
        sentiment_analysis = await loop.run_in_executor(
            None, analyze_momentum_sentiment, news_df, reddit_df, symbol, timeframe
        )

        # 2. Momentum Analysis
        momentum_analysis = momentum_engine.generate_enhanced_signal(
            ohlcv_data, sentiment_analysis, alt_data
        )

        # 3. TFT Prediction
        tft_prediction = _predict_gap_with_tft(symbol, ohlcv_data.get("daily"))

        # 4. LLM Ensemble Analysis
        llm_analysis = {}
        try:
            llm_analysis = llm_engine.generate_enhanced_trading_signal(
                ohlcv_data, sentiment_analysis, momentum_analysis, alt_data
            )
        except Exception as e:
            print(f"LLM analysis failed: {e}")

        if not llm_analysis:
            # The ensemble failed or produced nothing: fall back to the
            # rule-based signal built from simple market conditions.
            conditions = _build_llm_fallback_conditions(alt_data, tech_setups)
            llm_analysis = generate_enhanced_llm_signal(conditions)

        # 5. Master Signal Generation
        master_signal = _generate_master_signal(
            momentum_analysis, llm_analysis, sentiment_analysis, tft_prediction,
            timeframe, strategy_mode
        )

        # 6. Options Strategy
        options_strategy = _generate_options_strategy(
            master_signal, momentum_analysis, alt_data, timeframe, strategy_mode
        )

        # Calculate processing time
        processing_time = (datetime.now() - start_time).total_seconds()

        # Prepare response
        sanitized_details = sanitize_for_json({
            "tech_setups": tech_setups,
            "sentiment": sentiment_analysis,
            "alternative_data": alt_data,
            "tft_prediction": tft_prediction,
            "processing_time_seconds": processing_time,
            "data_sources": {
                "news_articles": len(news_df),
                "social_posts": len(reddit_df),
                "timeframes_analyzed": list(ohlcv_data.keys())
            }
        })

        return EnhancedSignalResponse(
            symbol=symbol,
            signal=master_signal["signal"],
            confidence=master_signal["confidence"],
            reasoning=master_signal["reasoning"],
            position_size=master_signal["position_size"],
            status="Success",
            details=sanitized_details,
            momentum_analysis=sanitize_for_json(momentum_analysis),
            llm_ensemble=sanitize_for_json(llm_analysis),
            options_strategy=sanitize_for_json(options_strategy),
            timeframe_recommendation=master_signal.get("timeframe", timeframe),
            expected_hold_time=master_signal.get("hold_time", "Unknown")
        )

    except Exception as e:
        import traceback
        traceback.print_exc()
        raise HTTPException(
            status_code=500, detail=f"Enhanced analysis failed: {e}"
        ) from e


@app.get("/agent/start")
async def start_agent():
    """Start the autonomous agent"""
    if not trading_agent:
        return {"status": "Agent not initialized"}
    # Starting a second run() loop on the same agent would duplicate its
    # work, so only launch when no previous task is still active.
    if _launch_agent_task():
        return {"status": "Agent started"}
    return {"status": "Agent already running"}


@app.get("/agent/stats")
async def get_agent_stats():
    """Get agent's performance stats"""
    if trading_agent:
        return trading_agent.get_stats()
    return {"error": "Agent not initialized"}


@app.get("/agent/positions")
async def get_agent_positions():
    """Get current positions"""
    if trading_agent:
        return {"positions": trading_agent.positions}
    return {"positions": {}}


@app.get("/agent/analyze")
async def analyze_agent():
    """Analyze agent's performance"""
    try:
        analyze_agent_performance()
        return {"status": "Analysis complete - check console output"}
    except Exception as e:
        return {"error": str(e)}


def _generate_master_signal(momentum_analysis: Dict, llm_analysis: Dict,
                            sentiment_analysis: Dict, tft_prediction: Optional[Dict],
                            timeframe: str = "15m", strategy_mode: str = "momentum") -> Dict:
    """Generate master trading signal from all analyses.

    Args:
        momentum_analysis: Momentum engine output; read keys are ``signal``,
            ``confidence`` and ``momentum_analysis.master_signal``.
        llm_analysis: LLM output; read keys are ``signal`` and ``conviction``.
        sentiment_analysis: Sentiment output; read keys are
            ``composite_score`` and ``confidence``.
        tft_prediction: TFT output or None; read keys are
            ``expected_direction`` and ``gap_probability``.
        timeframe: One of the keys of ``_TIMEFRAME_CONFIGS``; unknown values
            fall back to the "15m" configuration.
        strategy_mode: "scalp" and "gap" adjust the thresholds; other values
            leave them unchanged.

    Returns:
        Dict with ``signal`` ("CALLS", "PUTS" or "HOLD"), ``confidence``,
        ``reasoning``, ``position_size``, ``timeframe``, ``hold_time``,
        ``weighted_score``, ``strategy_mode`` and ``momentum_strategy``.
    """
    # Extract signals
    momentum_signal = momentum_analysis.get("signal", "HOLD")
    momentum_confidence = momentum_analysis.get("confidence", 50)

    # Use the actual momentum analysis results
    momentum_master = momentum_analysis.get("momentum_analysis", {}).get("master_signal", {})
    momentum_strategy = momentum_master.get("strategy", "WAIT")

    llm_signal = llm_analysis.get("signal", "HOLD")
    sentiment_composite = sentiment_analysis.get("composite_score", 0)
    tft_direction = tft_prediction.get("expected_direction", "FLAT") if tft_prediction else "FLAT"

    # TIMEFRAME-SPECIFIC THRESHOLDS (copied so the adjustments below stay local)
    config = dict(_TIMEFRAME_CONFIGS.get(timeframe, _TIMEFRAME_CONFIGS["15m"]))

    # STRATEGY MODE ADJUSTMENTS
    if strategy_mode == "scalp":
        config["threshold"] *= 0.8
        config["hold_time"] = "1-3 minutes"
    elif strategy_mode == "gap" and tft_prediction:
        if tft_direction != "FLAT" and tft_prediction.get("gap_probability", 50) > 70:
            config["min_confidence"] -= 10

    # Calculate weighted score
    if momentum_strategy in ["AGGRESSIVE_SCALP", "STANDARD_MOMENTUM"]:
        # The momentum engine's own conviction is used directly.
        weighted_score = momentum_master.get("conviction", 0)
        weighted_confidence = momentum_confidence
    else:
        weights = {
            "momentum": 0.4,
            "llm": 0.25,
            "sentiment": 0.2,
            "tft": 0.15
        }

        signal_scores = {
            "momentum": 1.0 if momentum_signal == "CALLS" else -1.0 if momentum_signal == "PUTS" else 0.0,
            "llm": 1.0 if llm_signal == "CALLS" else -1.0 if llm_signal == "PUTS" else 0.0,
            "sentiment": float(np.clip(sentiment_composite, -1, 1)),
            "tft": 0.7 if tft_direction == "UP" else -0.7 if tft_direction == "DOWN" else 0.0,
        }

        weighted_score = sum(signal_scores[k] * weights[k] for k in weights)
        # The sentiment term contributes 80 * 0.3 only when its confidence
        # label is exactly "HIGH" (the boolean multiplies as 0 or 1).
        weighted_confidence = (momentum_confidence * 0.4 +
                               llm_analysis.get("conviction", 50) * 0.3 +
                               (sentiment_analysis.get("confidence", "LOW") == "HIGH") * 80 * 0.3)

    # Generate final signal
    is_confident = weighted_confidence > config["min_confidence"]
    if weighted_score > config["threshold"] and is_confident:
        final_signal = "CALLS"
        position_size = min(0.5, (weighted_confidence / 100) * config["position_multiplier"])
    elif weighted_score < -config["threshold"] and is_confident:
        final_signal = "PUTS"
        position_size = min(0.5, (weighted_confidence / 100) * config["position_multiplier"])
    else:
        final_signal = "HOLD"
        position_size = 0.0
        config["hold_time"] = "Wait for better setup"

    # Build reasoning
    reasoning = [
        f"{strategy_mode.upper()} {timeframe}: {final_signal}",
        f"Confidence: {weighted_confidence:.0f}%",
    ]

    if momentum_strategy != "WAIT":
        reasoning.append(f"Momentum: {momentum_strategy}")

    if abs(sentiment_composite) > 0.3:
        reasoning.append(f"Sentiment: {'Bullish' if sentiment_composite > 0 else 'Bearish'}")

    if tft_prediction and tft_prediction.get("gap_probability", 50) > 70:
        reasoning.append(f"Gap probability: {tft_prediction['gap_probability']:.0f}%")

    return {
        "signal": final_signal,
        "confidence": int(weighted_confidence),
        "reasoning": ". ".join(reasoning),
        "position_size": position_size,
        "timeframe": timeframe,
        "hold_time": config["hold_time"],
        "weighted_score": weighted_score,
        "strategy_mode": strategy_mode,
        "momentum_strategy": momentum_strategy
    }


def _generate_options_strategy(master_signal: Dict, momentum_analysis: Dict,
                               alt_data: Dict, timeframe: str = "15m",
                               strategy_mode: str = "momentum") -> Dict:
    """Generate options strategy with timeframe awareness.

    Args:
        master_signal: Output of ``_generate_master_signal``; ``signal`` and
            ``confidence`` are read.
        momentum_analysis: Accepted for interface compatibility; not read.
        alt_data: Alternative data; ``vix_level`` is read (default 20).
        timeframe: Trading timeframe used to pick the strategy template.
        strategy_mode: "scalp" selects the 0DTE template on 1m/5m.

    Returns:
        Dict describing the strategy. A "HOLD" signal yields a "WAIT"
        strategy with no contracts. When VIX is above 30 each contract
        quantity is halved, with a minimum of 1.
    """
    signal = master_signal["signal"]
    confidence = master_signal["confidence"]
    vix_level = alt_data.get("vix_level", 20)

    if signal == "HOLD":
        return {
            "strategy": "WAIT",
            "reasoning": "No clear directional bias",
            "contracts": [],
            "risk_management": "Wait for better setup"
        }

    is_calls = signal == "CALLS"

    # TIMEFRAME-SPECIFIC STRATEGIES
    if timeframe in ["1m", "5m"] and strategy_mode == "scalp":
        strategy = {
            "strategy": "0DTE_SCALP",
            "reasoning": f"{timeframe} scalp: {signal} with {confidence}% confidence",
            "contracts": [
                {
                    "type": "CALL" if is_calls else "PUT",
                    "strike": "ATM",
                    "quantity": min(int(confidence / 8), 15),
                    "dte": 0,
                    "target_profit": 20,
                    "stop_loss": 10
                }
            ],
            "max_hold_time": f"{timeframe} bars (max 5 minutes)",
            "risk_management": "Ultra-tight stops, quick exits"
        }
    elif timeframe == "15m" and confidence > 70:
        strategy = {
            "strategy": "MOMENTUM_15M",
            "reasoning": f"15-minute momentum {signal} play, {confidence}% confidence",
            "contracts": [
                {
                    "type": "CALL" if is_calls else "PUT",
                    "strike": "1% ITM",
                    "quantity": min(int(confidence / 12), 8),
                    "dte": 1,
                    "target_profit": 40,
                    "stop_loss": 20
                }
            ],
            "max_hold_time": "30 minutes",
            "risk_management": "Standard momentum stops"
        }
    elif timeframe == "1h":
        strategy = {
            "strategy": "HOURLY_SWING",
            "reasoning": f"Hourly swing {signal}, {confidence}% confidence",
            "contracts": [
                {
                    "type": "CALL_SPREAD" if is_calls else "PUT_SPREAD",
                    "long_strike": "ATM",
                    "short_strike": "3% OTM",
                    "quantity": min(int(confidence / 15), 5),
                    "dte": 3,
                    "target_profit": 35,
                    "stop_loss": 25
                }
            ],
            "max_hold_time": "2-4 hours",
            "risk_management": "Defined risk spreads"
        }
    else:
        strategy = {
            "strategy": "CONSERVATIVE",
            "reasoning": f"Lower conviction {signal}, using conservative approach",
            "contracts": [
                {
                    "type": "CALL_SPREAD" if is_calls else "PUT_SPREAD",
                    "long_strike": "ATM",
                    "short_strike": "5% OTM",
                    "quantity": 3,
                    "dte": 7,
                    "target_profit": 25,
                    "stop_loss": 20
                }
            ],
            "max_hold_time": "End of day",
            "risk_management": "Limited risk, defined reward"
        }

    # VIX adjustments
    if vix_level > 30:
        strategy["reasoning"] += f". High VIX ({vix_level}) - reduced size"
        for contract in strategy["contracts"]:
            contract["quantity"] = max(1, contract["quantity"] // 2)

    return strategy


@app.post("/backtest/enhanced/")
async def enhanced_backtest(
    symbol: str = Query(..., description="Stock symbol"),
    start_date: str = Query(..., description="Start date (YYYY-MM-DD)"),
    end_date: str = Query(..., description="End date (YYYY-MM-DD)"),
    strategy_mode: str = Query("momentum", description="Strategy mode"),
    initial_capital: float = Query(100000, description="Initial capital")
):
    """Enhanced backtesting with momentum strategies

    Currently a stub: the query parameters are accepted but not used, and a
    static capability listing is returned.
    """
    try:
        return {
            "status": "success",
            "message": "Enhanced backtesting ready",
            "features": [
                "Multi-timeframe momentum analysis",
                "LLM ensemble signal validation",
                "Options strategy backtesting",
                "Risk-adjusted performance metrics",
                "Slippage and commission modeling"
            ]
        }
    except Exception as e:
        return {"status": "error", "message": str(e)}


@app.get("/health/detailed")
async def detailed_health_check():
    """Detailed system health check"""
    import torch

    components: Dict[str, Any] = {}

    # Check GPU
    if torch.cuda.is_available():
        gpu_memory_used = torch.cuda.memory_allocated(0) / 1e9
        gpu_memory_total = torch.cuda.get_device_properties(0).total_memory / 1e9
        components["gpu"] = {
            "status": "available",
            "device": torch.cuda.get_device_name(0),
            "memory_used_gb": gpu_memory_used,
            "memory_total_gb": gpu_memory_total,
            "utilization": f"{gpu_memory_used/gpu_memory_total*100:.1f}%"
        }
    else:
        components["gpu"] = {"status": "not_available"}

    # Check model status
    components["sentiment_models"] = {
        "loaded": len(sentiment_analyzer.models),
        "status": "ready" if sentiment_analyzer.models else "not_loaded"
    }

    components["llm_models"] = {
        "loaded": len(llm_engine.models),
        "status": "ready" if llm_engine.models else "not_loaded"
    }

    # Reports the module-level predictor, not the per-symbol instances held
    # in api_tft_models.
    components["tft_model"] = {
        "status": "trained" if tft_predictor.is_trained else "not_trained"
    }

    components["agent"] = {
        "status": "initialized" if trading_agent else "not_initialized"
    }

    return {
        "timestamp": datetime.now().isoformat(),
        "overall_status": "healthy",
        "components": components
    }


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)