File size: 6,696 Bytes
74708f4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
import asyncio
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional
from models import TranscriptionResult, TranscriptionStatus
from config import settings
import logging
# Logging setup for this module: INFO-level records rendered as
# "timestamp - logger name - level - message".
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)
class InMemoryStorage:
    """Dictionary-backed, in-process store for transcription records.

    Records are keyed by an auto-incrementing integer ID. An optional asyncio
    background task wakes up once an hour and removes records whose
    ``created_at`` is older than ``settings.CLEANUP_INTERVAL_HOURS`` hours.
    """

    # NOTE(review): the non-ASCII prefixes in the log messages below look like
    # mis-decoded emoji. They are kept as found; confirm the file's encoding
    # before normalising them.

    def __init__(self):
        """Create an empty store with IDs starting at 1 and no cleanup task."""
        self._storage: Dict[int, TranscriptionResult] = {}
        self._next_id = 1
        self._cleanup_task: Optional[asyncio.Task] = None

    async def start_cleanup_task(self):
        """Start the background cleanup task unless one is still running.

        A previously created task that has already finished is replaced, so
        cleanup cannot remain silently disabled after the loop has ended.
        Must be called from within a running event loop.
        """
        if self._cleanup_task is not None and not self._cleanup_task.done():
            logger.info("π§Ή Cleanup task already running")
            return

        logger.info("π§Ή Starting automatic cleanup task")
        logger.info(f"β° Cleanup interval: {settings.CLEANUP_INTERVAL_HOURS} hours")
        self._cleanup_task = asyncio.create_task(self._cleanup_loop())

    async def stop_cleanup_task(self):
        """Cancel the background cleanup task, if any, and wait for it to end."""
        if self._cleanup_task is None:
            logger.info("π§Ή No cleanup task to stop")
            return

        logger.info("π Stopping cleanup task")
        self._cleanup_task.cancel()
        try:
            await self._cleanup_task
        except asyncio.CancelledError:
            # Raised when the task is cancelled before its loop could catch
            # the cancellation itself; either way the task is finished.
            pass
        self._cleanup_task = None
        logger.info("β Cleanup task stopped")

    def create_transcription(self, language: Optional[str] = None) -> int:
        """Create a new PENDING transcription entry and return its ID.

        Args:
            language: Language to store on the entry; ``None`` is logged as
                'auto-detect'.

        Returns:
            The integer ID assigned to the new entry.
        """
        transcription_id = self._next_id
        self._next_id += 1

        logger.info(f"π Creating new transcription entry with ID: {transcription_id}")
        logger.info(f"π Language: {language or 'auto-detect'}")

        self._storage[transcription_id] = TranscriptionResult(
            id=transcription_id,
            status=TranscriptionStatus.PENDING,
            language=language,
            created_at=datetime.now(timezone.utc),
        )

        logger.info(f"β Transcription {transcription_id} created successfully")
        logger.info(f"π Total active transcriptions: {len(self._storage)}")
        return transcription_id

    def get_transcription(self, transcription_id: int) -> Optional[TranscriptionResult]:
        """Return the transcription stored under ``transcription_id``, or None."""
        logger.info(f"π Looking up transcription ID: {transcription_id}")
        result = self._storage.get(transcription_id)

        # Compare against None explicitly so the outcome does not depend on
        # how the stored object defines truthiness.
        if result is not None:
            logger.info(f"β Found transcription {transcription_id} with status: {result.status}")
        else:
            logger.warning(f"β Transcription {transcription_id} not found")
        return result

    def update_transcription(self, transcription_id: int, **kwargs) -> bool:
        """Set the given fields on a stored transcription.

        Only keyword arguments naming an attribute that already exists on the
        stored object are applied; any other key is skipped with a warning.

        Args:
            transcription_id: ID of the entry to update.
            **kwargs: Attribute names mapped to their new values.

        Returns:
            True if the entry exists (even if no field was applied),
            False if the ID is unknown.
        """
        result = self._storage.get(transcription_id)
        if result is None:
            logger.warning(f"β Cannot update transcription {transcription_id} - not found")
            return False

        old_status = getattr(result, 'status', 'unknown')

        applied = {}
        for key, value in kwargs.items():
            if hasattr(result, key):
                setattr(result, key, value)
                applied[key] = value
            else:
                # Do not report a field as updated when nothing was written.
                logger.warning(
                    f"Ignoring unknown field '{key}' for transcription {transcription_id}"
                )

        new_status = getattr(result, 'status', 'unknown')

        logger.info(f"π Updated transcription {transcription_id}")
        if 'status' in kwargs:
            logger.info(f"π Status changed: {old_status} β {new_status}")

        # Per-field detail: long text is truncated to a 50-character preview,
        # errors are logged at ERROR level, other applied fields verbatim.
        for key, value in applied.items():
            if key == 'text' and value:
                text_preview = value[:50] + "..." if len(value) > 50 else value
                logger.info(f"π Text updated: {text_preview}")
            elif key == 'error_message' and value:
                logger.error(f"β Error recorded: {value}")
            elif key not in ('status', 'text', 'error_message'):
                logger.info(f"π {key}: {value}")

        return True

    def delete_transcription(self, transcription_id: int) -> bool:
        """Remove a transcription by ID.

        Returns:
            True if an entry was removed, False if the ID was not present.
        """
        result = self._storage.pop(transcription_id, None)
        if result is None:
            logger.warning(f"β Cannot delete transcription {transcription_id} - not found")
            return False

        logger.info(f"ποΈ Deleted transcription {transcription_id} (status: {result.status})")
        logger.info(f"π Remaining transcriptions: {len(self._storage)}")
        return True

    async def _cleanup_loop(self):
        """Run the cleanup once per hour until the task is cancelled.

        Unexpected errors are logged with their traceback and the loop keeps
        going, so a single failed pass does not stop later cleanups.
        """
        logger.info("π§Ή Cleanup loop started")
        while True:
            try:
                logger.info("π΄ Cleanup sleeping for 1 hour...")
                await asyncio.sleep(3600)  # Check every hour
                logger.info("β° Running scheduled cleanup...")
                await self._cleanup_old_transcriptions()
            except asyncio.CancelledError:
                logger.info("π Cleanup loop cancelled")
                break
            except Exception as e:
                # logger.exception keeps the traceback, which logger.error
                # with only str(e) would drop.
                logger.exception(f"β Error in cleanup loop: {e}")

    async def _cleanup_old_transcriptions(self):
        """Delete every transcription older than the configured number of hours."""
        logger.info("π§Ή Starting cleanup of old transcriptions...")

        # Read the clock once so the cutoff and the reported ages agree.
        now = datetime.now(timezone.utc)
        cutoff_time = now - timedelta(hours=settings.CLEANUP_INTERVAL_HOURS)
        logger.info(f"β° Cutoff time: {cutoff_time} (older than {settings.CLEANUP_INTERVAL_HOURS} hours)")

        # Collect IDs first; deleting while iterating the dict would raise.
        to_delete = []
        for transcription_id, result in self._storage.items():
            if result.created_at < cutoff_time:
                age_hours = (now - result.created_at).total_seconds() / 3600
                logger.info(f"ποΈ Marking transcription {transcription_id} for deletion (age: {age_hours:.1f} hours)")
                to_delete.append(transcription_id)

        if not to_delete:
            logger.info("β No old transcriptions to clean up")
            return

        logger.info(f"π§Ή Deleting {len(to_delete)} old transcriptions...")
        for transcription_id in to_delete:
            self.delete_transcription(transcription_id)

        logger.info(f"β Cleanup completed - removed {len(to_delete)} transcriptions")
        logger.info(f"π Active transcriptions remaining: {len(self._storage)}")
# Global storage instance: created once when this module is imported, so all
# code importing `storage` from here works with the same InMemoryStorage.
storage = InMemoryStorage()
|