# Spaces: Running on Zero
"""
Main processing logic for FLUX Prompt Optimizer.

Handles image analysis, prompt optimization, and scoring.
"""
import logging | |
import time | |
from typing import Tuple, Dict, Any, Optional | |
from PIL import Image | |
from datetime import datetime | |
from config import APP_CONFIG, PROCESSING_CONFIG, get_device_config | |
from utils import ( | |
optimize_image, validate_image, apply_flux_rules, | |
calculate_prompt_score, get_score_grade, format_analysis_report, | |
clean_memory, safe_execute | |
) | |
from models import analyze_image | |
logger = logging.getLogger(__name__) | |
class FluxOptimizer:
    """Main optimizer class for FLUX prompt generation.

    Runs the pipeline validate -> optimize -> analyze -> apply FLUX rules ->
    score -> report for a single image, and keeps running counters for every
    ``process_image`` call (successful or not) in ``processing_stats``.
    """

    def __init__(self, model_name: Optional[str] = None):
        """Create an optimizer.

        Args:
            model_name: Name forwarded to ``analyze_image``; ``None`` is
                reported as ``"auto"`` in the result metadata.
        """
        self.model_name = model_name
        self.device_config = get_device_config()
        self.processing_stats = self._new_stats()
        logger.info(f"FluxOptimizer initialized - Device: {self.device_config['device']}")

    @staticmethod
    def _new_stats() -> Dict[str, Any]:
        """Return a fresh, zeroed statistics dictionary."""
        return {
            "total_processed": 0,
            "successful_analyses": 0,
            "failed_analyses": 0,
            "average_processing_time": 0.0,
        }

    def process_image(self, image: Any) -> Tuple[str, str, str, Dict[str, Any]]:
        """Run the complete image processing pipeline.

        Every outcome is recorded in ``processing_stats``: handled failures
        (invalid image, failed optimization, failed analysis) are counted the
        same way as unexpected exceptions, and the elapsed time is always
        written to ``metadata["processing_time"]``.

        Args:
            image: Input image (PIL, numpy array, or file path).

        Returns:
            Tuple of (optimized_prompt, analysis_report, score_html, metadata).
            On failure the first three items describe the error and
            ``metadata["success"]`` is ``False``.
        """
        start_time = time.time()
        metadata = {
            "processing_time": 0.0,
            "success": False,
            "model_used": self.model_name or "auto",
            "device": self.device_config["device"],
            "error": None,
        }

        try:
            # Step 1: Validate and optimize input image
            logger.info("Starting image processing pipeline...")
            if not validate_image(image):
                return self._record_failure(
                    "Invalid or unsupported image format", metadata, start_time
                )

            optimized_image = optimize_image(image)
            if optimized_image is None:
                return self._record_failure(
                    "Image optimization failed", metadata, start_time
                )
            logger.info(f"Image optimized to size: {optimized_image.size}")

            # Step 2: Analyze image with selected model
            logger.info("Running image analysis...")
            analysis_success, analysis_result = safe_execute(
                analyze_image,
                optimized_image,
                self.model_name,
            )
            if not analysis_success:
                # On failure safe_execute's second item is the error detail.
                return self._record_failure(
                    f"Image analysis failed: {analysis_result}", metadata, start_time
                )
            description, analysis_metadata = analysis_result
            logger.info(f"Analysis complete: {len(description)} characters")

            # Step 3: Apply FLUX optimization rules
            logger.info("Applying FLUX optimization rules...")
            optimized_prompt = apply_flux_rules(description)
            if not optimized_prompt:
                optimized_prompt = "A professional photograph"
                logger.warning("Empty prompt after optimization, using fallback")

            # Step 4: Calculate quality score
            logger.info("Calculating quality score...")
            score, score_breakdown = calculate_prompt_score(optimized_prompt, analysis_metadata)
            grade_info = get_score_grade(score)

            # Step 5: Generate analysis report
            processing_time = time.time() - start_time
            metadata.update({
                "processing_time": processing_time,
                "success": True,
                "prompt_length": len(optimized_prompt),
                "score": score,
                "grade": grade_info["grade"],
                "analysis_metadata": analysis_metadata,
            })
            analysis_report = self._generate_detailed_report(
                optimized_prompt, analysis_metadata, score,
                score_breakdown, processing_time,
            )

            # Step 6: Create score HTML
            score_html = self._generate_score_html(score, grade_info)

            # Update statistics
            self._update_stats(processing_time, True)
            logger.info(f"Processing complete - Score: {score}, Time: {processing_time:.1f}s")
            return optimized_prompt, analysis_report, score_html, metadata
        except Exception as e:
            error_msg = f"Unexpected error in processing pipeline: {str(e)}"
            return self._record_failure(error_msg, metadata, start_time, exc_info=True)
        finally:
            # Always clean up memory
            clean_memory()

    def _record_failure(self, error_msg: str, metadata: Dict[str, Any],
                        start_time: float, exc_info: bool = False
                        ) -> Tuple[str, str, str, Dict[str, Any]]:
        """Log a failure, count it in the statistics and build the error tuple.

        Args:
            error_msg: Message to log and to report to the caller.
            metadata: Result metadata; updated in place.
            start_time: ``time.time()`` value taken when processing started.
            exc_info: Forwarded to ``logger.error`` to include the traceback.
        """
        processing_time = time.time() - start_time
        logger.error(error_msg, exc_info=exc_info)
        metadata["processing_time"] = processing_time
        self._update_stats(processing_time, False)
        return self._create_error_response(error_msg, metadata)

    def _create_error_response(self, error_msg: str, metadata: Dict[str, Any]) -> Tuple[str, str, str, Dict[str, Any]]:
        """Create standardized error response.

        Marks ``metadata`` (in place) as failed and returns the same 4-tuple
        shape as ``process_image``, with a zero-score HTML badge.
        """
        error_prompt = "❌ Processing failed"
        error_report = f"**Error:** {error_msg}\n\nPlease try with a different image or check the logs for more details."
        error_html = self._generate_score_html(0, get_score_grade(0))
        metadata["success"] = False
        metadata["error"] = error_msg
        return error_prompt, error_report, error_html, metadata

    def _generate_detailed_report(self, prompt: str, analysis_metadata: Dict[str, Any],
                                  score: int, breakdown: Dict[str, int],
                                  processing_time: float) -> str:
        """Generate comprehensive analysis report (Markdown text).

        Args:
            prompt: Final optimized prompt; only its length is reported.
            analysis_metadata: Mapping read for ``model``, ``device`` and
                ``confidence``; ``None`` is treated as empty.
            score: Overall score, shown as ``score/100``.
            breakdown: Per-category scores; missing categories show as 0.
            processing_time: Elapsed seconds.

        Returns:
            The report as a newline-joined Markdown string.
        """
        analysis_metadata = analysis_metadata or {}
        breakdown = breakdown or {}

        model_used = analysis_metadata.get("model", "Unknown")
        # The device may not be a plain string; str() keeps .upper() safe.
        device_used = str(analysis_metadata.get("device", self.device_config["device"]))
        # A missing, None or non-numeric confidence must not break formatting.
        try:
            confidence = float(analysis_metadata.get("confidence", 0.0))
        except (TypeError, ValueError):
            confidence = 0.0

        # Device status emoji; indexed devices such as "cuda:0" also count as GPU.
        device_emoji = "⚡" if device_used.startswith("cuda") else "💻"
        device_label = device_used.upper()

        report_lines = [
            "**Analysis Complete**",
            f"**Processing:** {device_emoji} {device_label} • {processing_time:.1f}s • Model: {model_used}",
            f"**Score:** {score}/100 • Confidence: {confidence:.0%}",
            "**Score Breakdown:**",
            f"• **Prompt Quality:** {breakdown.get('prompt_quality', 0)}/30 - Content detail and description",
            f"• **Technical Details:** {breakdown.get('technical_details', 0)}/25 - Camera and photography settings",
            f"• **Artistic Value:** {breakdown.get('artistic_value', 0)}/25 - Creative elements",
            f"• **FLUX Optimization:** {breakdown.get('flux_optimization', 0)}/20 - Platform optimizations",
            "**Analysis Summary:**",
            f"**Description Length:** {len(prompt)} characters",
            f"**Model Used:** {analysis_metadata.get('model', 'N/A')}",
            "**Applied Optimizations:**",
            "✅ Camera settings added",
            "✅ Lighting configuration applied",
            "✅ Technical parameters optimized",
            "✅ FLUX rules implemented",
            "✅ Content cleaned and enhanced",
            "**Performance:**",
            f"• **Processing Time:** {processing_time:.1f}s",
            f"• **Device:** {device_label}",
            f"• **Model Confidence:** {confidence:.0%}",
            "**Frame 0 Laboratory for MIA**",
        ]
        return "\n".join(report_lines)

    def _generate_score_html(self, score: int, grade_info: Dict[str, str]) -> str:
        """Generate HTML for score display.

        Args:
            score: Number shown in the badge.
            grade_info: Mapping providing ``"color"`` (border and score color)
                and ``"grade"`` (label under the score).
        """
        color = grade_info["color"]
        grade = grade_info["grade"]

        container_open = (
            '<div style="text-align: center; padding: 2rem; '
            'background: linear-gradient(135deg, #f0fdf4 0%, #dcfce7 100%); '
            f'border: 3px solid {color}; border-radius: 16px; margin: 1rem 0; '
            'box-shadow: 0 8px 25px -5px rgba(0, 0, 0, 0.1);">'
        )
        score_row = (
            f'<div style="font-size: 3rem; font-weight: 800; color: {color}; '
            f'margin: 0; text-shadow: 0 2px 4px rgba(0,0,0,0.1);">{score}</div>'
        )
        grade_row = (
            '<div style="font-size: 1.25rem; color: #15803d; margin: 0.5rem 0; '
            'text-transform: uppercase; letter-spacing: 0.1em; '
            f'font-weight: 700;">{grade}</div>'
        )
        caption_row = (
            '<div style="font-size: 1rem; color: #15803d; margin: 0; '
            'text-transform: uppercase; letter-spacing: 0.05em; '
            'font-weight: 500;">FLUX Quality Score</div>'
        )
        # Leading and trailing empty items keep the surrounding newlines.
        return "\n".join(
            ["", container_open, score_row, grade_row, caption_row, "</div>", ""]
        )

    def _update_stats(self, processing_time: float, success: bool) -> None:
        """Update processing statistics with the outcome of one call."""
        stats = self.processing_stats
        stats["total_processed"] += 1
        if success:
            stats["successful_analyses"] += 1
        else:
            stats["failed_analyses"] += 1

        # Running mean over all processed items (successes and failures).
        current_avg = stats["average_processing_time"]
        total_count = stats["total_processed"]
        stats["average_processing_time"] = (
            (current_avg * (total_count - 1) + processing_time) / total_count
        )

    def get_stats(self) -> Dict[str, Any]:
        """Get current processing statistics.

        Returns:
            A copy of the counters plus ``success_rate`` (0.0 when nothing has
            been processed) and ``device_info``.
        """
        stats = self.processing_stats.copy()
        total = stats["total_processed"]
        stats["success_rate"] = stats["successful_analyses"] / total if total > 0 else 0.0
        stats["device_info"] = self.device_config
        return stats

    def reset_stats(self) -> None:
        """Reset processing statistics to their initial zeroed state."""
        self.processing_stats = self._new_stats()
        logger.info("Processing statistics reset")
class BatchProcessor:
    """Handle batch processing of multiple images.

    Delegates each image to the wrapped optimizer's ``process_image`` and
    remembers the results of the most recent batch for ``get_batch_summary``.
    """

    def __init__(self, optimizer: "FluxOptimizer"):
        """Store the optimizer used for every item of a batch."""
        self.optimizer = optimizer
        self.batch_results = []

    def process_batch(self, images: list) -> list:
        """Process multiple images in batch.

        Args:
            images: Images to process. Any iterable is accepted; ``None`` is
                treated as an empty batch.

        Returns:
            One dict per image with ``index`` and ``success``, plus either
            ``result`` (the tuple returned by ``process_image``) or ``error``
            (the message of an exception raised while processing that item).
        """
        # Materialize once so len() works for generators and other iterables.
        items = list(images) if images is not None else []
        total = len(items)

        results = []
        for i, image in enumerate(items):
            logger.info(f"Processing batch item {i+1}/{total}")
            try:
                result = self.optimizer.process_image(image)
                results.append({
                    "index": i,
                    "success": result[3]["success"],
                    "result": result,
                })
            except Exception as e:
                # One bad item must not abort the rest of the batch.
                logger.error(f"Batch item {i} failed: {e}")
                results.append({
                    "index": i,
                    "success": False,
                    "error": str(e),
                })

        self.batch_results = results
        return results

    def get_batch_summary(self) -> Dict[str, Any]:
        """Get summary of batch processing results.

        Returns:
            Dict with ``total``, ``successful``, ``failed`` and
            ``success_rate``. All four keys are always present; an empty
            batch reports zeros and a ``success_rate`` of 0.0.
        """
        total = len(self.batch_results)
        successful = sum(1 for r in self.batch_results if r["success"])
        return {
            "total": total,
            "successful": successful,
            "failed": total - successful,
            "success_rate": successful / total if total else 0.0,
        }
# Shared module-level optimizer; process_image_simple falls back to it
# whenever no different model is requested.
flux_optimizer = FluxOptimizer(model_name=None)
def process_image_simple(image: Any, model_name: str = None) -> Tuple[str, str, str]:
    """
    Simple interface for image processing

    Reuses the shared ``flux_optimizer`` unless a model name is given that
    differs from the shared instance's; in that case a throwaway
    ``FluxOptimizer`` is built for this single call.

    Args:
        image: Input image
        model_name: Optional model name
    Returns:
        Tuple of (prompt, report, score_html)
    """
    use_shared = not model_name or model_name == flux_optimizer.model_name
    optimizer = flux_optimizer if use_shared else FluxOptimizer(model_name)

    # The fourth element (metadata) is intentionally dropped by this interface.
    prompt, report, score_html, _metadata = optimizer.process_image(image)
    return prompt, report, score_html
# Public API of this module: the two classes, the shared optimizer instance
# and the convenience wrapper function.
__all__ = ["FluxOptimizer", "BatchProcessor", "flux_optimizer", "process_image_simple"]