# AI-Digital-Library-Assistant: services/podcast_generator_service.py
import json
import logging
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
import re
import uuid

try:
    from elevenlabs import VoiceSettings
    from elevenlabs.client import ElevenLabs
    ELEVENLABS_AVAILABLE = True
except ImportError:
    ELEVENLABS_AVAILABLE = False

import config
from services.llamaindex_service import LlamaIndexService
from services.llm_service import LLMService
from services.document_store_service import DocumentStoreService
logger = logging.getLogger(__name__)

@dataclass
class DocumentAnalysis:
    """Analysis results from document(s)"""
    key_insights: List[str]
    topics: List[str]
    complexity_level: str
    estimated_words: int
    source_documents: List[str]
    summary: str

@dataclass
class DialogueLine:
    """Single line of podcast dialogue"""
    speaker: str
    text: str
    pause_after: float = 0.5

@dataclass
class PodcastScript:
    """Complete podcast script"""
    dialogue: List[DialogueLine]
    total_duration_estimate: float
    word_count: int
    style: str

    def to_text(self) -> str:
        lines = []
        for line in self.dialogue:
            lines.append(f"{line.speaker}: {line.text}")
        return "\n\n".join(lines)

@dataclass
class PodcastMetadata:
    """Metadata for generated podcast"""
    podcast_id: str
    title: str
    description: str
    source_documents: List[str]
    style: str
    duration_seconds: float
    file_size_mb: float
    voices: Dict[str, str]
    generated_at: str
    generation_cost: Dict[str, float]
    key_topics: List[str]

@dataclass
class PodcastResult:
    """Complete podcast generation result"""
    podcast_id: str
    audio_file_path: str
    transcript: str
    metadata: Optional[PodcastMetadata]  # None when generation fails
    generation_time: float
    success: bool
    error: Optional[str] = None

class PodcastGeneratorService:
    """
    Service for generating conversational podcasts from documents.
    """

    # Average speaking rate used to convert a target duration into a word budget.
    WORDS_PER_MINUTE = 150

    SCRIPT_PROMPTS = {
        "conversational": """You are an expert podcast script writer. Create an engaging 2-host podcast discussing the provided documents.

DOCUMENT CONTENT:
{document_content}

KEY INSIGHTS:
{key_insights}

REQUIREMENTS:
- Duration: {duration_minutes} minutes (approximately {word_count} words)
- Style: Conversational, friendly, and accessible
- Format: Alternating dialogue between HOST1 and HOST2
- Make the content engaging and easy to understand
- Include natural transitions and enthusiasm

DIALOGUE FORMAT (strictly follow):
HOST1: [What they say]
HOST2: [What they say]

STRUCTURE:
1. Opening Hook (30 seconds): Grab attention
2. Introduction (1 minute): Set context
3. Main Discussion (70% of time): Deep dive into insights
4. Wrap-up (1 minute): Summarize key takeaways

Generate the complete podcast script now:""",
        "educational": """Create an educational podcast discussing the provided documents.

DOCUMENT CONTENT:
{document_content}

KEY INSIGHTS:
{key_insights}

REQUIREMENTS:
- Duration: {duration_minutes} minutes (approximately {word_count} words)
- Style: Clear, methodical, educational
- HOST1 acts as teacher, HOST2 as curious learner

DIALOGUE FORMAT:
HOST1: [Expert explanation]
HOST2: [Clarifying question]

Generate the educational podcast script now:""",
        "technical": """Create a technical podcast for an informed audience.

DOCUMENT CONTENT:
{document_content}

KEY INSIGHTS:
{key_insights}

REQUIREMENTS:
- Duration: {duration_minutes} minutes (approximately {word_count} words)
- Style: Professional, detailed, technically accurate
- HOST1 is expert, HOST2 is informed interviewer

DIALOGUE FORMAT:
HOST1: [Technical insight]
HOST2: [Probing question]

Generate the technical podcast script now:""",
        "casual": """Create a fun, casual podcast discussing the documents.

DOCUMENT CONTENT:
{document_content}

KEY INSIGHTS:
{key_insights}

REQUIREMENTS:
- Duration: {duration_minutes} minutes (approximately {word_count} words)
- Style: Relaxed, humorous, energetic
- Make it entertaining while informative

DIALOGUE FORMAT:
HOST1: [Casual commentary]
HOST2: [Enthusiastic response]

Generate the casual podcast script now:"""
    }
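
    # New styles can be added by extending SCRIPT_PROMPTS; each template must expose
    # the same placeholders ({document_content}, {key_insights}, {duration_minutes},
    # {word_count}) that generate_script() fills via str.format().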

    def __init__(
        self,
        llamaindex_service: LlamaIndexService,
        llm_service: LLMService,
        elevenlabs_api_key: Optional[str] = None,
    ):
        self.config = config.config
        self.llamaindex_service = llamaindex_service
        self.llm_service = llm_service
        # Get document store from llamaindex service
        self.document_store = llamaindex_service.document_store
        # Initialize ElevenLabs client
        self.elevenlabs_client = None
        if ELEVENLABS_AVAILABLE:
            api_key = elevenlabs_api_key or self.config.ELEVENLABS_API_KEY
            if api_key:
                try:
                    self.elevenlabs_client = ElevenLabs(api_key=api_key)
                    logger.info("ElevenLabs client initialized for podcast generation")
                except Exception as e:
                    logger.error(f"Failed to initialize ElevenLabs client: {e}")
        # Create podcast storage directory
        self.podcast_dir = Path("./data/podcasts")
        self.podcast_dir.mkdir(parents=True, exist_ok=True)
        # Metadata database file
        self.metadata_file = self.podcast_dir / "metadata_db.json"
        self._ensure_metadata_db()
        # Voice cache
        self._voice_cache = {}
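
        # Resulting on-disk layout written by this service:
        #   ./data/podcasts/<podcast_id>.mp3             - synthesized audio
        #   ./data/podcasts/<podcast_id>_transcript.txt  - plain-text transcript
        #   ./data/podcasts/metadata_db.json             - append-only metadata list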

    def _ensure_metadata_db(self):
        """Ensure metadata database exists"""
        if not self.metadata_file.exists():
            self.metadata_file.write_text(json.dumps([], indent=2))

    async def generate_podcast(
        self,
        document_ids: List[str],
        style: str = "conversational",
        duration_minutes: int = 10,
        host1_voice: str = "Rachel",
        host2_voice: str = "Adam",
    ) -> PodcastResult:
        """Generate a complete podcast from documents"""
        start_time = datetime.now()
        podcast_id = str(uuid.uuid4())
        try:
            logger.info(f"Starting podcast generation {podcast_id}")
            logger.info(f"Documents: {document_ids}, Style: {style}, Duration: {duration_minutes}min")
            # Step 1: Retrieve and analyze documents
            logger.info("Step 1: Retrieving and analyzing documents...")
            analysis = await self.analyze_documents(document_ids)
            # Step 2: Generate script
            logger.info("Step 2: Generating podcast script...")
            script = await self.generate_script(analysis, style, duration_minutes)
            # Step 3: Synthesize audio
            logger.info("Step 3: Synthesizing audio with voices...")
            audio_file_path = await self.synthesize_audio(
                podcast_id,
                script,
                host1_voice,
                host2_voice,
            )
            # Calculate generation time
            generation_time = (datetime.now() - start_time).total_seconds()
            # Step 4: Create metadata
            logger.info("Step 4: Creating metadata...")
            metadata = self._create_metadata(
                podcast_id,
                analysis,
                script,
                audio_file_path,
                (host1_voice, host2_voice),  # ordered pair; a set would lose host order
                document_ids,
                style,
            )
            # Save metadata
            self._save_metadata(metadata)
            # Save transcript
            transcript_path = self.podcast_dir / f"{podcast_id}_transcript.txt"
            transcript_path.write_text(script.to_text(), encoding="utf-8")
            logger.info(f"Podcast generated successfully: {podcast_id}")
            return PodcastResult(
                podcast_id=podcast_id,
                audio_file_path=str(audio_file_path),
                transcript=script.to_text(),
                metadata=metadata,
                generation_time=generation_time,
                success=True,
            )
        except Exception as e:
            logger.error(f"Podcast generation failed: {str(e)}", exc_info=True)
            return PodcastResult(
                podcast_id=podcast_id,
                audio_file_path="",
                transcript="",
                metadata=None,
                generation_time=(datetime.now() - start_time).total_seconds(),
                success=False,
                error=str(e),
            )

    async def analyze_documents(self, document_ids: List[str]) -> DocumentAnalysis:
        """
        Retrieve documents from the document store and extract the key
        insights needed to drive the podcast script.
        """
        try:
            # Step 1: Retrieve actual documents from document store
            logger.info(f"Retrieving {len(document_ids)} documents from store...")
            documents = []
            document_contents = []
            for doc_id in document_ids:
                doc = await self.document_store.get_document(doc_id)
                if doc:
                    documents.append(doc)
                    document_contents.append(doc.content)
                    logger.info(f"Retrieved document: {doc.filename} ({len(doc.content)} chars)")
                else:
                    logger.warning(f"Document {doc_id} not found in store")
            if not documents:
                raise ValueError(f"No documents found for IDs: {document_ids}")
            # Step 2: Combine document content
            combined_content = "\n\n---DOCUMENT SEPARATOR---\n\n".join(document_contents)
            # Truncate if too long (keep first portion for context)
            max_content_length = 15000  # adjust based on your LLM's context window
            if len(combined_content) > max_content_length:
                logger.warning(f"Content too long ({len(combined_content)} chars), truncating to {max_content_length}")
                combined_content = combined_content[:max_content_length] + "\n\n[Content truncated...]"
            # Step 3: Use LLM to analyze the content
            analysis_prompt = f"""Analyze the following document(s) and provide:
1. The 5-7 most important insights or key points (be specific and detailed)
2. Main themes and topics covered
3. The overall complexity level (beginner/intermediate/advanced)
4. A comprehensive summary suitable for podcast discussion

DOCUMENTS:
{combined_content}

Provide a structured analysis optimized for creating an engaging podcast discussion.
Format your response as:

KEY INSIGHTS:
1. [First key insight]
2. [Second key insight]
...

TOPICS:
- [Topic 1]
- [Topic 2]
...

COMPLEXITY: [beginner/intermediate/advanced]

SUMMARY:
[Your comprehensive summary here]
"""
            logger.info("Analyzing content with LLM...")
            result = await self.llm_service.generate_text(
                analysis_prompt,
                max_tokens=2000,
                temperature=0.7,
            )
            # Step 4: Parse the structured response
            insights = self._extract_insights(result)
            topics = self._extract_topics(result)
            complexity = self._determine_complexity(result)
            summary = self._extract_summary(result)
            logger.info(f"Analysis complete: {len(insights)} insights, {len(topics)} topics")
            return DocumentAnalysis(
                key_insights=insights[:7],
                topics=topics,
                complexity_level=complexity,
                estimated_words=len(combined_content.split()),
                source_documents=[doc.filename for doc in documents],
                summary=summary or result[:500],
            )
        except Exception as e:
            logger.error(f"Document analysis failed: {str(e)}", exc_info=True)
            raise RuntimeError(f"Failed to analyze documents: {str(e)}")

    def _extract_summary(self, text: str) -> str:
        """Extract summary section from analysis"""
        try:
            if "SUMMARY:" in text:
                parts = text.split("SUMMARY:")
                if len(parts) > 1:
                    summary = parts[1].strip()
                    # Take first 500 chars if too long
                    return summary[:500] if len(summary) > 500 else summary
        except Exception:
            pass
        # Fallback: take first few sentences
        sentences = text.split('.')
        return '. '.join(sentences[:3]) + '.'

    def _extract_insights(self, text: str) -> List[str]:
        """Extract key insights from analysis text"""
        insights = []
        lines = text.split('\n')
        in_insights_section = False
        for line in lines:
            line = line.strip()
            if "KEY INSIGHTS:" in line.upper():
                in_insights_section = True
                continue
            elif line.upper().startswith(("TOPICS:", "COMPLEXITY:", "SUMMARY:")):
                in_insights_section = False
            if in_insights_section and line:
                # Strip only a leading list marker such as "1.", "-", "*", or "•"
                insight = re.sub(r'^(?:\d+\.|[-*•])\s*', '', line).strip()
                if len(insight) > 20:
                    insights.append(insight)
        # Fallback if no insights found
        if not insights:
            sentences = text.split('.')
            insights = [s.strip() + '.' for s in sentences[:7] if len(s.strip()) > 20]
        return insights

    def _extract_topics(self, text: str) -> List[str]:
        """Extract main topics from analysis"""
        topics = []
        lines = text.split('\n')
        in_topics_section = False
        for line in lines:
            line = line.strip()
            if "TOPICS:" in line.upper():
                in_topics_section = True
                continue
            elif line.upper().startswith(("KEY INSIGHTS:", "COMPLEXITY:", "SUMMARY:")):
                in_topics_section = False
            if in_topics_section and line:
                # Strip only a leading "-", "*", or "•" bullet marker
                topic = re.sub(r'^[-*•]\s*', '', line).strip()
                if len(topic) > 2:
                    topics.append(topic)
        # Fallback: simple keyword-frequency extraction
        if not topics:
            common_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}
            words = text.lower().split()
            word_freq = {}
            for word in words:
                word = re.sub(r'[^\w\s]', '', word)
                if len(word) > 4 and word not in common_words:
                    word_freq[word] = word_freq.get(word, 0) + 1
            top_topics = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)[:5]
            topics = [topic[0].title() for topic in top_topics]
        return topics[:5]

    def _determine_complexity(self, text: str) -> str:
        """Determine content complexity level"""
        text_lower = text.lower()
        if "complexity:" in text_lower:
            for level in ["beginner", "intermediate", "advanced"]:
                if level in text_lower.split("complexity:")[1][:100]:
                    return level
        # Heuristic based on keywords
        if any(word in text_lower for word in ['basic', 'introduction', 'beginner', 'simple']):
            return "beginner"
        elif any(word in text_lower for word in ['advanced', 'complex', 'sophisticated', 'expert']):
            return "advanced"
        else:
            return "intermediate"

    async def generate_script(
        self,
        analysis: DocumentAnalysis,
        style: str,
        duration_minutes: int,
    ) -> PodcastScript:
        """Generate podcast script from analysis"""
        target_words = duration_minutes * self.WORDS_PER_MINUTE
        # Prepare context with numbered insights
        insights_text = "\n".join(f"{i+1}. {insight}" for i, insight in enumerate(analysis.key_insights))
        # Get prompt template, falling back to the conversational style
        prompt_template = self.SCRIPT_PROMPTS.get(style, self.SCRIPT_PROMPTS["conversational"])
        # Fill template
        prompt = prompt_template.format(
            document_content=analysis.summary,
            key_insights=insights_text,
            duration_minutes=duration_minutes,
            word_count=target_words,
        )
        # Generate script; roughly 2 tokens per target word leaves headroom for formatting
        script_text = await self.llm_service.generate_text(
            prompt,
            max_tokens=target_words * 2,
            temperature=0.8,
        )
        # Parse into dialogue
        dialogue = self._parse_script(script_text)
        if not dialogue:
            raise ValueError("Failed to parse script into dialogue lines")
        word_count = sum(len(line.text.split()) for line in dialogue)
        duration_estimate = word_count / self.WORDS_PER_MINUTE
        return PodcastScript(
            dialogue=dialogue,
            total_duration_estimate=duration_estimate * 60,  # minutes -> seconds
            word_count=word_count,
            style=style,
        )

    def _parse_script(self, script_text: str) -> List[DialogueLine]:
        """Parse generated script into dialogue lines"""
        dialogue = []
        lines = script_text.split('\n')
        for line in lines:
            line = line.strip()
            if not line:
                continue
            if line.startswith('HOST1:'):
                text = line[6:].strip()
                if text:
                    dialogue.append(DialogueLine(speaker="HOST1", text=text))
            elif line.startswith('HOST2:'):
                text = line[6:].strip()
                if text:
                    dialogue.append(DialogueLine(speaker="HOST2", text=text))
        return dialogue
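
    # Note: _parse_script keeps only lines that begin with "HOST1:" or "HOST2:",
    # matching the DIALOGUE FORMAT requested in SCRIPT_PROMPTS; anything else the
    # model emits (headings, stage directions, blank lines) is silently dropped.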

    def _get_voice_id(self, voice_name: str) -> str:
        """Get voice ID from voice name"""
        try:
            # Use cache if available
            if not self._voice_cache:
                voices = self.elevenlabs_client.voices.get_all()
                if not voices or not voices.voices:
                    raise RuntimeError("No voices available")
                for voice in voices.voices:
                    self._voice_cache[voice.name.lower()] = voice.voice_id
            # Exact match
            if voice_name.lower() in self._voice_cache:
                return self._voice_cache[voice_name.lower()]
            # Partial match
            for name, voice_id in self._voice_cache.items():
                if voice_name.lower() in name:
                    logger.info(f"Partial match for '{voice_name}': {name}")
                    return voice_id
            # Fallback to the first available voice
            first_voice_id = next(iter(self._voice_cache.values()))
            logger.warning(f"Voice '{voice_name}' not found, using default")
            return first_voice_id
        except Exception as e:
            logger.error(f"Could not fetch voices: {e}")
            raise RuntimeError(f"Failed to get voice ID: {str(e)}")

    async def synthesize_audio(
        self,
        podcast_id: str,
        script: PodcastScript,
        host1_voice: str,
        host2_voice: str,
    ) -> Path:
        """Synthesize audio with alternating voices"""
        if not self.elevenlabs_client:
            raise RuntimeError("ElevenLabs client not initialized")
        audio_file = self.podcast_dir / f"{podcast_id}.mp3"
        try:
            # Get voice IDs
            host1_voice_id = self._get_voice_id(host1_voice)
            host2_voice_id = self._get_voice_id(host2_voice)
            logger.info(f"HOST1: {host1_voice}, HOST2: {host2_voice}")
            voice_map = {
                "HOST1": host1_voice_id,
                "HOST2": host2_voice_id,
            }
            audio_chunks = []
            # Process each line with correct voice
            for i, line in enumerate(script.dialogue):
                logger.info(f"Line {i+1}/{len(script.dialogue)}: {line.speaker}")
                voice_id = voice_map.get(line.speaker, host1_voice_id)
                audio_generator = self.elevenlabs_client.text_to_speech.convert(
                    voice_id=voice_id,
                    text=line.text,
                    model_id="eleven_multilingual_v2",
                )
                line_chunks = []
                for chunk in audio_generator:
                    if chunk:
                        line_chunks.append(chunk)
                if line_chunks:
                    audio_chunks.append(b''.join(line_chunks))
            if not audio_chunks:
                raise RuntimeError("No audio chunks generated")
            full_audio = b''.join(audio_chunks)
            with open(audio_file, 'wb') as f:
                f.write(full_audio)
            if audio_file.exists() and audio_file.stat().st_size > 1000:
                logger.info(f"Audio created: {audio_file} ({audio_file.stat().st_size} bytes)")
                return audio_file
            else:
                raise RuntimeError("Audio file too small or empty")
        except Exception as e:
            logger.error(f"Audio synthesis failed: {e}", exc_info=True)
            raise RuntimeError(f"Failed to generate audio: {str(e)}")

    def _create_metadata(
        self,
        podcast_id: str,
        analysis: DocumentAnalysis,
        script: PodcastScript,
        audio_path: Path,
        voices: Tuple[str, str],
        document_ids: List[str],
        style: str,
    ) -> PodcastMetadata:
        """Create podcast metadata"""
        title = f"Podcast: {analysis.topics[0] if analysis.topics else 'Document Discussion'}"
        description = f"A {style} podcast discussing: {', '.join(analysis.source_documents)}"
        file_size_mb = audio_path.stat().st_size / (1024 * 1024) if audio_path.exists() else 0
        # Rough cost estimates: LLM priced per 1K words, TTS per 1K characters
        # (word count * 5 approximates the character count)
        llm_cost = (script.word_count / 1000) * 0.01
        tts_cost = (script.word_count * 5 / 1000) * 0.30
        return PodcastMetadata(
            podcast_id=podcast_id,
            title=title,
            description=description,
            source_documents=analysis.source_documents,
            style=style,
            duration_seconds=script.total_duration_estimate,
            file_size_mb=file_size_mb,
            voices={"host1": voices[0], "host2": voices[1]},
            generated_at=datetime.now().isoformat(),
            generation_cost={"llm_cost": llm_cost, "tts_cost": tts_cost, "total": llm_cost + tts_cost},
            key_topics=analysis.topics,
        )

    def _save_metadata(self, metadata: PodcastMetadata):
        """Save metadata to database"""
        try:
            existing = json.loads(self.metadata_file.read_text())
            existing.append(asdict(metadata))
            self.metadata_file.write_text(json.dumps(existing, indent=2))
            logger.info(f"Metadata saved: {metadata.podcast_id}")
        except Exception as e:
            logger.error(f"Failed to save metadata: {e}")

    def list_podcasts(self, limit: int = 10) -> List[PodcastMetadata]:
        """List the most recently generated podcasts, newest first"""
        try:
            data = json.loads(self.metadata_file.read_text())
            podcasts = [PodcastMetadata(**item) for item in data[-limit:]]
            return list(reversed(podcasts))
        except Exception as e:
            logger.error(f"Failed to list podcasts: {e}")
            return []

    def get_podcast(self, podcast_id: str) -> Optional[PodcastMetadata]:
        """Get specific podcast metadata"""
        try:
            data = json.loads(self.metadata_file.read_text())
            for item in data:
                if item.get('podcast_id') == podcast_id:
                    return PodcastMetadata(**item)
            return None
        except Exception as e:
            logger.error(f"Failed to get podcast: {e}")
            return None
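

# Minimal usage sketch. Assumptions: the LlamaIndexService / LLMService constructor
# arguments below are hypothetical placeholders; adapt to however those services are
# actually instantiated elsewhere in this app.
#
#     import asyncio
#
#     async def main():
#         llamaindex = LlamaIndexService(...)   # hypothetical constructor args
#         llm = LLMService(...)                 # hypothetical constructor args
#         generator = PodcastGeneratorService(llamaindex, llm)
#         result = await generator.generate_podcast(
#             document_ids=["doc-123"],         # hypothetical document ID
#             style="educational",
#             duration_minutes=5,
#         )
#         print(result.success, result.audio_file_path)
#
#     asyncio.run(main())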