| """ |
| Core Utilities Module for BackgroundFX Pro |
| Contains FileManager, VideoUtils, ImageUtils, and ValidationUtils |
| """ |
|
|
| |
| import os |
| if 'OMP_NUM_THREADS' not in os.environ: |
| os.environ['OMP_NUM_THREADS'] = '4' |
| os.environ['MKL_NUM_THREADS'] = '4' |
|
|
| import shutil |
| import tempfile |
| import logging |
| from pathlib import Path |
| from typing import Optional, List, Union, Tuple, Dict, Any |
| from datetime import datetime |
| import subprocess |
| import re |
|
|
| import cv2 |
| import numpy as np |
| import torch |
| from PIL import Image, ImageEnhance, ImageFilter, ImageDraw |
|
|
| logger = logging.getLogger(__name__) |
|
|
| |
| |
| |
|
|
| class ValidationUtils: |
| """Validation utilities for BackgroundFX Pro application.""" |
| |
| |
| SUPPORTED_VIDEO_FORMATS = {'.mp4', '.avi', '.mov', '.mkv', '.webm', '.flv', '.wmv', '.m4v'} |
| SUPPORTED_IMAGE_FORMATS = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp'} |
| |
| |
| MAX_VIDEO_SIZE = 500 * 1024 * 1024 |
| MAX_IMAGE_SIZE = 50 * 1024 * 1024 |
| MIN_VIDEO_SIZE = 1024 |
| |
| |
| MAX_VIDEO_DURATION = 300 |
| MIN_VIDEO_DURATION = 1 |
| MAX_RESOLUTION = (3840, 2160) |
| MIN_RESOLUTION = (320, 240) |
| MAX_FPS = 120 |
| MIN_FPS = 10 |
| |
| @staticmethod |
| def validate_video_file(file_path, check_content=False): |
| """ |
| Validate video file for processing. |
| |
| Args: |
| file_path: Path to the video file |
| check_content: Whether to perform deep content validation |
| |
| Returns: |
| tuple: (is_valid, error_message) |
| """ |
| from pathlib import Path |
| |
| if not file_path: |
| return False, "No file path provided" |
| |
| path = Path(file_path) |
| |
| |
| if not path.exists(): |
| return False, f"File not found: {file_path}" |
| |
| |
| if path.suffix.lower() not in ValidationUtils.SUPPORTED_VIDEO_FORMATS: |
| return False, f"Unsupported video format: {path.suffix}. Supported formats: {', '.join(ValidationUtils.SUPPORTED_VIDEO_FORMATS)}" |
| |
| |
| file_size = path.stat().st_size |
| if file_size > ValidationUtils.MAX_VIDEO_SIZE: |
| size_mb = file_size / (1024 * 1024) |
| return False, f"Video file too large: {size_mb:.1f}MB (max: {ValidationUtils.MAX_VIDEO_SIZE / (1024 * 1024):.0f}MB)" |
| |
| if file_size < ValidationUtils.MIN_VIDEO_SIZE: |
| return False, "Video file appears to be empty or corrupted" |
| |
| |
| if check_content: |
| try: |
| cap = cv2.VideoCapture(str(file_path)) |
| |
| if not cap.isOpened(): |
| return False, "Unable to open video file - may be corrupted" |
| |
| |
| fps = cap.get(cv2.CAP_PROP_FPS) |
| frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT) |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
| |
| |
| duration = frame_count / fps if fps > 0 else 0 |
| |
| cap.release() |
| |
| |
| if duration > ValidationUtils.MAX_VIDEO_DURATION: |
| return False, f"Video too long: {duration:.1f}s (max: {ValidationUtils.MAX_VIDEO_DURATION}s)" |
| |
| if duration < ValidationUtils.MIN_VIDEO_DURATION: |
| return False, f"Video too short: {duration:.1f}s (min: {ValidationUtils.MIN_VIDEO_DURATION}s)" |
| |
| if width > ValidationUtils.MAX_RESOLUTION[0] or height > ValidationUtils.MAX_RESOLUTION[1]: |
| return False, f"Video resolution too high: {width}x{height} (max: {ValidationUtils.MAX_RESOLUTION[0]}x{ValidationUtils.MAX_RESOLUTION[1]})" |
| |
| if width < ValidationUtils.MIN_RESOLUTION[0] or height < ValidationUtils.MIN_RESOLUTION[1]: |
| return False, f"Video resolution too low: {width}x{height} (min: {ValidationUtils.MIN_RESOLUTION[0]}x{ValidationUtils.MIN_RESOLUTION[1]})" |
| |
| if fps > ValidationUtils.MAX_FPS: |
| return False, f"Frame rate too high: {fps:.1f} fps (max: {ValidationUtils.MAX_FPS} fps)" |
| |
| if fps < ValidationUtils.MIN_FPS: |
| return False, f"Frame rate too low: {fps:.1f} fps (min: {ValidationUtils.MIN_FPS} fps)" |
| |
| except Exception as e: |
| return False, f"Error validating video content: {str(e)}" |
| |
| return True, "Video file is valid" |
| |
| @staticmethod |
| def validate_image_file(file_path, check_content=False): |
| """ |
| Validate image file for background replacement. |
| |
| Args: |
| file_path: Path to the image file |
| check_content: Whether to perform deep content validation |
| |
| Returns: |
| tuple: (is_valid, error_message) |
| """ |
| from pathlib import Path |
| |
| if not file_path: |
| return False, "No file path provided" |
| |
| path = Path(file_path) |
| |
| |
| if not path.exists(): |
| return False, f"File not found: {file_path}" |
| |
| |
| if path.suffix.lower() not in ValidationUtils.SUPPORTED_IMAGE_FORMATS: |
| return False, f"Unsupported image format: {path.suffix}. Supported formats: {', '.join(ValidationUtils.SUPPORTED_IMAGE_FORMATS)}" |
| |
| |
| file_size = path.stat().st_size |
| if file_size > ValidationUtils.MAX_IMAGE_SIZE: |
| size_mb = file_size / (1024 * 1024) |
| return False, f"Image file too large: {size_mb:.1f}MB (max: {ValidationUtils.MAX_IMAGE_SIZE / (1024 * 1024):.0f}MB)" |
| |
| |
| if check_content: |
| try: |
| img = cv2.imread(str(file_path)) |
| if img is None: |
| return False, "Unable to read image file - may be corrupted" |
| |
| height, width = img.shape[:2] |
| |
| |
| if width > ValidationUtils.MAX_RESOLUTION[0] or height > ValidationUtils.MAX_RESOLUTION[1]: |
| return False, f"Image resolution too high: {width}x{height} (max: {ValidationUtils.MAX_RESOLUTION[0]}x{ValidationUtils.MAX_RESOLUTION[1]})" |
| |
| if width < ValidationUtils.MIN_RESOLUTION[0] or height < ValidationUtils.MIN_RESOLUTION[1]: |
| return False, f"Image resolution too low: {width}x{height} (min: {ValidationUtils.MIN_RESOLUTION[0]}x{ValidationUtils.MIN_RESOLUTION[1]})" |
| |
| except Exception as e: |
| return False, f"Error validating image content: {str(e)}" |
| |
| return True, "Image file is valid" |
| |
| @staticmethod |
| def validate_processing_params(params): |
| """ |
| Validate processing parameters. |
| |
| Args: |
| params: Dictionary of processing parameters |
| |
| Returns: |
| tuple: (is_valid, error_message) |
| """ |
| if not params: |
| return False, "No parameters provided" |
| |
| |
| if 'confidence_threshold' in params: |
| conf = params['confidence_threshold'] |
| if not isinstance(conf, (int, float)): |
| return False, "Confidence threshold must be a number" |
| if conf < 0 or conf > 1: |
| return False, "Confidence threshold must be between 0 and 1" |
| |
| |
| if 'mask_dilation' in params: |
| dilation = params['mask_dilation'] |
| if not isinstance(dilation, int): |
| return False, "Mask dilation must be an integer" |
| if dilation < 0 or dilation > 50: |
| return False, "Mask dilation must be between 0 and 50" |
| |
| |
| if 'edge_smoothing' in params: |
| smooth = params['edge_smoothing'] |
| if not isinstance(smooth, int): |
| return False, "Edge smoothing must be an integer" |
| if smooth < 0 or smooth > 100: |
| return False, "Edge smoothing must be between 0 and 100" |
| |
| |
| if 'color_adjustment' in params: |
| color_adj = params['color_adjustment'] |
| if not isinstance(color_adj, bool): |
| return False, "Color adjustment must be a boolean" |
| |
| |
| if 'output_quality' in params: |
| quality = params['output_quality'] |
| if not isinstance(quality, int): |
| return False, "Output quality must be an integer" |
| if quality < 1 or quality > 100: |
| return False, "Output quality must be between 1 and 100" |
| |
| |
| if 'processing_method' in params: |
| method = params['processing_method'] |
| valid_methods = {'sam2', 'matanyone', 'cv_fallback', 'auto'} |
| if method not in valid_methods: |
| return False, f"Invalid processing method. Must be one of: {', '.join(valid_methods)}" |
| |
| return True, "Parameters are valid" |
| |
| @staticmethod |
| def validate_output_path(output_path, create_dirs=False): |
| """ |
| Validate output path for saving results. |
| |
| Args: |
| output_path: Path where output will be saved |
| create_dirs: Whether to create directories if they don't exist |
| |
| Returns: |
| tuple: (is_valid, error_message) |
| """ |
| from pathlib import Path |
| |
| if not output_path: |
| return False, "No output path provided" |
| |
| path = Path(output_path) |
| parent_dir = path.parent |
| |
| |
| if not parent_dir.exists(): |
| if create_dirs: |
| try: |
| parent_dir.mkdir(parents=True, exist_ok=True) |
| except Exception as e: |
| return False, f"Failed to create output directory: {str(e)}" |
| else: |
| return False, f"Output directory does not exist: {parent_dir}" |
| |
| |
| if not os.access(parent_dir, os.W_OK): |
| return False, f"No write permission for directory: {parent_dir}" |
| |
| |
| if path.exists(): |
| if not os.access(path, os.W_OK): |
| return False, f"Cannot overwrite existing file: {output_path}" |
| |
| return True, "Output path is valid" |
| |
| @staticmethod |
| def sanitize_filename(filename): |
| """ |
| Sanitize filename to be safe for filesystem. |
| |
| Args: |
| filename: Original filename |
| |
| Returns: |
| str: Sanitized filename |
| """ |
| from pathlib import Path |
| |
| |
| path = Path(filename) |
| stem = path.stem |
| suffix = path.suffix |
| |
| |
| |
| stem = re.sub(r'[^\w\-_.]', '_', stem) |
| |
| |
| stem = re.sub(r'_+', '_', stem) |
| |
| |
| stem = stem.strip('_') |
| |
| |
| if not stem: |
| stem = 'output' |
| |
| |
| max_length = 200 |
| if len(stem) > max_length: |
| stem = stem[:max_length] |
| |
| return f"{stem}{suffix}" |
| |
| @staticmethod |
| def validate_memory_available(required_mb=1000): |
| """ |
| Check if sufficient memory is available. |
| |
| Args: |
| required_mb: Required memory in megabytes |
| |
| Returns: |
| tuple: (is_sufficient, available_mb, error_message) |
| """ |
| try: |
| import psutil |
| |
| mem = psutil.virtual_memory() |
| available_mb = mem.available / (1024 * 1024) |
| |
| if available_mb < required_mb: |
| return False, available_mb, f"Insufficient memory: {available_mb:.0f}MB available, {required_mb:.0f}MB required" |
| |
| return True, available_mb, f"Sufficient memory available: {available_mb:.0f}MB" |
| |
| except ImportError: |
| |
| return True, -1, "Memory check skipped (psutil not available)" |
| except Exception as e: |
| return True, -1, f"Memory check failed: {str(e)}" |
| |
| @staticmethod |
| def validate_gpu_available(): |
| """ |
| Check if GPU is available for processing. |
| |
| Returns: |
| tuple: (is_available, device_info) |
| """ |
| try: |
| if torch.cuda.is_available(): |
| device_name = torch.cuda.get_device_name(0) |
| memory_gb = torch.cuda.get_device_properties(0).total_memory / (1024**3) |
| return True, f"GPU available: {device_name} ({memory_gb:.1f}GB)" |
| else: |
| return False, "No GPU available - will use CPU" |
| |
| except ImportError: |
| return False, "PyTorch not available for GPU check" |
| except Exception as e: |
| return False, f"GPU check failed: {str(e)}" |
| |
| @staticmethod |
| def validate_url(url): |
| """ |
| Validate URL format. |
| |
| Args: |
| url: URL string to validate |
| |
| Returns: |
| tuple: (is_valid, error_message) |
| """ |
| if not url: |
| return False, "No URL provided" |
| |
| |
| url_pattern = re.compile( |
| r'^https?://' |
| r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|' |
| r'localhost|' |
| r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' |
| r'(?::\d+)?' |
| r'(?:/?|[/?]\S+)$', re.IGNORECASE) |
| |
| if url_pattern.match(url): |
| return True, "Valid URL" |
| else: |
| return False, "Invalid URL format" |
|
|
| |
| |
| |
|
|
| class FileManager: |
| """Manages file operations for BackgroundFX Pro""" |
| |
| def __init__(self, base_dir: Optional[str] = None): |
| """Initialize FileManager""" |
| if base_dir: |
| self.base_dir = Path(base_dir) |
| else: |
| self.base_dir = Path(tempfile.gettempdir()) / "backgroundfx_pro" |
| |
| self.base_dir.mkdir(parents=True, exist_ok=True) |
| |
| |
| self.uploads_dir = self.base_dir / "uploads" |
| self.outputs_dir = self.base_dir / "outputs" |
| self.temp_dir = self.base_dir / "temp" |
| self.cache_dir = self.base_dir / "cache" |
| |
| for dir_path in [self.uploads_dir, self.outputs_dir, self.temp_dir, self.cache_dir]: |
| dir_path.mkdir(parents=True, exist_ok=True) |
| |
| logger.info(f"FileManager initialized with base directory: {self.base_dir}") |
| |
| def save_upload(self, file_path: Union[str, Path], filename: Optional[str] = None) -> Path: |
| """Save an uploaded file to the uploads directory""" |
| file_path = Path(file_path) |
| |
| if filename: |
| dest_path = self.uploads_dir / filename |
| else: |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| dest_path = self.uploads_dir / f"{timestamp}_{file_path.name}" |
| |
| shutil.copy2(file_path, dest_path) |
| logger.info(f"Saved upload: {dest_path}") |
| return dest_path |
| |
| def create_output_path(self, filename: str, subfolder: Optional[str] = None) -> Path: |
| """Create a path for an output file""" |
| if subfolder: |
| output_dir = self.outputs_dir / subfolder |
| output_dir.mkdir(parents=True, exist_ok=True) |
| else: |
| output_dir = self.outputs_dir |
| |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| name_parts = filename.rsplit('.', 1) |
| if len(name_parts) == 2: |
| output_path = output_dir / f"{name_parts[0]}_{timestamp}.{name_parts[1]}" |
| else: |
| output_path = output_dir / f"{filename}_{timestamp}" |
| |
| return output_path |
| |
| def get_temp_path(self, filename: Optional[str] = None, extension: str = ".tmp") -> Path: |
| """Get a temporary file path""" |
| if filename: |
| temp_path = self.temp_dir / filename |
| else: |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f") |
| temp_path = self.temp_dir / f"temp_{timestamp}{extension}" |
| |
| return temp_path |
| |
| def cleanup_temp(self, max_age_hours: int = 24): |
| """Clean up old temporary files""" |
| try: |
| current_time = datetime.now().timestamp() |
| max_age_seconds = max_age_hours * 3600 |
| |
| for temp_file in self.temp_dir.iterdir(): |
| if temp_file.is_file(): |
| file_age = current_time - temp_file.stat().st_mtime |
| if file_age > max_age_seconds: |
| temp_file.unlink() |
| logger.debug(f"Deleted old temp file: {temp_file}") |
| |
| logger.info("Temp directory cleanup completed") |
| except Exception as e: |
| logger.warning(f"Error during temp cleanup: {e}") |
| |
| def get_cache_path(self, key: str, extension: str = ".cache") -> Path: |
| """Get a cache file path based on a key""" |
| safe_key = "".join(c if c.isalnum() or c in '-_' else '_' for c in key) |
| return self.cache_dir / f"{safe_key}{extension}" |
| |
| def list_outputs(self, subfolder: Optional[str] = None, extension: Optional[str] = None) -> List[Path]: |
| """List output files""" |
| if subfolder: |
| search_dir = self.outputs_dir / subfolder |
| else: |
| search_dir = self.outputs_dir |
| |
| if not search_dir.exists(): |
| return [] |
| |
| if extension: |
| pattern = f"*{extension}" |
| else: |
| pattern = "*" |
| |
| return sorted(search_dir.glob(pattern), key=lambda p: p.stat().st_mtime, reverse=True) |
| |
| def delete_file(self, file_path: Union[str, Path]) -> bool: |
| """Safely delete a file""" |
| try: |
| file_path = Path(file_path) |
| if file_path.exists() and file_path.is_file(): |
| file_path.unlink() |
| logger.info(f"Deleted file: {file_path}") |
| return True |
| return False |
| except Exception as e: |
| logger.error(f"Error deleting file {file_path}: {e}") |
| return False |
| |
| def get_file_info(self, file_path: Union[str, Path]) -> dict: |
| """Get information about a file""" |
| file_path = Path(file_path) |
| |
| if not file_path.exists(): |
| return {"exists": False} |
| |
| stat = file_path.stat() |
| return { |
| "exists": True, |
| "name": file_path.name, |
| "size": stat.st_size, |
| "size_mb": stat.st_size / (1024 * 1024), |
| "created": datetime.fromtimestamp(stat.st_ctime), |
| "modified": datetime.fromtimestamp(stat.st_mtime), |
| "extension": file_path.suffix, |
| "path": str(file_path.absolute()) |
| } |
|
|
| |
| |
| |
|
|
| class VideoUtils: |
| """Utilities for video processing""" |
| |
| @staticmethod |
| def get_video_info(video_path: Union[str, Path]) -> Dict[str, Any]: |
| """Get detailed video information""" |
| video_path = str(video_path) |
| cap = cv2.VideoCapture(video_path) |
| |
| if not cap.isOpened(): |
| logger.error(f"Failed to open video: {video_path}") |
| return {"error": "Failed to open video"} |
| |
| try: |
| info = { |
| "fps": cap.get(cv2.CAP_PROP_FPS), |
| "frame_count": int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), |
| "width": int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), |
| "height": int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)), |
| "codec": VideoUtils._fourcc_to_string(int(cap.get(cv2.CAP_PROP_FOURCC))), |
| "duration": cap.get(cv2.CAP_PROP_FRAME_COUNT) / cap.get(cv2.CAP_PROP_FPS) if cap.get(cv2.CAP_PROP_FPS) > 0 else 0 |
| } |
| |
| path = Path(video_path) |
| if path.exists(): |
| info["file_size_mb"] = path.stat().st_size / (1024 * 1024) |
| |
| return info |
| |
| finally: |
| cap.release() |
| |
| @staticmethod |
| def _fourcc_to_string(fourcc: int) -> str: |
| """Convert fourcc code to string""" |
| return "".join([chr((fourcc >> 8 * i) & 0xFF) for i in range(4)]) |
| |
| @staticmethod |
| def extract_frames(video_path: Union[str, Path], |
| output_dir: Union[str, Path], |
| frame_interval: int = 1, |
| max_frames: Optional[int] = None) -> List[Path]: |
| """Extract frames from video""" |
| video_path = str(video_path) |
| output_dir = Path(output_dir) |
| output_dir.mkdir(parents=True, exist_ok=True) |
| |
| cap = cv2.VideoCapture(video_path) |
| if not cap.isOpened(): |
| logger.error(f"Failed to open video: {video_path}") |
| return [] |
| |
| frame_paths = [] |
| frame_count = 0 |
| extracted_count = 0 |
| |
| try: |
| while True: |
| ret, frame = cap.read() |
| if not ret: |
| break |
| |
| if frame_count % frame_interval == 0: |
| frame_path = output_dir / f"frame_{frame_count:06d}.png" |
| cv2.imwrite(str(frame_path), frame) |
| frame_paths.append(frame_path) |
| extracted_count += 1 |
| |
| if max_frames and extracted_count >= max_frames: |
| break |
| |
| frame_count += 1 |
| |
| logger.info(f"Extracted {len(frame_paths)} frames from video") |
| return frame_paths |
| |
| finally: |
| cap.release() |
| |
| @staticmethod |
| def create_video_from_frames(frame_paths: List[Union[str, Path]], |
| output_path: Union[str, Path], |
| fps: float = 30.0, |
| codec: str = 'mp4v') -> bool: |
| """Create video from frame images""" |
| if not frame_paths: |
| logger.error("No frames provided") |
| return False |
| |
| first_frame = cv2.imread(str(frame_paths[0])) |
| if first_frame is None: |
| logger.error(f"Failed to read first frame: {frame_paths[0]}") |
| return False |
| |
| height, width, layers = first_frame.shape |
| |
| fourcc = cv2.VideoWriter_fourcc(*codec) |
| out = cv2.VideoWriter(str(output_path), fourcc, fps, (width, height)) |
| |
| try: |
| for frame_path in frame_paths: |
| frame = cv2.imread(str(frame_path)) |
| if frame is not None: |
| out.write(frame) |
| else: |
| logger.warning(f"Failed to read frame: {frame_path}") |
| |
| logger.info(f"Created video: {output_path}") |
| return True |
| |
| except Exception as e: |
| logger.error(f"Error creating video: {e}") |
| return False |
| |
| finally: |
| out.release() |
| |
| @staticmethod |
| def resize_video(input_path: Union[str, Path], |
| output_path: Union[str, Path], |
| target_width: Optional[int] = None, |
| target_height: Optional[int] = None, |
| maintain_aspect: bool = True) -> bool: |
| """Resize video to target dimensions""" |
| cap = cv2.VideoCapture(str(input_path)) |
| if not cap.isOpened(): |
| logger.error(f"Failed to open video: {input_path}") |
| return False |
| |
| orig_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) |
| orig_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
| fps = cap.get(cv2.CAP_PROP_FPS) |
| fourcc = int(cap.get(cv2.CAP_PROP_FOURCC)) |
| |
| if maintain_aspect: |
| if target_width and not target_height: |
| aspect = orig_width / orig_height |
| target_height = int(target_width / aspect) |
| elif target_height and not target_width: |
| aspect = orig_width / orig_height |
| target_width = int(target_height * aspect) |
| |
| if not target_width: |
| target_width = orig_width |
| if not target_height: |
| target_height = orig_height |
| |
| out = cv2.VideoWriter(str(output_path), fourcc, fps, (target_width, target_height)) |
| |
| try: |
| while True: |
| ret, frame = cap.read() |
| if not ret: |
| break |
| |
| resized = cv2.resize(frame, (target_width, target_height)) |
| out.write(resized) |
| |
| logger.info(f"Resized video saved to: {output_path}") |
| return True |
| |
| except Exception as e: |
| logger.error(f"Error resizing video: {e}") |
| return False |
| |
| finally: |
| cap.release() |
| out.release() |
| |
| @staticmethod |
| def extract_audio(video_path: Union[str, Path], |
| audio_path: Union[str, Path]) -> bool: |
| """Extract audio from video using ffmpeg""" |
| try: |
| cmd = [ |
| 'ffmpeg', '-i', str(video_path), |
| '-vn', '-acodec', 'copy', |
| str(audio_path), '-y' |
| ] |
| |
| result = subprocess.run(cmd, capture_output=True, text=True) |
| |
| if result.returncode == 0: |
| logger.info(f"Audio extracted to: {audio_path}") |
| return True |
| else: |
| logger.error(f"Failed to extract audio: {result.stderr}") |
| return False |
| |
| except FileNotFoundError: |
| logger.error("ffmpeg not found. Please install ffmpeg.") |
| return False |
| except Exception as e: |
| logger.error(f"Error extracting audio: {e}") |
| return False |
| |
| @staticmethod |
| def add_audio_to_video(video_path: Union[str, Path], |
| audio_path: Union[str, Path], |
| output_path: Union[str, Path]) -> bool: |
| """Add audio track to video using ffmpeg""" |
| try: |
| cmd = [ |
| 'ffmpeg', '-i', str(video_path), |
| '-i', str(audio_path), |
| '-c:v', 'copy', '-c:a', 'aac', |
| '-map', '0:v:0', '-map', '1:a:0', |
| str(output_path), '-y' |
| ] |
| |
| result = subprocess.run(cmd, capture_output=True, text=True) |
| |
| if result.returncode == 0: |
| logger.info(f"Video with audio saved to: {output_path}") |
| return True |
| else: |
| logger.error(f"Failed to add audio: {result.stderr}") |
| return False |
| |
| except FileNotFoundError: |
| logger.error("ffmpeg not found. Please install ffmpeg.") |
| return False |
| except Exception as e: |
| logger.error(f"Error adding audio: {e}") |
| return False |
|
|
| |
| |
| |
|
|
| class ImageUtils: |
| """Utilities for image processing and manipulation""" |
| |
| @staticmethod |
| def load_image(image_path: Union[str, Path]) -> Optional[Image.Image]: |
| """Load an image using PIL""" |
| try: |
| return Image.open(str(image_path)) |
| except Exception as e: |
| logger.error(f"Failed to load image {image_path}: {e}") |
| return None |
| |
| @staticmethod |
| def resize_image(image: Image.Image, |
| max_width: Optional[int] = None, |
| max_height: Optional[int] = None, |
| maintain_aspect: bool = True) -> Image.Image: |
| """Resize image to fit within max dimensions""" |
| if not max_width and not max_height: |
| return image |
| |
| width, height = image.size |
| |
| if maintain_aspect: |
| scale = 1.0 |
| if max_width: |
| scale = min(scale, max_width / width) |
| if max_height: |
| scale = min(scale, max_height / height) |
| |
| new_width = int(width * scale) |
| new_height = int(height * scale) |
| else: |
| new_width = max_width or width |
| new_height = max_height or height |
| |
| return image.resize((new_width, new_height), Image.Resampling.LANCZOS) |
| |
| @staticmethod |
| def convert_to_cv2(pil_image: Image.Image) -> np.ndarray: |
| """Convert PIL Image to OpenCV format""" |
| if pil_image.mode != 'RGB': |
| pil_image = pil_image.convert('RGB') |
| |
| np_image = np.array(pil_image) |
| return cv2.cvtColor(np_image, cv2.COLOR_RGB2BGR) |
| |
| @staticmethod |
| def convert_from_cv2(cv2_image: np.ndarray) -> Image.Image: |
| """Convert OpenCV image to PIL format""" |
| rgb_image = cv2.cvtColor(cv2_image, cv2.COLOR_BGR2RGB) |
| return Image.fromarray(rgb_image) |
| |
| @staticmethod |
| def apply_blur(image: Image.Image, radius: float = 5.0) -> Image.Image: |
| """Apply Gaussian blur to image""" |
| return image.filter(ImageFilter.GaussianBlur(radius=radius)) |
| |
| @staticmethod |
| def adjust_brightness(image: Image.Image, factor: float = 1.0) -> Image.Image: |
| """Adjust image brightness""" |
| enhancer = ImageEnhance.Brightness(image) |
| return enhancer.enhance(factor) |
| |
| @staticmethod |
| def adjust_contrast(image: Image.Image, factor: float = 1.0) -> Image.Image: |
| """Adjust image contrast""" |
| enhancer = ImageEnhance.Contrast(image) |
| return enhancer.enhance(factor) |
| |
| @staticmethod |
| def adjust_saturation(image: Image.Image, factor: float = 1.0) -> Image.Image: |
| """Adjust image saturation""" |
| enhancer = ImageEnhance.Color(image) |
| return enhancer.enhance(factor) |
| |
| @staticmethod |
| def crop_center(image: Image.Image, crop_width: int, crop_height: int) -> Image.Image: |
| """Crop image from center""" |
| width, height = image.size |
| |
| left = (width - crop_width) // 2 |
| top = (height - crop_height) // 2 |
| right = left + crop_width |
| bottom = top + crop_height |
| |
| return image.crop((left, top, right, bottom)) |
| |
| @staticmethod |
| def create_thumbnail(image: Image.Image, size: Tuple[int, int] = (128, 128)) -> Image.Image: |
| """Create thumbnail preserving aspect ratio""" |
| img_copy = image.copy() |
| img_copy.thumbnail(size, Image.Resampling.LANCZOS) |
| return img_copy |
| |
| @staticmethod |
| def apply_mask(image: Image.Image, mask: Image.Image, alpha: float = 1.0) -> Image.Image: |
| """Apply mask to image""" |
| if image.mode != 'RGBA': |
| image = image.convert('RGBA') |
| |
| if mask.mode != 'L': |
| mask = mask.convert('L') |
| |
| if mask.size != image.size: |
| mask = mask.resize(image.size, Image.Resampling.LANCZOS) |
| |
| if alpha < 1.0: |
| mask = ImageEnhance.Brightness(mask).enhance(alpha) |
| |
| image.putalpha(mask) |
| return image |
| |
| @staticmethod |
| def composite_images(foreground: Image.Image, |
| background: Image.Image, |
| position: Tuple[int, int] = (0, 0), |
| alpha: float = 1.0) -> Image.Image: |
| """Composite foreground image over background""" |
| if foreground.mode != 'RGBA': |
| foreground = foreground.convert('RGBA') |
| if background.mode != 'RGBA': |
| background = background.convert('RGBA') |
| |
| if alpha < 1.0: |
| foreground = foreground.copy() |
| foreground.putalpha( |
| ImageEnhance.Brightness(foreground.split()[3]).enhance(alpha) |
| ) |
| |
| output = background.copy() |
| output.paste(foreground, position, foreground) |
| |
| return output |
| |
| @staticmethod |
| def get_image_info(image_path: Union[str, Path]) -> Dict[str, Any]: |
| """Get image file information""" |
| try: |
| image_path = Path(image_path) |
| |
| if not image_path.exists(): |
| return {"exists": False} |
| |
| with Image.open(str(image_path)) as img: |
| info = { |
| "exists": True, |
| "filename": image_path.name, |
| "format": img.format, |
| "mode": img.mode, |
| "size": img.size, |
| "width": img.width, |
| "height": img.height, |
| "file_size_mb": image_path.stat().st_size / (1024 * 1024) |
| } |
| |
| if hasattr(img, '_getexif') and img._getexif(): |
| info["has_exif"] = True |
| else: |
| info["has_exif"] = False |
| |
| return info |
| |
| except Exception as e: |
| logger.error(f"Error getting image info for {image_path}: {e}") |
| return {"exists": False, "error": str(e)} |
| |
| @staticmethod |
| def save_image(image: Image.Image, |
| output_path: Union[str, Path], |
| quality: int = 95, |
| optimize: bool = True) -> bool: |
| """Save image with specified quality""" |
| try: |
| output_path = Path(output_path) |
| output_path.parent.mkdir(parents=True, exist_ok=True) |
| |
| save_kwargs = {} |
| ext = output_path.suffix.lower() |
| |
| if ext in ['.jpg', '.jpeg']: |
| save_kwargs['quality'] = quality |
| save_kwargs['optimize'] = optimize |
| elif ext == '.png': |
| save_kwargs['optimize'] = optimize |
| |
| image.save(str(output_path), **save_kwargs) |
| logger.info(f"Saved image to: {output_path}") |
| return True |
| |
| except Exception as e: |
| logger.error(f"Failed to save image to {output_path}: {e}") |
| return False |
|
|
| |
| |
| |
|
|
| def validate_video_file(file_path: str) -> tuple: |
| """Validate if file is a valid video file.""" |
| import os |
| import cv2 |
| |
| if not os.path.exists(file_path): |
| return False, f"File not found: {file_path}" |
| |
| try: |
| cap = cv2.VideoCapture(file_path) |
| ret = cap.isOpened() |
| cap.release() |
| if ret: |
| return True, "Video file is valid" |
| else: |
| return False, "Unable to open video file - may be corrupted" |
| except Exception as e: |
| return False, f"Error validating video: {str(e)}" |
| |