import json
import asyncio
import logging
from typing import Dict, Any, Optional

from backend.utils import generate_completions
from backend import config
from backend.db import db
from backend.db_cache import api_cache

logger = logging.getLogger(__name__)


class ContentGenerator:
    """Service for generating and storing all learning content."""

    async def generate_curriculum_from_metadata(
        self,
        metadata_extraction_id: str,
        query: str,
        metadata: Dict[str, Any],
        user_id: Optional[int] = None
    ) -> str:
        """Generate a curriculum based on extracted metadata."""
        # Format curriculum instructions with metadata
        instructions = (
            config.curriculum_instructions
            .replace("{native_language}", metadata['native_language'])
            .replace("{target_language}", metadata['target_language'])
            .replace("{proficiency}", metadata['proficiency'])
        )

        # Generate curriculum
        logger.info(
            f"Generating curriculum for {metadata['target_language']} ({metadata['proficiency']})"
        )
        curriculum_response = await generate_completions.get_completions(query, instructions)

        try:
            # Parse curriculum response
            curriculum = json.loads(curriculum_response)
        except json.JSONDecodeError:
            logger.error(f"Failed to parse curriculum response: {curriculum_response[:200]}...")
            curriculum = {"lesson_topic": "Language Learning Journey", "sub_topics": []}

        # Save curriculum to database
        curriculum_id = await db.save_curriculum(
            metadata_extraction_id=metadata_extraction_id,
            curriculum=curriculum,
            user_id=user_id
        )

        return curriculum_id

    async def generate_content_for_lesson(
        self,
        curriculum_id: str,
        lesson_index: int,
        lesson: Dict[str, Any],
        metadata: Dict[str, Any]
    ) -> Dict[str, str]:
        """Generate all content types for a single lesson."""
        content_ids = {}
        lesson_topic = lesson.get('sub_topic', f'Lesson {lesson_index + 1}')
        lesson_context = f"{lesson_topic}: {lesson.get('description', '')}"

        # Generate flashcards
        try:
            flashcards_instructions = (
                config.flashcard_mode_instructions
                .replace("{native_language}", metadata['native_language'])
                .replace("{target_language}", metadata['target_language'])
                .replace("{proficiency}", metadata['proficiency'])
            )
            flashcards_response = await api_cache.get_or_set(
                category="flashcards",
                key_text=lesson_context,
                coro=generate_completions.get_completions,
                context={
                    'native_language': metadata['native_language'],
                    'target_language': metadata['target_language'],
                    'proficiency': metadata['proficiency'],
                    'lesson_index': lesson_index
                },
                prompt=lesson_context,
                instructions=flashcards_instructions
            )

            # Save flashcards
            content_ids['flashcards'] = await db.save_learning_content(
                curriculum_id=curriculum_id,
                content_type='flashcards',
                lesson_index=lesson_index,
                lesson_topic=lesson_topic,
                content=flashcards_response
            )
        except Exception as e:
            logger.error(f"Failed to generate flashcards for lesson {lesson_index}: {e}")

        # Generate exercises
        try:
            exercises_instructions = (
                config.exercise_mode_instructions
                .replace("{native_language}", metadata['native_language'])
                .replace("{target_language}", metadata['target_language'])
                .replace("{proficiency}", metadata['proficiency'])
            )
            exercises_response = await api_cache.get_or_set(
                category="exercises",
                key_text=lesson_context,
                coro=generate_completions.get_completions,
                context={
                    'native_language': metadata['native_language'],
                    'target_language': metadata['target_language'],
                    'proficiency': metadata['proficiency'],
                    'lesson_index': lesson_index
                },
                prompt=lesson_context,
                instructions=exercises_instructions
            )

            # Save exercises
            content_ids['exercises'] = await db.save_learning_content(
                curriculum_id=curriculum_id,
                content_type='exercises',
                lesson_index=lesson_index,
                lesson_topic=lesson_topic,
                content=exercises_response
            )
        except Exception as e:
            logger.error(f"Failed to generate exercises for lesson {lesson_index}: {e}")

        # Generate simulation
        try:
            simulation_instructions = (
                config.simulation_mode_instructions
                .replace("{native_language}", metadata['native_language'])
                .replace("{target_language}", metadata['target_language'])
                .replace("{proficiency}", metadata['proficiency'])
            )
            simulation_response = await api_cache.get_or_set(
                category="simulation",
                key_text=lesson_context,
                coro=generate_completions.get_completions,
                context={
                    'native_language': metadata['native_language'],
                    'target_language': metadata['target_language'],
                    'proficiency': metadata['proficiency'],
                    'lesson_index': lesson_index
                },
                prompt=lesson_context,
                instructions=simulation_instructions
            )

            # Save simulation
            content_ids['simulation'] = await db.save_learning_content(
                curriculum_id=curriculum_id,
                content_type='simulation',
                lesson_index=lesson_index,
                lesson_topic=lesson_topic,
                content=simulation_response
            )
        except Exception as e:
            logger.error(f"Failed to generate simulation for lesson {lesson_index}: {e}")

        return content_ids

    async def generate_all_content_for_curriculum(
        self,
        curriculum_id: str,
        max_concurrent_lessons: int = 3
    ):
        """Generate all learning content for a curriculum."""
        # Get curriculum details
        curriculum_data = await db.get_curriculum(curriculum_id)
        if not curriculum_data:
            logger.error(f"Curriculum not found: {curriculum_id}")
            return

        # Parse curriculum JSON
        try:
            curriculum = json.loads(curriculum_data['curriculum_json'])
            lessons = curriculum.get('sub_topics', [])
        except json.JSONDecodeError:
            logger.error(f"Failed to parse curriculum JSON for {curriculum_id}")
            return

        # Prepare metadata
        metadata = {
            'native_language': curriculum_data['native_language'],
            'target_language': curriculum_data['target_language'],
            'proficiency': curriculum_data['proficiency']
        }

        logger.info(f"Starting content generation for {len(lessons)} lessons")

        # Process lessons in batches to avoid overwhelming the API
        for i in range(0, len(lessons), max_concurrent_lessons):
            batch = lessons[i:i + max_concurrent_lessons]
            batch_indices = list(range(i, min(i + max_concurrent_lessons, len(lessons))))

            # Generate content for the batch concurrently
            tasks = [
                self.generate_content_for_lesson(
                    curriculum_id=curriculum_id,
                    lesson_index=idx,
                    lesson=lesson,
                    metadata=metadata
                )
                for idx, lesson in zip(batch_indices, batch)
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)

            for idx, result in zip(batch_indices, results):
                if isinstance(result, Exception):
                    logger.error(f"Failed to generate content for lesson {idx}: {result}")
                else:
                    logger.info(f"Generated content for lesson {idx}: {result}")

        # Mark curriculum as content generated
        await db.mark_curriculum_content_generated(curriculum_id)
        logger.info(f"Completed content generation for curriculum {curriculum_id}")

    async def process_metadata_extraction(
        self,
        extraction_id: str,
        query: str,
        metadata: Dict[str, Any],
        user_id: Optional[int] = None,
        generate_content: bool = True
    ) -> Dict[str, Any]:
        """Process a metadata extraction by reusing an existing curriculum or generating a new one."""
        # Check for an existing curriculum first
        existing_curriculum = await db.find_existing_curriculum(
            query=query,
            native_language=metadata['native_language'],
            target_language=metadata['target_language'],
            proficiency=metadata['proficiency'],
            user_id=user_id
        )

        if existing_curriculum:
            # If we found an exact match for this user, return it
            if existing_curriculum.get('user_id') == user_id:
                logger.info(
                    f"Found existing curriculum for user {user_id}: {existing_curriculum['id']}"
                )
                return {
                    'curriculum_id': existing_curriculum['id'],
                    'content_generation_started': False,
                    'cached': True,
                    'cache_type': 'user_exact_match'
                }
            # If we found a similar curriculum from another user, copy it
            elif existing_curriculum.get('is_content_generated') == 1:
                logger.info(
                    f"Copying existing curriculum {existing_curriculum['id']} for user {user_id}"
                )
                curriculum_id = await db.copy_curriculum_for_user(
                    source_curriculum_id=existing_curriculum['id'],
                    metadata_extraction_id=extraction_id,
                    user_id=user_id
                )
                return {
                    'curriculum_id': curriculum_id,
                    'content_generation_started': False,
                    'cached': True,
                    'cache_type': 'copied_from_similar'
                }

        # No suitable existing curriculum found, generate a new one
        logger.info(f"No existing curriculum found, generating new one for user {user_id}")
        curriculum_id = await self.generate_curriculum_from_metadata(
            metadata_extraction_id=extraction_id,
            query=query,
            metadata=metadata,
            user_id=user_id
        )

        result = {
            'curriculum_id': curriculum_id,
            'content_generation_started': False,
            'cached': False,
            'cache_type': 'newly_generated'
        }

        if generate_content:
            # Start content generation in the background
            asyncio.create_task(self.generate_all_content_for_curriculum(curriculum_id))
            result['content_generation_started'] = True

        return result


# Global content generator instance
content_generator = ContentGenerator()
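
# ---------------------------------------------------------------------------
# Usage sketch (illustrative, never called by this module): how a caller such
# as an API route handler might hand an extracted metadata record to the
# global `content_generator`. The extraction id, query, metadata values, and
# user id below are hypothetical example data; only
# `process_metadata_extraction` and its parameters come from this module.
# ---------------------------------------------------------------------------
async def _example_usage() -> None:
    result = await content_generator.process_metadata_extraction(
        extraction_id="example-extraction-id",  # id produced by the metadata-extraction step
        query="I want to learn conversational Spanish for travel",
        metadata={
            'native_language': 'English',
            'target_language': 'Spanish',
            'proficiency': 'beginner',
        },
        user_id=42,
        generate_content=True,
    )
    # result['curriculum_id'] identifies the stored curriculum. When
    # result['content_generation_started'] is True, lesson content is being
    # produced by a background task; result['cache_type'] indicates whether
    # the curriculum was reused, copied, or newly generated.
    logger.info(f"Curriculum ready: {result['curriculum_id']} ({result['cache_type']})")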