# Author: Malaji71
# Commit: "Update utils.py" (77acbe4, verified)
"""
Utility functions for FLUX Prompt Optimizer
Clean, focused, and reusable utilities
"""
import re
import logging
import gc
from typing import Optional, Tuple, Dict, Any, List
from PIL import Image
import torch
import numpy as np
from config import PROCESSING_CONFIG, FLUX_RULES
# Configure logging
# Root logging is set to INFO as soon as this module is imported; `logger`
# is the module-level logger used by the helper functions in this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def setup_logging(level: str = "INFO") -> None:
    """
    Setup logging configuration

    Reconfigures the root logger with the requested level and a
    timestamped message format.

    Args:
        level: Name of a standard logging level, case-insensitive
            (e.g. "debug", "INFO")

    Raises:
        AttributeError: If ``level`` does not name an attribute of the
            ``logging`` module
    """
    logging.basicConfig(
        level=getattr(logging, level.upper()),
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        # basicConfig() silently does nothing once the root logger has
        # handlers, which is always the case here because this module already
        # calls basicConfig() at import time. force=True replaces them so the
        # requested level and format actually take effect.
        force=True,
    )
def optimize_image(image: Any) -> Optional[Image.Image]:
    """
    Optimize image for processing

    Converts the input to an RGB PIL image and shrinks it (keeping aspect
    ratio) so that neither side exceeds PROCESSING_CONFIG["max_image_size"].
    A PIL image supplied by the caller is never modified in place: when it
    needs resizing, a copy is resized instead.

    Args:
        image: Input image (PIL, numpy array, or file path)

    Returns:
        Optimized PIL Image or None if failed
    """
    if image is None:
        return None

    try:
        # True when `image` is an object created inside this function and can
        # therefore be resized in place without affecting the caller.
        owns_image = True

        # Convert to PIL Image if necessary
        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)
        elif isinstance(image, str):
            # Decode inside the context manager so the file handle is released
            # right away; convert() returns a new image that no longer
            # depends on the open file.
            with Image.open(image) as opened:
                image = opened.convert('RGB')
        elif isinstance(image, Image.Image):
            owns_image = False
        else:
            logger.error(f"Unsupported image type: {type(image)}")
            return None

        # Convert to RGB if necessary
        if image.mode != 'RGB':
            image = image.convert('RGB')
            owns_image = True

        # Resize if too large
        max_size = PROCESSING_CONFIG["max_image_size"]
        if image.size[0] > max_size or image.size[1] > max_size:
            if not owns_image:
                # thumbnail() works in place; protect the caller's image
                image = image.copy()
            image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
            logger.info(f"Image resized to {image.size}")

        return image
    except Exception as e:
        # Best-effort helper: any decoding/conversion failure is reported as None
        logger.error(f"Image optimization failed: {e}")
        return None
def validate_image(image: Any) -> bool:
    """
    Check whether an image can be processed

    The image counts as valid when optimize_image() is able to turn it
    into a usable result.

    Args:
        image: Input image to validate

    Returns:
        True if valid, False otherwise
    """
    processable = False
    if image is not None:
        try:
            processable = optimize_image(image) is not None
        except Exception:
            processable = False
    return processable
def clean_memory() -> None:
    """Run garbage collection and, when CUDA is available, empty the GPU cache"""
    try:
        gc.collect()
        cuda = torch.cuda
        if cuda.is_available():
            cuda.empty_cache()
            cuda.synchronize()
        logger.debug("Memory cleaned")
    except Exception as cleanup_error:
        # Cleanup is best-effort; a failure is only reported, never raised
        logger.warning(f"Memory cleanup failed: {cleanup_error}")
def apply_flux_rules(prompt: str, analysis_metadata: Optional[Dict[str, Any]] = None) -> str:
    """
    Turn a raw prompt into a Flux-style prompt

    The result is assembled as description + camera setup + lighting and
    then passed through the formatting clean-up.

    Args:
        prompt: Raw prompt text
        analysis_metadata: Optional metadata from image analysis; when it
            flags "has_camera_suggestion" and carries a "camera_setup"
            value, that suggestion is used instead of the fallback camera

    Returns:
        Optimized prompt following Flux rules ("" for empty or non-string input)
    """
    if not prompt or not isinstance(prompt, str):
        return ""

    # Strip every configured unwanted pattern, case-insensitively
    stripped = prompt
    for unwanted in FLUX_RULES["remove_patterns"]:
        stripped = re.sub(unwanted, '', stripped, flags=re.IGNORECASE)

    # Keep only the descriptive text (no CAMERA_SETUP section)
    description = _extract_description_only(stripped)
    description_lower = description.lower()

    has_bagel_camera = bool(
        analysis_metadata
        and analysis_metadata.get("has_camera_suggestion")
        and analysis_metadata.get("camera_setup")
    )
    if has_bagel_camera:
        # Prefer the camera suggested by the image analysis
        camera = _format_bagel_camera_suggestion(analysis_metadata["camera_setup"])
        logger.info(f"Using BAGEL camera suggestion: {camera}")
    else:
        # No suggestion available: pick a camera from keywords in the description
        camera = _get_fallback_camera_config(description_lower)
        logger.info("Using fallback camera configuration")

    # Lighting is only appended when neither part already mentions it
    lighting = _get_lighting_enhancement(description_lower, camera)

    return _clean_prompt_formatting(description + camera + lighting)
def _extract_description_only(prompt: str) -> str:
"""Extract only the description part, removing camera setup sections"""
# Remove CAMERA_SETUP section if present
if "CAMERA_SETUP:" in prompt:
parts = prompt.split("CAMERA_SETUP:")
description = parts[0].strip()
elif "2. CAMERA_SETUP" in prompt:
parts = prompt.split("2. CAMERA_SETUP")
description = parts[0].strip()
else:
description = prompt
# Remove "DESCRIPTION:" label if present
if description.startswith("DESCRIPTION:"):
description = description.replace("DESCRIPTION:", "").strip()
elif description.startswith("1. DESCRIPTION:"):
description = description.replace("1. DESCRIPTION:", "").strip()
# Clean up any remaining camera recommendations from the description
description = re.sub(r'For this type of scene.*?shooting style would be.*?\.', '', description, flags=re.DOTALL)
description = re.sub(r'I would recommend.*?aperture.*?\.', '', description, flags=re.DOTALL)
# Remove numbered section residues (like "2.," at the end)
description = re.sub(r'\s*\d+\.\s*,?\s*$', '', description)
description = re.sub(r'\s*\d+\.\s*,?\s*', ' ', description)
return description.strip()
def _format_bagel_camera_suggestion(bagel_camera: str) -> str:
"""Format BAGEL's camera suggestion into clean FLUX format"""
try:
# Clean up the BAGEL suggestion
camera_text = bagel_camera.strip()
# Remove "CAMERA_SETUP:" if it's still there
camera_text = re.sub(r'^CAMERA_SETUP:\s*', '', camera_text)
# Extract key camera info using regex patterns
camera_patterns = {
'camera': r'(Canon EOS [^,]+|Sony A[^,]+|Leica [^,]+|Hasselblad [^,]+|Phase One [^,]+|Fujifilm [^,]+)',
'lens': r'(\d+mm[^,]*|[^,]*lens[^,]*)',
'aperture': r'(f/[\d.]+[^,]*)'
}
extracted_parts = []
for key, pattern in camera_patterns.items():
match = re.search(pattern, camera_text, re.IGNORECASE)
if match:
extracted_parts.append(match.group(1).strip())
if extracted_parts:
# Build clean camera config
camera_info = ', '.join(extracted_parts)
return f", Shot on {camera_info}, professional photography"
else:
# Fallback: use the first part of BAGEL's suggestion
first_sentence = camera_text.split('.')[0].strip()
if len(first_sentence) > 10:
return f", {first_sentence}, professional photography"
else:
return ", professional camera setup"
except Exception as e:
logger.warning(f"Failed to format BAGEL camera suggestion: {e}")
return ", professional camera setup"
def _get_fallback_camera_config(prompt_lower: str) -> str:
    """
    Choose a camera configuration from keywords found in the prompt

    Checks are plain substring tests, evaluated in this order:
    street, portrait, landscape, then the default.

    Args:
        prompt_lower: Lower-cased description text

    Returns:
        The matching entry of FLUX_RULES["camera_configs"]
    """
    def mentions(*keywords: str) -> bool:
        return any(keyword in prompt_lower for keyword in keywords)

    if mentions('street', 'urban', 'city', 'documentary', 'crowd', 'protest'):
        scene = "street"
    elif (mentions('portrait', 'person', 'man', 'woman', 'face')
          and not mentions('street', 'crowd', 'scene')):
        scene = "portrait"
    elif mentions('landscape', 'mountain', 'nature', 'outdoor'):
        scene = "landscape"
    else:
        scene = "default"

    return FLUX_RULES["camera_configs"][scene]
def _get_lighting_enhancement(prompt_lower: str, camera_config: str) -> str:
    """
    Pick the lighting phrase to append to the prompt

    Args:
        prompt_lower: Lower-cased description text
        camera_config: Camera fragment already chosen for the prompt

    Returns:
        "" when "lighting" already appears in either input; otherwise the
        "dramatic", "portrait" or "default" entry of
        FLUX_RULES["lighting_enhancements"]
    """
    # Nothing to add if lighting is already covered
    if 'lighting' in prompt_lower:
        return ""
    camera_lower = camera_config.lower()
    if 'lighting' in camera_lower:
        return ""

    if any(cue in prompt_lower for cue in ('dramatic', 'chaos', 'fire')):
        mood = "dramatic"
    elif 'portrait' in camera_lower:
        mood = "portrait"
    else:
        mood = "default"

    return FLUX_RULES["lighting_enhancements"][mood]
def _clean_prompt_formatting(prompt: str) -> str:
"""Clean up prompt formatting"""
if not prompt:
return ""
# Ensure it starts with capital letter
prompt = prompt.strip()
if prompt:
prompt = prompt[0].upper() + prompt[1:] if len(prompt) > 1 else prompt.upper()
# Clean up spaces and commas
prompt = re.sub(r'\s+', ' ', prompt)
prompt = re.sub(r',\s*,+', ',', prompt)
prompt = re.sub(r'^\s*,\s*', '', prompt) # Remove leading commas
prompt = re.sub(r'\s*,\s*$', '', prompt) # Remove trailing commas
# Remove redundant periods
prompt = re.sub(r'\.+', '.', prompt)
return prompt.strip()
def calculate_prompt_score(prompt: str, analysis_data: Optional[Dict[str, Any]] = None) -> Tuple[int, Dict[str, int]]:
    """
    Score a prompt and report how the score is composed

    Categories:
      - prompt_quality (max 30): length (1 point per 8 characters, max 20)
        plus 2 points per comma-separated segment (max 10)
      - technical_details (max 25): 5 points per technical keyword, plus 10
        when the analysis provided a camera suggestion
      - artistic_value (max 25): 5 points per artistic keyword
      - flux_optimization (max 20): 15 for a camera suggestion from the
        analysis, else 10 if a configured camera string is in the prompt;
        plus 5 if a configured lighting string is in the prompt

    Args:
        prompt: The prompt to score
        analysis_data: Optional analysis data to enhance scoring

    Returns:
        Tuple of (total_score, breakdown_dict)
    """
    if not prompt:
        return 0, {"prompt_quality": 0, "technical_details": 0, "artistic_value": 0, "flux_optimization": 0}

    has_camera_suggestion = bool(analysis_data and analysis_data.get("has_camera_suggestion"))

    # Prompt quality: reward decent length and comma-separated detail
    quality = min(20, len(prompt) // 8) + min(10, len(prompt.split(',')) * 2)

    lowered = prompt.lower()

    # Technical details: keyword hits, with a bonus for a suggested camera
    technical_terms = ('shot on', 'lens', 'photography', 'lighting', 'camera')
    technical = 5 * sum(term in lowered for term in technical_terms)
    if has_camera_suggestion:
        technical += 10

    # Artistic value: keyword hits only
    artistic_terms = ('masterful', 'professional', 'cinematic', 'dramatic', 'beautiful')
    artistic = 5 * sum(term in lowered for term in artistic_terms)

    # Flux optimization: a suggested camera outranks a fallback camera string
    if has_camera_suggestion:
        flux = 15
    elif any(camera in prompt for camera in FLUX_RULES["camera_configs"].values()):
        flux = 10
    else:
        flux = 0
    if any(lighting in prompt for lighting in FLUX_RULES["lighting_enhancements"].values()):
        flux += 5

    breakdown = {
        "prompt_quality": quality,
        "technical_details": min(25, technical),
        "artistic_value": min(25, artistic),
        "flux_optimization": flux,
    }
    return sum(breakdown.values()), breakdown
def get_score_grade(score: int) -> Dict[str, str]:
    """
    Get grade information for a score

    Returns the entry of SCORING_CONFIG["grade_thresholds"] with the
    highest threshold that ``score`` reaches. A score below every threshold
    gets the entry of the lowest configured threshold.

    Args:
        score: Numeric score

    Returns:
        Dictionary with grade and color information

    Raises:
        KeyError: If no grade thresholds are configured
    """
    from config import SCORING_CONFIG

    thresholds = SCORING_CONFIG["grade_thresholds"]
    if not thresholds:
        raise KeyError("SCORING_CONFIG['grade_thresholds'] is empty")

    for threshold in sorted(thresholds, reverse=True):
        if score >= threshold:
            return thresholds[threshold]

    # Default to lowest grade. Looked up by the smallest configured threshold
    # instead of a hard-coded key 0, which need not exist.
    return thresholds[min(thresholds)]
def format_analysis_report(analysis_data: Dict[str, Any], processing_time: float) -> str:
    """
    Format analysis data into a readable report

    Args:
        analysis_data: Analysis results; the keys "model_used", "prompt" and
            "summary" are read, all optional. None is treated as empty.
        processing_time: Time taken for processing, shown with one decimal

    Returns:
        Formatted markdown report
    """
    analysis_data = analysis_data or {}
    model_used = analysis_data.get("model_used", "Unknown")
    # `or ""` also covers a "prompt" key that is present but None
    prompt_length = len(analysis_data.get("prompt") or "")
    summary = analysis_data.get("summary", "Analysis completed successfully")

    # The emoji and bullets below are written as real Unicode characters;
    # they had been stored as mis-decoded byte sequences.
    report = f"""**🚀 FLUX OPTIMIZATION COMPLETE**
**Model:** {model_used} • **Time:** {processing_time:.1f}s • **Length:** {prompt_length} chars
**📊 ANALYSIS SUMMARY:**
{summary}
**🎯 OPTIMIZATIONS APPLIED:**
✅ Flux camera configuration
✅ Professional lighting setup
✅ Technical photography details
✅ Artistic enhancement keywords
**⚡ Powered by Frame 0 Laboratory for MIA**"""
    return report
def safe_execute(func, *args, **kwargs) -> Tuple[bool, Any]:
    """
    Safely execute a function with error handling

    Args:
        func: Callable to execute (plain function, functools.partial,
            callable object, ...)
        *args: Function arguments
        **kwargs: Function keyword arguments

    Returns:
        Tuple of (success: bool, result: Any); on failure the second item
        is the exception message as a string
    """
    try:
        return True, func(*args, **kwargs)
    except Exception as e:
        # Not every callable has __name__ (e.g. functools.partial). Reading it
        # directly would raise from inside this handler and defeat the wrapper.
        func_name = getattr(func, "__name__", None) or repr(func)
        logger.error(f"Safe execution failed for {func_name}: {e}")
        return False, str(e)
def truncate_text(text: str, max_length: int = 100) -> str:
    """
    Truncate text to specified length with ellipsis

    The result never exceeds ``max_length`` characters. When ``max_length``
    is too small to hold any text plus the ellipsis (3 or less), only as
    many dots as fit are returned.

    Args:
        text: Text to truncate
        max_length: Maximum length of the returned string

    Returns:
        Truncated text (``text`` unchanged if it is empty, None, or short enough)
    """
    if not text or len(text) <= max_length:
        return text

    ellipsis = "..."
    if max_length <= len(ellipsis):
        # A negative slice bound would keep most of the text and overshoot the limit
        return ellipsis[:max(max_length, 0)]
    return text[:max_length - len(ellipsis)] + ellipsis
# Export main functions
# Only these names are exposed by a star-import of this module; the
# underscore-prefixed helpers are not listed.
__all__ = [
"setup_logging",
"optimize_image",
"validate_image",
"clean_memory",
"apply_flux_rules",
"calculate_prompt_score",
"get_score_grade",
"format_analysis_report",
"safe_execute",
"truncate_text"
]