"""In-memory storage for transcription results with periodic cleanup."""

import asyncio
import logging
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional

from models import TranscriptionResult, TranscriptionStatus
from config import settings

# Configure logging for this module
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class InMemoryStorage:
    """Keep transcription results in a process-local dict keyed by integer ID.

    IDs start at 1 and increase by one per created entry. A background
    asyncio task (see ``start_cleanup_task``) wakes up once an hour and
    removes entries whose ``created_at`` is older than
    ``settings.CLEANUP_INTERVAL_HOURS`` hours.
    """

    def __init__(self):
        self._storage: Dict[int, TranscriptionResult] = {}
        self._next_id = 1
        # Handle of the running cleanup task, or None when not started.
        self._cleanup_task: Optional[asyncio.Task] = None

    async def start_cleanup_task(self):
        """Start the background cleanup task unless one is still running.

        Must be called from within a running event loop, because it uses
        ``asyncio.create_task``. A previous task that has already finished
        is replaced by a fresh one.
        """
        if self._cleanup_task is not None and not self._cleanup_task.done():
            logger.info("🧹 Cleanup task already running")
            return

        logger.info("🧹 Starting automatic cleanup task")
        # NOTE(review): this setting is used as the maximum entry age in
        # _cleanup_old_transcriptions; the loop itself always runs hourly.
        logger.info(f"⏰ Cleanup interval: {settings.CLEANUP_INTERVAL_HOURS} hours")
        self._cleanup_task = asyncio.create_task(self._cleanup_loop())

    async def stop_cleanup_task(self):
        """Cancel the background cleanup task and wait for it to finish."""
        task = self._cleanup_task
        if task is None:
            logger.info("🧹 No cleanup task to stop")
            return

        logger.info("🛑 Stopping cleanup task")
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            # Expected outcome of cancelling the task.
            pass
        finally:
            # Always drop the handle so the task can be started again.
            self._cleanup_task = None
        logger.info("✅ Cleanup task stopped")

    def create_transcription(self, language: Optional[str] = None) -> int:
        """Create a new pending transcription entry and return its ID.

        Args:
            language: Language to store on the entry; ``None`` is logged
                as 'auto-detect'.

        Returns:
            The integer ID assigned to the new entry.
        """
        transcription_id = self._next_id
        self._next_id += 1

        logger.info(f"📝 Creating new transcription entry with ID: {transcription_id}")
        logger.info(f"🌍 Language: {language or 'auto-detect'}")

        self._storage[transcription_id] = TranscriptionResult(
            id=transcription_id,
            status=TranscriptionStatus.PENDING,
            language=language,
            created_at=datetime.now(timezone.utc),
        )

        logger.info(f"✅ Transcription {transcription_id} created successfully")
        logger.info(f"📊 Total active transcriptions: {len(self._storage)}")
        return transcription_id

    def get_transcription(self, transcription_id: int) -> Optional[TranscriptionResult]:
        """Return the transcription stored under the ID, or ``None`` if absent."""
        logger.info(f"🔍 Looking up transcription ID: {transcription_id}")
        result = self._storage.get(transcription_id)
        if result is not None:
            logger.info(f"✅ Found transcription {transcription_id} with status: {result.status}")
        else:
            logger.warning(f"❌ Transcription {transcription_id} not found")
        return result

    def update_transcription(self, transcription_id: int, **kwargs) -> bool:
        """Update fields of a stored transcription.

        Only keyword arguments that name an existing attribute of the
        stored result are applied; others are ignored with a warning.

        Args:
            transcription_id: ID of the entry to update.
            **kwargs: Attribute names and their new values.

        Returns:
            ``True`` if the entry exists (even if no field was applied),
            ``False`` if the ID is unknown.
        """
        result = self._storage.get(transcription_id)
        if result is None:
            logger.warning(f"❌ Cannot update transcription {transcription_id} - not found")
            return False

        old_status = getattr(result, 'status', 'unknown')

        applied = {}
        for key, value in kwargs.items():
            if hasattr(result, key):
                setattr(result, key, value)
                applied[key] = value
            else:
                logger.warning(
                    f"⚠️ Ignoring unknown field '{key}' for transcription {transcription_id}"
                )

        new_status = getattr(result, 'status', 'unknown')

        logger.info(f"📝 Updated transcription {transcription_id}")
        if 'status' in applied:
            logger.info(f"🔄 Status changed: {old_status} → {new_status}")

        # Log specific updates (only for fields that were really applied)
        for key, value in applied.items():
            if key == 'text' and value:
                text_preview = value[:50] + "..." if len(value) > 50 else value
                logger.info(f"📄 Text updated: {text_preview}")
            elif key == 'error_message' and value:
                logger.error(f"❌ Error recorded: {value}")
            elif key not in ('status', 'text', 'error_message'):
                logger.info(f"📊 {key}: {value}")

        return True

    def delete_transcription(self, transcription_id: int) -> bool:
        """Delete the transcription with the given ID.

        Returns:
            ``True`` if an entry was removed, ``False`` if the ID is unknown.
        """
        result = self._storage.pop(transcription_id, None)
        if result is None:
            logger.warning(f"❌ Cannot delete transcription {transcription_id} - not found")
            return False

        logger.info(f"🗑️ Deleted transcription {transcription_id} (status: {result.status})")
        logger.info(f"📊 Remaining transcriptions: {len(self._storage)}")
        return True

    async def _cleanup_loop(self):
        """Run the cleanup once per hour until the task is cancelled.

        Errors raised by a cleanup pass are logged with their traceback and
        do not stop the loop.
        """
        logger.info("🧹 Cleanup loop started")
        while True:
            try:
                logger.info("😴 Cleanup sleeping for 1 hour...")
                await asyncio.sleep(3600)  # Check every hour
                logger.info("⏰ Running scheduled cleanup...")
                await self._cleanup_old_transcriptions()
            except asyncio.CancelledError:
                logger.info("🛑 Cleanup loop cancelled")
                break
            except Exception as e:
                # Keep the loop alive, but preserve the traceback in the log.
                logger.exception(f"❌ Error in cleanup loop: {e}")

    async def _cleanup_old_transcriptions(self):
        """Remove transcriptions older than ``settings.CLEANUP_INTERVAL_HOURS`` hours."""
        logger.info("🧹 Starting cleanup of old transcriptions...")

        # Take a single timestamp so the cutoff and the logged ages agree.
        now = datetime.now(timezone.utc)
        cutoff_time = now - timedelta(hours=settings.CLEANUP_INTERVAL_HOURS)
        logger.info(
            f"⏰ Cutoff time: {cutoff_time} (older than {settings.CLEANUP_INTERVAL_HOURS} hours)"
        )

        # Collect IDs first; the dict must not change size while iterating.
        to_delete = []
        for transcription_id, result in self._storage.items():
            if result.created_at < cutoff_time:
                age_hours = (now - result.created_at).total_seconds() / 3600
                logger.info(
                    f"🗑️ Marking transcription {transcription_id} for deletion "
                    f"(age: {age_hours:.1f} hours)"
                )
                to_delete.append(transcription_id)

        if not to_delete:
            logger.info("✅ No old transcriptions to clean up")
            return

        logger.info(f"🧹 Deleting {len(to_delete)} old transcriptions...")
        for transcription_id in to_delete:
            self.delete_transcription(transcription_id)

        logger.info(f"✅ Cleanup completed - removed {len(to_delete)} transcriptions")
        logger.info(f"📊 Active transcriptions remaining: {len(self._storage)}")


# Global storage instance
storage = InMemoryStorage()