import json
import asyncio
from typing import Dict, Any, Optional, List
from backend.utils import generate_completions
from backend import config
from backend.db import db
from backend.db_cache import api_cache
import logging

logger = logging.getLogger(__name__)


class ContentGenerator:
    """Service for generating and storing all learning content"""

    async def generate_curriculum_from_metadata(
        self,
        metadata_extraction_id: str,
        query: str,
        metadata: Dict[str, Any],
        user_id: Optional[int] = None
    ) -> str:
        """Generate curriculum based on extracted metadata"""
        # Format curriculum instructions with metadata
        instructions = (
            config.curriculum_instructions
            .replace("{native_language}", metadata['native_language'])
            .replace("{target_language}", metadata['target_language'])
            .replace("{proficiency}", metadata['proficiency'])
        )

        # Generate curriculum
        logger.info(f"Generating curriculum for {metadata['target_language']} ({metadata['proficiency']})")
        curriculum_response = await generate_completions.get_completions(query, instructions)

        try:
            # Parse curriculum response
            curriculum = json.loads(curriculum_response)
        except json.JSONDecodeError:
            logger.error(f"Failed to parse curriculum response: {curriculum_response[:200]}...")
            curriculum = {"lesson_topic": "Language Learning Journey", "sub_topics": []}

        # Save curriculum to database
        curriculum_id = await db.save_curriculum(
            metadata_extraction_id=metadata_extraction_id,
            curriculum=curriculum,
            user_id=user_id
        )

        return curriculum_id

    async def generate_content_for_lesson(
        self,
        curriculum_id: str,
        lesson_index: int,
        lesson: Dict[str, Any],
        metadata: Dict[str, Any]
    ) -> Dict[str, str]:
        """Generate all content types for a single lesson"""
        content_ids = {}
        lesson_topic = lesson.get('sub_topic', f'Lesson {lesson_index + 1}')
        lesson_context = f"{lesson_topic}: {lesson.get('description', '')}"

        # Generate flashcards
        try:
            flashcards_instructions = (
                config.flashcard_mode_instructions
                .replace("{native_language}", metadata['native_language'])
                .replace("{target_language}", metadata['target_language'])
                .replace("{proficiency}", metadata['proficiency'])
            )
            flashcards_response = await api_cache.get_or_set(
                category="flashcards",
                key_text=lesson_context,
                coro=generate_completions.get_completions,
                context={
                    'native_language': metadata['native_language'],
                    'target_language': metadata['target_language'],
                    'proficiency': metadata['proficiency'],
                    'lesson_index': lesson_index
                },
                prompt=lesson_context,
                instructions=flashcards_instructions
            )

            # Save flashcards
            content_ids['flashcards'] = await db.save_learning_content(
                curriculum_id=curriculum_id,
                content_type='flashcards',
                lesson_index=lesson_index,
                lesson_topic=lesson_topic,
                content=flashcards_response
            )
        except Exception as e:
            logger.error(f"Failed to generate flashcards for lesson {lesson_index}: {e}")

        # Generate exercises
        try:
            exercises_instructions = (
                config.exercise_mode_instructions
                .replace("{native_language}", metadata['native_language'])
                .replace("{target_language}", metadata['target_language'])
                .replace("{proficiency}", metadata['proficiency'])
            )
            exercises_response = await api_cache.get_or_set(
                category="exercises",
                key_text=lesson_context,
                coro=generate_completions.get_completions,
                context={
                    'native_language': metadata['native_language'],
                    'target_language': metadata['target_language'],
                    'proficiency': metadata['proficiency'],
                    'lesson_index': lesson_index
                },
                prompt=lesson_context,
                instructions=exercises_instructions
            )

            # Save exercises
            content_ids['exercises'] = await db.save_learning_content(
                curriculum_id=curriculum_id,
                content_type='exercises',
                lesson_index=lesson_index,
                lesson_topic=lesson_topic,
                content=exercises_response
            )
        except Exception as e:
            logger.error(f"Failed to generate exercises for lesson {lesson_index}: {e}")

        # Generate simulation
        try:
            simulation_instructions = (
                config.simulation_mode_instructions
                .replace("{native_language}", metadata['native_language'])
                .replace("{target_language}", metadata['target_language'])
                .replace("{proficiency}", metadata['proficiency'])
            )
            simulation_response = await api_cache.get_or_set(
                category="simulation",
                key_text=lesson_context,
                coro=generate_completions.get_completions,
                context={
                    'native_language': metadata['native_language'],
                    'target_language': metadata['target_language'],
                    'proficiency': metadata['proficiency'],
                    'lesson_index': lesson_index
                },
                prompt=lesson_context,
                instructions=simulation_instructions
            )

            # Save simulation
            content_ids['simulation'] = await db.save_learning_content(
                curriculum_id=curriculum_id,
                content_type='simulation',
                lesson_index=lesson_index,
                lesson_topic=lesson_topic,
                content=simulation_response
            )
        except Exception as e:
            logger.error(f"Failed to generate simulation for lesson {lesson_index}: {e}")

        # Return the IDs of whatever content was successfully generated and saved
        return content_ids

    async def generate_all_content_for_curriculum(
        self,
        curriculum_id: str,
        max_concurrent_lessons: int = 3
    ):
        """Generate all learning content for a curriculum"""
        # Get curriculum details
        curriculum_data = await db.get_curriculum(curriculum_id)
        if not curriculum_data:
            logger.error(f"Curriculum not found: {curriculum_id}")
            return

        # Parse curriculum JSON
        try:
            curriculum = json.loads(curriculum_data['curriculum_json'])
            lessons = curriculum.get('sub_topics', [])
        except json.JSONDecodeError:
            logger.error(f"Failed to parse curriculum JSON for {curriculum_id}")
            return

        # Prepare metadata
        metadata = {
            'native_language': curriculum_data['native_language'],
            'target_language': curriculum_data['target_language'],
            'proficiency': curriculum_data['proficiency']
        }

        logger.info(f"Starting content generation for {len(lessons)} lessons")

        # Process lessons in batches to avoid overwhelming the API
        for i in range(0, len(lessons), max_concurrent_lessons):
            batch = lessons[i:i + max_concurrent_lessons]
            batch_indices = list(range(i, min(i + max_concurrent_lessons, len(lessons))))

            # Generate content for batch concurrently
            tasks = [
                self.generate_content_for_lesson(
                    curriculum_id=curriculum_id,
                    lesson_index=idx,
                    lesson=lesson,
                    metadata=metadata
                )
                for idx, lesson in zip(batch_indices, batch)
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)

            for idx, result in zip(batch_indices, results):
                if isinstance(result, Exception):
                    logger.error(f"Failed to generate content for lesson {idx}: {result}")
                else:
                    logger.info(f"Generated content for lesson {idx}: {result}")

        # Mark curriculum as content generated
        await db.mark_curriculum_content_generated(curriculum_id)
        logger.info(f"Completed content generation for curriculum {curriculum_id}")

    async def process_metadata_extraction(
        self,
        extraction_id: str,
        query: str,
        metadata: Dict[str, Any],
        user_id: Optional[int] = None,
        generate_content: bool = True
    ) -> Dict[str, Any]:
        """Process a metadata extraction by reusing an existing curriculum or generating a new one"""
        # Check for an existing curriculum first
        existing_curriculum = await db.find_existing_curriculum(
            query=query,
            native_language=metadata['native_language'],
            target_language=metadata['target_language'],
            proficiency=metadata['proficiency'],
            user_id=user_id
        )

        if existing_curriculum:
            # If we found an exact match for this user, return it
            if existing_curriculum.get('user_id') == user_id:
                logger.info(f"Found existing curriculum for user {user_id}: {existing_curriculum['id']}")
                return {
                    'curriculum_id': existing_curriculum['id'],
                    'content_generation_started': False,
                    'cached': True,
                    'cache_type': 'user_exact_match'
                }
            # If we found a similar curriculum from another user, copy it
            elif existing_curriculum.get('is_content_generated') == 1:
                logger.info(f"Copying existing curriculum {existing_curriculum['id']} for user {user_id}")
                curriculum_id = await db.copy_curriculum_for_user(
                    source_curriculum_id=existing_curriculum['id'],
                    metadata_extraction_id=extraction_id,
                    user_id=user_id
                )
                return {
                    'curriculum_id': curriculum_id,
                    'content_generation_started': False,
                    'cached': True,
                    'cache_type': 'copied_from_similar'
                }

        # No suitable existing curriculum found, so generate a new one
        logger.info(f"No existing curriculum found, generating new one for user {user_id}")
        curriculum_id = await self.generate_curriculum_from_metadata(
            metadata_extraction_id=extraction_id,
            query=query,
            metadata=metadata,
            user_id=user_id
        )

        result = {
            'curriculum_id': curriculum_id,
            'content_generation_started': False,
            'cached': False,
            'cache_type': 'newly_generated'
        }

        if generate_content:
            # Start content generation in the background
            asyncio.create_task(self.generate_all_content_for_curriculum(curriculum_id))
            result['content_generation_started'] = True

        return result


# Global content generator instance
content_generator = ContentGenerator()
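

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original service code). It shows how
# the module-level content_generator might be driven end to end, assuming a
# metadata extraction record already exists in the database. The extraction
# ID, query text, metadata values, and user_id below are hypothetical
# placeholders, not values defined elsewhere in this codebase.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    async def _demo() -> None:
        # Hypothetical extracted metadata; the real values come from the
        # metadata extraction step upstream of this service.
        metadata = {
            'native_language': 'English',
            'target_language': 'Spanish',
            'proficiency': 'beginner'
        }
        result = await content_generator.process_metadata_extraction(
            extraction_id="extraction-123",  # hypothetical extraction ID
            query="I want to learn Spanish for travel",
            metadata=metadata,
            user_id=1,
            # Skip background generation here so the short-lived demo loop
            # does not close before the asyncio background task finishes.
            generate_content=False
        )
        print(result)

    asyncio.run(_demo())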