import json
import os
from datetime import datetime, timedelta
from pathlib import Path

import asyncpraw
import httpx
import numpy as np
import pandas as pd
import pytz

from . import cache, config

# Local JSON fixtures act as an offline fallback for news/Reddit data.
LOCAL_DATA_DIR = Path(os.getenv(
    "TRADE_LOCAL_DATA",
    "./local_data"  # Simplified default for portability
))
# parents=True so a nested TRADE_LOCAL_DATA path doesn't crash at import time.
LOCAL_DATA_DIR.mkdir(parents=True, exist_ok=True)


def _process_finnhub_data(data: dict, symbol: str) -> pd.DataFrame:
    """Process a Finnhub candle JSON payload into an OHLCV DataFrame.

    Args:
        data: Raw JSON dict from Finnhub's candle endpoint. Expected keys:
            's' (status flag), 'o', 'h', 'l', 'c', 'v' (price/volume arrays)
            and 't' (UNIX timestamps).
        symbol: Ticker symbol, used only for logging.

    Returns:
        DataFrame with Open/High/Low/Close/Volume columns indexed by UTC
        timestamps, or an empty DataFrame when the payload is missing or
        Finnhub reports a non-'ok' status.
    """
    if not data or data.get('s') != 'ok' or 'c' not in data:
        print(f"No valid data received from Finnhub for {symbol}.")
        return pd.DataFrame()
    df = pd.DataFrame({
        'Open': data['o'],
        'High': data['h'],
        'Low': data['l'],
        'Close': data['c'],
        'Volume': data['v']
    })
    # Finnhub timestamps are UNIX timestamps (seconds since the epoch).
    df.index = pd.to_datetime(data['t'], unit='s', utc=True)
    df.dropna(inplace=True)
    return df


class UnifiedDataProvider:
    """Aggregates market, news, and social data from Finnhub, Reddit and
    local JSON fixture files.

    Attributes:
        client: Shared httpx.AsyncClient reused for all Finnhub calls.
        reddit_instance: Lazily-created asyncpraw.Reddit client (None until
            first Reddit fetch).
        local_data_cache: In-memory cache of loaded local fixture files,
            keyed by upper-cased symbol.
    """

    def __init__(self):
        # Create a single, reusable client for all API calls
        self.client = httpx.AsyncClient(timeout=20.0)
        self.reddit_instance = None
        self.local_data_cache = {}
        print("✅ UnifiedDataProvider initialized with Finnhub.")

    def _load_local_data(self, symbol: str) -> dict | None:
        """Loads symbol JSON from local files if present.

        Returns the parsed dict (also memoized in ``local_data_cache``),
        or None when no fixture file exists for the symbol.
        """
        path = LOCAL_DATA_DIR / f"{symbol.upper()}_external_data.json"
        if path.exists():
            print(f"Loading local data from: {path}")
            with open(path, "r") as f:
                data = json.load(f)
            self.local_data_cache[symbol.upper()] = data
            return data
        return None

    async def fetch_multi_timeframe_stock_data(self, symbol: str) -> dict:
        """REWRITTEN FOR FREE TIER: Fetches the current day's quote data
        from Finnhub. Historical candle data is a premium feature.

        Returns:
            Dict with a single 'daily' key mapping to a one-row OHLC
            DataFrame (Volume is a 0 placeholder — the quote endpoint does
            not provide it), or an empty DataFrame on failure.
        """
        print(f"Fetching Finnhub REAL-TIME QUOTE for {symbol}...")
        dfs = {}
        base_url = "https://finnhub.io/api/v1/quote"
        params = {"symbol": symbol, "token": config.FINNHUB_KEY}
        try:
            res = await self.client.get(base_url, params=params)
            res.raise_for_status()
            data = res.json()
            # Finnhub returns c == 0 for unknown symbols, so treat that as
            # "no data" rather than a valid quote.
            if data and 'c' in data and data['c'] != 0:
                # Create a single-row DataFrame
                df = pd.DataFrame([{
                    'Open': data['o'],
                    'High': data['h'],
                    'Low': data['l'],
                    'Close': data['c'],
                    # Volume is not in the quote endpoint, so we put a placeholder
                    'Volume': 0
                }])
                # Use the current time for the index
                df.index = pd.to_datetime([datetime.now(tz=pytz.UTC)])
                # Since we only have one row, we can't do multi-timeframe.
                # We will return this single row for the 'daily' key.
                dfs["daily"] = df
                print(f" - Fetched real-time quote for {symbol} from Finnhub.")
            else:
                dfs["daily"] = pd.DataFrame()
        except (httpx.RequestError, httpx.HTTPStatusError) as e:
            print(f"ERROR: Finnhub Quote request failed for {symbol}: {e}")
            dfs["daily"] = pd.DataFrame()
        return dfs

    async def fetch_news(self, symbol: str, days: int = 3) -> tuple:
        """Fetches news from Finnhub, with local file fallback.

        Resolution order: local fixture file, then cache, then the Finnhub
        company-news API.

        Returns:
            (articles, source) where source is one of "local_file",
            "cache", "api" or "error" (with an empty list).
        """
        local_data = self._load_local_data(symbol)
        if local_data and 'news_data' in local_data:
            return local_data['news_data'], "local_file"

        cache_key = f"news_{symbol}_{days}"
        cached_data = cache.get(cache_key)
        # 'is not None' so a cached-but-empty article list still counts as a
        # hit. NOTE(review): assumes cache.get returns None on miss — confirm.
        if cached_data is not None:
            return cached_data, "cache"

        # Use UTC so the date window matches the UTC timestamps used
        # elsewhere in this module and is independent of server locale.
        end_date = datetime.now(tz=pytz.UTC)
        start_date = end_date - timedelta(days=days)
        url = "https://finnhub.io/api/v1/company-news"
        params = {
            "symbol": symbol,
            "from": start_date.strftime('%Y-%m-%d'),
            "to": end_date.strftime('%Y-%m-%d'),
            "token": config.FINNHUB_KEY
        }
        try:
            res = await self.client.get(url, params=params)
            res.raise_for_status()
            data = res.json()
            cache.put(cache_key, data)
            return data, "api"
        except (httpx.RequestError, httpx.HTTPStatusError) as e:
            print(f"ERROR: Finnhub news request failed for {symbol}: {e}")
            return [], "error"

    def _get_reddit_instance(self):
        """Initializes the asyncpraw Reddit instance (lazily, once)."""
        if self.reddit_instance is None:
            self.reddit_instance = asyncpraw.Reddit(
                client_id=config.REDDIT_CLIENT_ID,
                client_secret=config.REDDIT_CLIENT_SECRET,
                user_agent=config.REDDIT_USER_AGENT
            )
        return self.reddit_instance

    async def fetch_reddit_data(self, symbol: str, limit: int = 25) -> tuple:
        """Fetches Reddit data, with local file fallback.

        Searches a fixed set of subreddits for the symbol (quoted and
        cashtag form), newest first, up to ``limit`` posts per subreddit.

        Returns:
            (submissions, source) where source is one of "local_file",
            "cache" or "api". Per-subreddit failures are logged and
            skipped, so a partial result can still be returned.
        """
        local_data = self._load_local_data(symbol)
        if local_data and 'reddit_data' in local_data:
            return local_data['reddit_data'], "local_file"

        cache_key = f"reddit_{symbol}_{limit}"
        cached_data = cache.get(cache_key)
        # 'is not None' so a cached-but-empty result still counts as a hit.
        # NOTE(review): assumes cache.get returns None on miss — confirm.
        if cached_data is not None:
            return cached_data, "cache"

        reddit = self._get_reddit_instance()
        submissions_data = []
        query = f'"{symbol}" OR "${symbol}"'
        subreddits = ["stocks", "wallstreetbets", "options"]
        for sub_name in subreddits:
            try:
                subreddit = await reddit.subreddit(sub_name)
                async for submission in subreddit.search(query, limit=limit, sort='new'):
                    submissions_data.append({
                        'title': submission.title,
                        'score': submission.score,
                        'url': submission.url,
                        'created_utc': submission.created_utc,
                        'subreddit': sub_name
                    })
            except Exception as e:
                # Best-effort: one failing subreddit shouldn't sink the rest.
                print(f"ERROR: Reddit fetch for {symbol} in {sub_name} failed: {e}")

        cache.put(cache_key, submissions_data)
        return submissions_data, "api"

    def get_alternative_data(self, symbol: str) -> dict:
        """Gets VIX and sector data from Finnhub.

        NOTE: Put/Call and IV are hardcoded as they require a separate
        options data provider.

        Returns:
            Dict with 'vix_level' (float, defaults to 20.0 on failure),
            'sector' (str, defaults to 'Unknown'), and hardcoded
            'put_call_ratio' / 'iv_rank' placeholders.
        """
        # This function is not async because Finnhub's free plan is slow,
        # and running these sequentially is more stable than parallel
        # async calls.
        vix_level = 20.0
        sector = "Unknown"
        try:
            with httpx.Client() as sync_client:
                # Get VIX. NOTE(review): an earlier comment claimed Finnhub
                # uses ".VIX" for the index, but this code queries "^VIX" —
                # confirm which symbol the account's data plan supports.
                vix_res = sync_client.get(
                    "https://finnhub.io/api/v1/quote",
                    params={"symbol": "^VIX", "token": config.FINNHUB_KEY},
                )
                if vix_res.status_code == 200:
                    quote_close = vix_res.json().get('c', 20.0)
                    # Guard: Finnhub may return null/0 for an unsupported
                    # symbol; keep the 20.0 default so round() below is safe.
                    if isinstance(quote_close, (int, float)) and quote_close:
                        vix_level = quote_close

                # Get Company Profile for Sector. params= handles
                # URL-encoding of the symbol (e.g. BRK.B).
                profile_res = sync_client.get(
                    "https://finnhub.io/api/v1/stock/profile2",
                    params={"symbol": symbol, "token": config.FINNHUB_KEY},
                )
                if profile_res.status_code == 200:
                    # 'or' guards against an explicit null industry field.
                    sector = profile_res.json().get('finnhubIndustry') or 'Unknown'
        except Exception as e:
            print(f"ERROR fetching alternative data for {symbol}: {e}")

        return {
            "vix_level": round(vix_level, 2),
            "sector": sector,
            "put_call_ratio": 0.85,  # Hardcoded: Requires options data provider
            "iv_rank": 45.5          # Hardcoded: Requires options data provider
        }

    async def close(self):
        """Closes all persistent connections (Reddit client and HTTPX)."""
        if self.reddit_instance:
            await self.reddit_instance.close()
            print("Reddit instance closed.")
        await self.client.aclose()
        print("HTTPX client closed.")