import logging
import json
import re
import uuid
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path

try:
    from elevenlabs import VoiceSettings
    from elevenlabs.client import ElevenLabs
    ELEVENLABS_AVAILABLE = True
except ImportError:
    ELEVENLABS_AVAILABLE = False

import config
from services.llamaindex_service import LlamaIndexService
from services.llm_service import LLMService
from services.document_store_service import DocumentStoreService

logger = logging.getLogger(__name__)

@dataclass
class DocumentAnalysis:
    """Analysis results extracted from one or more documents."""
    key_insights: List[str]
    topics: List[str]
    complexity_level: str
    estimated_words: int
    source_documents: List[str]
    summary: str


@dataclass
class DialogueLine:
    """Single line of podcast dialogue."""
    speaker: str
    text: str
    pause_after: float = 0.5  # seconds of silence after this line


@dataclass
class PodcastScript:
    """Complete podcast script."""
    dialogue: List[DialogueLine]
    total_duration_estimate: float  # seconds
    word_count: int
    style: str

    def to_text(self) -> str:
        """Render the script as a plain-text transcript."""
        lines = [f"{line.speaker}: {line.text}" for line in self.dialogue]
        return "\n\n".join(lines)


@dataclass
class PodcastMetadata:
    """Metadata for a generated podcast."""
    podcast_id: str
    title: str
    description: str
    source_documents: List[str]
    style: str
    duration_seconds: float
    file_size_mb: float
    voices: Dict[str, str]
    generated_at: str
    generation_cost: Dict[str, float]
    key_topics: List[str]


@dataclass
class PodcastResult:
    """Complete podcast generation result."""
    podcast_id: str
    audio_file_path: str
    transcript: str
    metadata: Optional[PodcastMetadata]  # None when generation fails
    generation_time: float
    success: bool
    error: Optional[str] = None
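
# For illustration, PodcastScript.to_text() renders a transcript like this
# (all values hypothetical):
#
#     script = PodcastScript(
#         dialogue=[
#             DialogueLine(speaker="HOST1", text="Welcome to the show!"),
#             DialogueLine(speaker="HOST2", text="Glad to be here."),
#         ],
#         total_duration_estimate=3.2, word_count=8, style="conversational",
#     )
#     script.to_text()
#     # -> "HOST1: Welcome to the show!\n\nHOST2: Glad to be here."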


class PodcastGeneratorService:
    """
    Service for generating conversational podcasts from documents.
    """

    WORDS_PER_MINUTE = 150

    SCRIPT_PROMPTS = {
        "conversational": """You are an expert podcast script writer. Create an engaging 2-host podcast discussing the provided documents.

DOCUMENT CONTENT:
{document_content}

KEY INSIGHTS:
{key_insights}

REQUIREMENTS:
- Duration: {duration_minutes} minutes (approximately {word_count} words)
- Style: Conversational, friendly, and accessible
- Format: Alternating dialogue between HOST1 and HOST2
- Make the content engaging and easy to understand
- Include natural transitions and enthusiasm

DIALOGUE FORMAT (strictly follow):
HOST1: [What they say]
HOST2: [What they say]

STRUCTURE:
1. Opening Hook (30 seconds): Grab attention
2. Introduction (1 minute): Set context
3. Main Discussion (70% of time): Deep dive into insights
4. Wrap-up (1 minute): Summarize key takeaways

Generate the complete podcast script now:""",

        "educational": """Create an educational podcast discussing the provided documents.

DOCUMENT CONTENT:
{document_content}

KEY INSIGHTS:
{key_insights}

REQUIREMENTS:
- Duration: {duration_minutes} minutes (approximately {word_count} words)
- Style: Clear, methodical, educational
- HOST1 acts as teacher, HOST2 as curious learner

DIALOGUE FORMAT:
HOST1: [Expert explanation]
HOST2: [Clarifying question]

Generate the educational podcast script now:""",

        "technical": """Create a technical podcast for an informed audience.

DOCUMENT CONTENT:
{document_content}

KEY INSIGHTS:
{key_insights}

REQUIREMENTS:
- Duration: {duration_minutes} minutes (approximately {word_count} words)
- Style: Professional, detailed, technically accurate
- HOST1 is expert, HOST2 is informed interviewer

DIALOGUE FORMAT:
HOST1: [Technical insight]
HOST2: [Probing question]

Generate the technical podcast script now:""",

        "casual": """Create a fun, casual podcast discussing the documents.

DOCUMENT CONTENT:
{document_content}

KEY INSIGHTS:
{key_insights}

REQUIREMENTS:
- Duration: {duration_minutes} minutes (approximately {word_count} words)
- Style: Relaxed, humorous, energetic
- Make it entertaining while informative

DIALOGUE FORMAT:
HOST1: [Casual commentary]
HOST2: [Enthusiastic response]

Generate the casual podcast script now:"""
    }
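
    # Each template above is filled via str.format. A hypothetical
    # instantiation (values are illustrative, not defaults):
    #
    #     prompt = PodcastGeneratorService.SCRIPT_PROMPTS["technical"].format(
    #         document_content="<analysis summary>",
    #         key_insights="1. ...\n2. ...",
    #         duration_minutes=10,
    #         word_count=1500,  # 10 min * WORDS_PER_MINUTE (150)
    #     )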

    def __init__(
        self,
        llamaindex_service: LlamaIndexService,
        llm_service: LLMService,
        elevenlabs_api_key: Optional[str] = None
    ):
        self.config = config.config
        self.llamaindex_service = llamaindex_service
        self.llm_service = llm_service

        # Reuse the document store managed by the LlamaIndex service
        self.document_store = llamaindex_service.document_store

        # Initialize the ElevenLabs client if the SDK and an API key are available
        self.elevenlabs_client = None
        if ELEVENLABS_AVAILABLE:
            api_key = elevenlabs_api_key or self.config.ELEVENLABS_API_KEY
            if api_key:
                try:
                    self.elevenlabs_client = ElevenLabs(api_key=api_key)
                    logger.info("ElevenLabs client initialized for podcast generation")
                except Exception as e:
                    logger.error(f"Failed to initialize ElevenLabs client: {e}")

        # Output directory for generated audio and transcripts
        self.podcast_dir = Path("./data/podcasts")
        self.podcast_dir.mkdir(parents=True, exist_ok=True)

        # Simple JSON file acting as the podcast metadata database
        self.metadata_file = self.podcast_dir / "metadata_db.json"
        self._ensure_metadata_db()

        # Cache of voice name (lowercased) -> ElevenLabs voice ID
        self._voice_cache = {}

    def _ensure_metadata_db(self):
        """Ensure the metadata database file exists."""
        if not self.metadata_file.exists():
            self.metadata_file.write_text(json.dumps([], indent=2))

    async def generate_podcast(
        self,
        document_ids: List[str],
        style: str = "conversational",
        duration_minutes: int = 10,
        host1_voice: str = "Rachel",
        host2_voice: str = "Adam"
    ) -> PodcastResult:
        """Generate a complete podcast from documents."""
        start_time = datetime.now()
        podcast_id = str(uuid.uuid4())

        try:
            logger.info(f"Starting podcast generation {podcast_id}")
            logger.info(f"Documents: {document_ids}, Style: {style}, Duration: {duration_minutes}min")

            logger.info("Step 1: Retrieving and analyzing documents...")
            analysis = await self.analyze_documents(document_ids)

            logger.info("Step 2: Generating podcast script...")
            script = await self.generate_script(analysis, style, duration_minutes)

            logger.info("Step 3: Synthesizing audio with voices...")
            audio_file_path = await self.synthesize_audio(
                podcast_id,
                script,
                host1_voice,
                host2_voice
            )

            generation_time = (datetime.now() - start_time).total_seconds()

            logger.info("Step 4: Creating metadata...")
            # Map host slots to voice names explicitly; an unordered set
            # would lose the host1/host2 assignment and collapse duplicates.
            metadata = self._create_metadata(
                podcast_id,
                analysis,
                script,
                audio_file_path,
                {"host1": host1_voice, "host2": host2_voice},
                document_ids,
                style
            )

            self._save_metadata(metadata)

            transcript_path = self.podcast_dir / f"{podcast_id}_transcript.txt"
            transcript_path.write_text(script.to_text(), encoding="utf-8")

            logger.info(f"Podcast generated successfully: {podcast_id}")

            return PodcastResult(
                podcast_id=podcast_id,
                audio_file_path=str(audio_file_path),
                transcript=script.to_text(),
                metadata=metadata,
                generation_time=generation_time,
                success=True
            )

        except Exception as e:
            logger.error(f"Podcast generation failed: {str(e)}", exc_info=True)
            return PodcastResult(
                podcast_id=podcast_id,
                audio_file_path="",
                transcript="",
                metadata=None,
                generation_time=(datetime.now() - start_time).total_seconds(),
                success=False,
                error=str(e)
            )

    async def analyze_documents(self, document_ids: List[str]) -> DocumentAnalysis:
        """
        Retrieve documents from the document store and extract the key
        insights needed to script a podcast.
        """
        try:
            logger.info(f"Retrieving {len(document_ids)} documents from store...")
            documents = []
            document_contents = []

            for doc_id in document_ids:
                doc = await self.document_store.get_document(doc_id)
                if doc:
                    documents.append(doc)
                    document_contents.append(doc.content)
                    logger.info(f"Retrieved document: {doc.filename} ({len(doc.content)} chars)")
                else:
                    logger.warning(f"Document {doc_id} not found in store")

            if not documents:
                raise ValueError(f"No documents found for IDs: {document_ids}")

            combined_content = "\n\n---DOCUMENT SEPARATOR---\n\n".join(document_contents)

            # Truncate overly long content to keep the analysis prompt
            # within a safe context size
            max_content_length = 15000
            if len(combined_content) > max_content_length:
                logger.warning(f"Content too long ({len(combined_content)} chars), truncating to {max_content_length}")
                combined_content = combined_content[:max_content_length] + "\n\n[Content truncated...]"

            analysis_prompt = f"""Analyze the following document(s) and provide:

1. The 5-7 most important insights or key points (be specific and detailed)
2. Main themes and topics covered
3. The overall complexity level (beginner/intermediate/advanced)
4. A comprehensive summary suitable for podcast discussion

DOCUMENTS:
{combined_content}

Provide a structured analysis optimized for creating an engaging podcast discussion.
Format your response as:

KEY INSIGHTS:
1. [First key insight]
2. [Second key insight]
...

TOPICS:
- [Topic 1]
- [Topic 2]
...

COMPLEXITY: [beginner/intermediate/advanced]

SUMMARY:
[Your comprehensive summary here]
"""

            logger.info("Analyzing content with LLM...")
            result = await self.llm_service.generate_text(
                analysis_prompt,
                max_tokens=2000,
                temperature=0.7
            )

            # Parse the structured sections out of the LLM response
            insights = self._extract_insights(result)
            topics = self._extract_topics(result)
            complexity = self._determine_complexity(result)
            summary = self._extract_summary(result)

            logger.info(f"Analysis complete: {len(insights)} insights, {len(topics)} topics")

            return DocumentAnalysis(
                key_insights=insights[:7],
                topics=topics,
                complexity_level=complexity,
                estimated_words=len(combined_content.split()),
                source_documents=[doc.filename for doc in documents],
                summary=summary or result[:500]
            )

        except Exception as e:
            logger.error(f"Document analysis failed: {str(e)}", exc_info=True)
            raise RuntimeError(f"Failed to analyze documents: {str(e)}")

    def _extract_summary(self, text: str) -> str:
        """Extract the SUMMARY section from the analysis text."""
        try:
            if "SUMMARY:" in text:
                parts = text.split("SUMMARY:")
                if len(parts) > 1:
                    summary = parts[1].strip()
                    # Cap the summary length used downstream
                    return summary[:500] if len(summary) > 500 else summary
        except Exception:
            pass

        # Fallback: use the first few sentences of the raw analysis
        sentences = text.split('.')
        return '. '.join(sentences[:3]) + '.'

    def _extract_insights(self, text: str) -> List[str]:
        """Extract key insights from the analysis text."""
        insights = []
        lines = text.split('\n')

        in_insights_section = False
        for line in lines:
            line = line.strip()

            if "KEY INSIGHTS:" in line.upper():
                in_insights_section = True
                continue
            elif line.upper().startswith(("TOPICS:", "COMPLEXITY:", "SUMMARY:")):
                in_insights_section = False

            if in_insights_section and line:
                # Strip only a leading list marker (number, dash, asterisk, or bullet)
                insight = re.sub(r'^(?:\d+\.|[-*•])\s*', '', line).strip()
                if len(insight) > 20:
                    insights.append(insight)

        # Fallback: take the first few substantial sentences
        if not insights:
            sentences = text.split('.')
            insights = [s.strip() + '.' for s in sentences[:7] if len(s.strip()) > 20]

        return insights

    def _extract_topics(self, text: str) -> List[str]:
        """Extract main topics from the analysis text."""
        topics = []
        lines = text.split('\n')

        in_topics_section = False
        for line in lines:
            line = line.strip()

            if "TOPICS:" in line.upper():
                in_topics_section = True
                continue
            elif line.upper().startswith(("KEY INSIGHTS:", "COMPLEXITY:", "SUMMARY:")):
                in_topics_section = False

            if in_topics_section and line:
                # Strip only a leading bullet marker
                topic = re.sub(r'^[-*•]\s*', '', line).strip()
                if len(topic) > 2:
                    topics.append(topic)

        # Fallback: pick the most frequent non-stopword terms
        if not topics:
            common_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}
            words = text.lower().split()
            word_freq = {}

            for word in words:
                word = re.sub(r'[^\w\s]', '', word)
                if len(word) > 4 and word not in common_words:
                    word_freq[word] = word_freq.get(word, 0) + 1

            top_topics = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)[:5]
            topics = [topic[0].title() for topic in top_topics]

        return topics[:5]

    def _determine_complexity(self, text: str) -> str:
        """Determine the content complexity level."""
        text_lower = text.lower()

        # Prefer an explicit COMPLEXITY: label from the analysis
        if "complexity:" in text_lower:
            for level in ["beginner", "intermediate", "advanced"]:
                if level in text_lower.split("complexity:")[1][:100]:
                    return level

        # Fallback: keyword heuristics over the whole text
        if any(word in text_lower for word in ['basic', 'introduction', 'beginner', 'simple']):
            return "beginner"
        elif any(word in text_lower for word in ['advanced', 'complex', 'sophisticated', 'expert']):
            return "advanced"
        else:
            return "intermediate"

    async def generate_script(
        self,
        analysis: DocumentAnalysis,
        style: str,
        duration_minutes: int
    ) -> PodcastScript:
        """Generate a podcast script from a document analysis."""
        target_words = duration_minutes * self.WORDS_PER_MINUTE

        insights_text = "\n".join(f"{i+1}. {insight}" for i, insight in enumerate(analysis.key_insights))

        # Fall back to the conversational template for unknown styles
        prompt_template = self.SCRIPT_PROMPTS.get(style, self.SCRIPT_PROMPTS["conversational"])

        prompt = prompt_template.format(
            document_content=analysis.summary,
            key_insights=insights_text,
            duration_minutes=duration_minutes,
            word_count=target_words
        )

        # Rough budget: allow about two tokens per target word
        script_text = await self.llm_service.generate_text(
            prompt,
            max_tokens=target_words * 2,
            temperature=0.8
        )

        dialogue = self._parse_script(script_text)

        if not dialogue:
            raise ValueError("Failed to parse script into dialogue lines")

        word_count = sum(len(line.text.split()) for line in dialogue)
        duration_estimate = word_count / self.WORDS_PER_MINUTE  # minutes

        return PodcastScript(
            dialogue=dialogue,
            total_duration_estimate=duration_estimate * 60,  # seconds
            word_count=word_count,
            style=style
        )

    def _parse_script(self, script_text: str) -> List[DialogueLine]:
        """Parse a generated script into dialogue lines."""
        dialogue = []

        for line in script_text.split('\n'):
            line = line.strip()
            if not line:
                continue

            # Expect lines of the form "HOST1: ..." / "HOST2: ..."
            for speaker in ("HOST1", "HOST2"):
                prefix = f"{speaker}:"
                if line.startswith(prefix):
                    text = line[len(prefix):].strip()
                    if text:
                        dialogue.append(DialogueLine(speaker=speaker, text=text))
                    break

        return dialogue
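
    # Example (hypothetical LLM output): a fragment such as
    #
    #     HOST1: Welcome back to the show.
    #     HOST2: Today we're digging into the report.
    #
    # parses to two DialogueLine objects; blank lines and any non-HOST
    # lines (titles, stage directions) are skipped.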

    def _get_voice_id(self, voice_name: str) -> str:
        """Resolve an ElevenLabs voice ID from a voice name."""
        try:
            # Populate the cache on first use
            if not self._voice_cache:
                voices = self.elevenlabs_client.voices.get_all()
                if not voices or not voices.voices:
                    raise RuntimeError("No voices available")

                for voice in voices.voices:
                    self._voice_cache[voice.name.lower()] = voice.voice_id

            # Exact match
            if voice_name.lower() in self._voice_cache:
                return self._voice_cache[voice_name.lower()]

            # Partial match
            for name, voice_id in self._voice_cache.items():
                if voice_name.lower() in name:
                    logger.info(f"Partial match for '{voice_name}': {name}")
                    return voice_id

            # Fall back to the first available voice
            first_voice_id = list(self._voice_cache.values())[0]
            logger.warning(f"Voice '{voice_name}' not found, using default")
            return first_voice_id

        except Exception as e:
            logger.error(f"Could not fetch voices: {e}")
            raise RuntimeError(f"Failed to get voice ID: {str(e)}")

    async def synthesize_audio(
        self,
        podcast_id: str,
        script: PodcastScript,
        host1_voice: str,
        host2_voice: str
    ) -> Path:
        """Synthesize audio with alternating voices."""
        if not self.elevenlabs_client:
            raise RuntimeError("ElevenLabs client not initialized")

        audio_file = self.podcast_dir / f"{podcast_id}.mp3"

        try:
            host1_voice_id = self._get_voice_id(host1_voice)
            host2_voice_id = self._get_voice_id(host2_voice)

            logger.info(f"HOST1: {host1_voice}, HOST2: {host2_voice}")

            voice_map = {
                "HOST1": host1_voice_id,
                "HOST2": host2_voice_id
            }

            audio_chunks = []

            # Synthesize each dialogue line with its speaker's voice.
            # Note: these SDK calls are blocking and run sequentially.
            for i, line in enumerate(script.dialogue):
                logger.info(f"Line {i+1}/{len(script.dialogue)}: {line.speaker}")

                voice_id = voice_map.get(line.speaker, host1_voice_id)

                audio_generator = self.elevenlabs_client.text_to_speech.convert(
                    voice_id=voice_id,
                    text=line.text,
                    model_id="eleven_multilingual_v2"
                )

                line_chunks = []
                for chunk in audio_generator:
                    if chunk:
                        line_chunks.append(chunk)

                if line_chunks:
                    audio_chunks.append(b''.join(line_chunks))

            if not audio_chunks:
                raise RuntimeError("No audio chunks generated")

            # Naive byte-level concatenation of the per-line MP3 streams;
            # most players handle this, but no pause_after silence is added.
            full_audio = b''.join(audio_chunks)

            with open(audio_file, 'wb') as f:
                f.write(full_audio)

            if audio_file.exists() and audio_file.stat().st_size > 1000:
                logger.info(f"Audio created: {audio_file} ({audio_file.stat().st_size} bytes)")
                return audio_file
            else:
                raise RuntimeError("Audio file too small or empty")

        except Exception as e:
            logger.error(f"Audio synthesis failed: {e}", exc_info=True)
            raise RuntimeError(f"Failed to generate audio: {str(e)}")
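
    # A sketch of a more robust joiner (not wired in above): it assumes the
    # optional pydub package and ffmpeg are installed, re-decodes each
    # per-line MP3, and honors DialogueLine.pause_after with real silence.
    def _concat_with_pauses(self, mp3_segments: List[bytes], pauses: List[float], out_path: Path) -> None:
        """Join per-line MP3 bytes, inserting the given seconds of silence after each line."""
        import io
        from pydub import AudioSegment  # optional dependency, not required elsewhere

        combined = AudioSegment.empty()
        for data, pause in zip(mp3_segments, pauses):
            combined += AudioSegment.from_file(io.BytesIO(data), format="mp3")
            combined += AudioSegment.silent(duration=int(pause * 1000))  # pydub uses ms
        combined.export(str(out_path), format="mp3")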

    def _create_metadata(
        self,
        podcast_id: str,
        analysis: DocumentAnalysis,
        script: PodcastScript,
        audio_path: Path,
        voices: Dict[str, str],
        document_ids: List[str],
        style: str
    ) -> PodcastMetadata:
        """Create podcast metadata."""
        title = f"Podcast: {analysis.topics[0] if analysis.topics else 'Document Discussion'}"
        description = f"A {style} podcast discussing: {', '.join(analysis.source_documents)}"
        file_size_mb = audio_path.stat().st_size / (1024 * 1024) if audio_path.exists() else 0

        # Rough cost estimates derived from the script word count
        llm_cost = (script.word_count / 1000) * 0.01
        tts_cost = (script.word_count * 5 / 1000) * 0.30

        return PodcastMetadata(
            podcast_id=podcast_id,
            title=title,
            description=description,
            source_documents=analysis.source_documents,
            style=style,
            duration_seconds=script.total_duration_estimate,
            file_size_mb=file_size_mb,
            voices={"host1": voices.get("host1", "Rachel"),
                    "host2": voices.get("host2", "Adam")},
            generated_at=datetime.now().isoformat(),
            generation_cost={"llm_cost": llm_cost, "tts_cost": tts_cost, "total": llm_cost + tts_cost},
            key_topics=analysis.topics
        )

    def _save_metadata(self, metadata: PodcastMetadata):
        """Append metadata to the JSON database."""
        try:
            existing = json.loads(self.metadata_file.read_text())
            existing.append(asdict(metadata))
            self.metadata_file.write_text(json.dumps(existing, indent=2))
            logger.info(f"Metadata saved: {metadata.podcast_id}")
        except Exception as e:
            logger.error(f"Failed to save metadata: {e}")

    def list_podcasts(self, limit: int = 10) -> List[PodcastMetadata]:
        """List generated podcasts, most recent first."""
        try:
            data = json.loads(self.metadata_file.read_text())
            podcasts = [PodcastMetadata(**item) for item in data[-limit:]]
            return list(reversed(podcasts))
        except Exception as e:
            logger.error(f"Failed to list podcasts: {e}")
            return []

    def get_podcast(self, podcast_id: str) -> Optional[PodcastMetadata]:
        """Get metadata for a specific podcast."""
        try:
            data = json.loads(self.metadata_file.read_text())
            for item in data:
                if item.get('podcast_id') == podcast_id:
                    return PodcastMetadata(**item)
            return None
        except Exception as e:
            logger.error(f"Failed to get podcast: {e}")
            return None
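

# Minimal usage sketch (illustrative only): the service constructors and the
# document ID below are assumptions, not defined by this module.
#
#     import asyncio
#
#     async def main():
#         llamaindex_service = LlamaIndexService()  # hypothetical constructor args
#         llm_service = LLMService()                # hypothetical constructor args
#         generator = PodcastGeneratorService(llamaindex_service, llm_service)
#         result = await generator.generate_podcast(
#             document_ids=["doc-123"],             # hypothetical document ID
#             style="conversational",
#             duration_minutes=10,
#         )
#         print(result.success, result.audio_file_path)
#
#     asyncio.run(main())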