""" Batch processing module for BackgroundFX Pro. Handles efficient processing of multiple files with optimized resource management. """ import os import cv2 import numpy as np from pathlib import Path from typing import Dict, List, Optional, Tuple, Union, Callable, Any, Generator from dataclasses import dataclass, field from enum import Enum import time import threading from queue import Queue, PriorityQueue, Empty from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed import multiprocessing as mp import json import hashlib import pickle import shutil import tempfile from datetime import datetime import psutil import mimetypes from ..utils.logger import setup_logger from ..utils.device import DeviceManager from ..utils import TimeEstimator, MemoryMonitor from .pipeline import ProcessingPipeline, PipelineConfig, PipelineResult, ProcessingMode from .video_processor import VideoProcessorAPI, VideoStats logger = setup_logger(__name__) class BatchPriority(Enum): """Batch processing priority levels.""" LOW = 3 NORMAL = 2 HIGH = 1 URGENT = 0 class FileType(Enum): """Supported file types.""" IMAGE = "image" VIDEO = "video" UNKNOWN = "unknown" @dataclass class BatchItem: """Individual item in batch processing.""" id: str input_path: str output_path: str file_type: FileType priority: BatchPriority = BatchPriority.NORMAL background: Optional[Union[str, np.ndarray]] = None config_overrides: Dict[str, Any] = field(default_factory=dict) metadata: Dict[str, Any] = field(default_factory=dict) retry_count: int = 0 max_retries: int = 3 status: str = "pending" error: Optional[str] = None result: Optional[Any] = None processing_time: float = 0.0 def __lt__(self, other): """Compare items by priority for PriorityQueue.""" return self.priority.value < other.priority.value @dataclass class BatchConfig: """Configuration for batch processing.""" # Processing settings max_workers: int = mp.cpu_count() use_multiprocessing: bool = False chunk_size: int = 10 # Resource limits max_memory_gb: float = 8.0 max_gpu_memory_gb: float = 4.0 cpu_limit_percent: float = 80.0 # File handling input_dir: Optional[str] = None output_dir: Optional[str] = None recursive: bool = True file_patterns: List[str] = field(default_factory=lambda: ["*.jpg", "*.png", "*.mp4", "*.avi"]) preserve_structure: bool = True # Background settings default_background: Optional[Union[str, np.ndarray]] = None background_per_file: Dict[str, Union[str, np.ndarray]] = field(default_factory=dict) # Quality settings image_quality: int = 95 video_quality: str = "high" maintain_resolution: bool = True # Optimization enable_caching: bool = True cache_dir: Optional[str] = None deduplicate: bool = True # Progress and logging progress_callback: Optional[Callable[[float, Dict], None]] = None save_report: bool = True report_path: Optional[str] = None # Error handling stop_on_error: bool = False skip_existing: bool = True # Pipeline config pipeline_config: Optional[PipelineConfig] = None @dataclass class BatchReport: """Batch processing report.""" start_time: datetime end_time: Optional[datetime] = None total_items: int = 0 processed_items: int = 0 successful_items: int = 0 failed_items: int = 0 skipped_items: int = 0 total_processing_time: float = 0.0 avg_processing_time: float = 0.0 total_input_size_mb: float = 0.0 total_output_size_mb: float = 0.0 compression_ratio: float = 1.0 errors: List[Dict[str, Any]] = field(default_factory=list) warnings: List[str] = field(default_factory=list) resource_usage: Dict[str, Any] = 
    quality_metrics: Dict[str, float] = field(default_factory=dict)


class BatchProcessor:
    """High-performance batch processing engine."""

    def __init__(self, config: Optional[BatchConfig] = None):
        """
        Initialize batch processor.

        Args:
            config: Batch processing configuration
        """
        self.config = config or BatchConfig()
        self.logger = setup_logger(f"{__name__}.BatchProcessor")

        # Initialize components
        self.device_manager = DeviceManager()
        self.memory_monitor = MemoryMonitor()
        self.time_estimator = TimeEstimator()

        # Processing engines
        self.pipeline = ProcessingPipeline(self.config.pipeline_config)
        self.video_processor = VideoProcessorAPI()

        # State management
        self.is_processing = False
        self.should_stop = False
        self.current_item = None

        # Queues
        self.pending_queue = PriorityQueue()
        self.processing_queue = Queue()
        self.completed_queue = Queue()

        # Worker pool
        if self.config.use_multiprocessing:
            self.executor = ProcessPoolExecutor(max_workers=self.config.max_workers)
        else:
            self.executor = ThreadPoolExecutor(max_workers=self.config.max_workers)

        # Cache
        self.cache_dir = Path(self.config.cache_dir or tempfile.mkdtemp(prefix="bgfx_cache_"))
        self.cache_index = {}

        # Statistics
        self.report = BatchReport(start_time=datetime.now())

        self.logger.info(f"BatchProcessor initialized with {self.config.max_workers} workers")

    def process_directory(self,
                          input_dir: str,
                          output_dir: str,
                          background: Optional[Union[str, np.ndarray]] = None) -> BatchReport:
        """
        Process all supported files in a directory.

        Args:
            input_dir: Input directory path
            output_dir: Output directory path
            background: Default background for all files

        Returns:
            Batch processing report
        """
        input_path = Path(input_dir)
        output_path = Path(output_dir)

        if not input_path.exists():
            raise ValueError(f"Input directory does not exist: {input_dir}")

        output_path.mkdir(parents=True, exist_ok=True)

        # Collect files
        items = self._collect_files(input_path, output_path, background)

        if not items:
            self.logger.warning("No files found to process")
            return self.report

        self.logger.info(f"Found {len(items)} files to process")

        # Process batch
        return self.process_batch(items)

    def _collect_files(self,
                       input_path: Path,
                       output_path: Path,
                       background: Optional[Union[str, np.ndarray]]) -> List[BatchItem]:
        """Collect all files to process from directory."""
        items = []

        # Determine search method
        if self.config.recursive:
            file_iterator = input_path.rglob
        else:
            file_iterator = input_path.glob

        # Collect files matching patterns
        for pattern in self.config.file_patterns:
            for file_path in file_iterator(pattern):
                if file_path.is_file():
                    # Determine output path
                    if self.config.preserve_structure:
                        relative_path = file_path.relative_to(input_path)
                        output_file = output_path / relative_path.parent / f"{file_path.stem}_processed{file_path.suffix}"
                    else:
                        output_file = output_path / f"{file_path.stem}_processed{file_path.suffix}"

                    # Skip if exists and configured to skip
                    if self.config.skip_existing and output_file.exists():
                        self.report.skipped_items += 1
                        continue

                    # Determine file type
                    file_type = self._detect_file_type(str(file_path))

                    # Create batch item
                    item = BatchItem(
                        id=self._generate_item_id(file_path),
                        input_path=str(file_path),
                        output_path=str(output_file),
                        file_type=file_type,
                        background=self.config.background_per_file.get(
                            str(file_path),
                            background or self.config.default_background
                        )
                    )
                    items.append(item)

        return items

    def process_batch(self, items: List[BatchItem]) -> BatchReport:
        """
        Process a batch of items.

        Args:
            items: List of batch items to process

        Returns:
            Batch processing report
        """
        self.is_processing = True
        self.report = BatchReport(start_time=datetime.now())

        try:
            # Check for duplicates if enabled (before counting and queueing,
            # so progress totals stay consistent with what is actually processed)
            if self.config.deduplicate:
                items = self._deduplicate_items(items)

            self.report.total_items = len(items)

            # Add items to queue
            for item in items:
                self.pending_queue.put(item)

            # Start processing
            self._process_items(items)

        finally:
            self.is_processing = False
            self.report.end_time = datetime.now()
            self.report.total_processing_time = (
                self.report.end_time - self.report.start_time
            ).total_seconds()

            if self.report.processed_items > 0:
                self.report.avg_processing_time = (
                    self.report.total_processing_time / self.report.processed_items
                )

            # Save report if configured
            if self.config.save_report:
                self._save_report()

        return self.report

    def _process_items(self, items: List[BatchItem]):
        """Process all items in the batch."""
        # Chunk items for better resource management
        chunks = [items[i:i + self.config.chunk_size]
                  for i in range(0, len(items), self.config.chunk_size)]

        for chunk_idx, chunk in enumerate(chunks):
            if self.should_stop:
                break

            # Check resource availability
            self._wait_for_resources()

            # Process chunk
            futures = []
            for item in chunk:
                if self.should_stop:
                    break
                future = self.executor.submit(self._process_single_item, item)
                futures.append((future, item))

            # Collect results
            for future, item in futures:
                try:
                    result = future.result(timeout=300)  # 5 minute timeout
                    item.result = result
                    item.status = "completed" if result else "failed"

                    if result:
                        self.report.successful_items += 1
                    else:
                        self.report.failed_items += 1

                except Exception as e:
                    self.logger.error(f"Processing failed for {item.id}: {e}")
                    item.status = "failed"
                    item.error = str(e)
                    self.report.failed_items += 1

                    if self.config.stop_on_error:
                        self.should_stop = True
                        break

                finally:
                    self.report.processed_items += 1

                # Progress callback
                if self.config.progress_callback:
                    progress = self.report.processed_items / self.report.total_items
                    self.config.progress_callback(progress, {
                        'current_item': item.id,
                        'processed': self.report.processed_items,
                        'total': self.report.total_items,
                        'successful': self.report.successful_items,
                        'failed': self.report.failed_items
                    })

    def _process_single_item(self, item: BatchItem) -> bool:
        """
        Process a single batch item.

        Args:
            item: Batch item to process

        Returns:
            True if successful
        """
        start_time = time.time()

        try:
            # Check cache
            if self.config.enable_caching:
                cached_result = self._check_cache(item)
                if cached_result is not None:
                    self._save_cached_result(item, cached_result)
                    item.processing_time = time.time() - start_time
                    return True

            # Process based on file type
            if item.file_type == FileType.IMAGE:
                success = self._process_image(item)
            elif item.file_type == FileType.VIDEO:
                success = self._process_video(item)
            else:
                raise ValueError(f"Unsupported file type: {item.file_type}")

            # Cache result if successful
            if success and self.config.enable_caching:
                self._cache_result(item)

            item.processing_time = time.time() - start_time

            # Update file size statistics
            self._update_size_stats(item)

            return success

        except Exception as e:
            self.logger.error(f"Error processing {item.id}: {e}")
            item.error = str(e)

            # Retry logic
            if item.retry_count < item.max_retries:
                item.retry_count += 1
                self.logger.info(f"Retrying {item.id} (attempt {item.retry_count}/{item.max_retries})")
                return self._process_single_item(item)

            return False

    def _process_image(self, item: BatchItem) -> bool:
        """Process an image file."""
        try:
            # Load image
            image = cv2.imread(item.input_path)
            if image is None:
                raise ValueError(f"Cannot load image: {item.input_path}")

            # Apply config overrides
            pipeline_config = self.config.pipeline_config or PipelineConfig()
            for key, value in item.config_overrides.items():
                if hasattr(pipeline_config, key):
                    setattr(pipeline_config, key, value)

            # Process through pipeline
            result = self.pipeline.process_image(
                image,
                item.background
            )

            if result.success and result.output_image is not None:
                # Create output directory
                output_path = Path(item.output_path)
                output_path.parent.mkdir(parents=True, exist_ok=True)

                # Save result
                if output_path.suffix.lower() in ['.jpg', '.jpeg']:
                    cv2.imwrite(
                        str(output_path),
                        result.output_image,
                        [cv2.IMWRITE_JPEG_QUALITY, self.config.image_quality]
                    )
                else:
                    cv2.imwrite(str(output_path), result.output_image)

                # Store quality metrics
                item.metadata['quality_score'] = result.quality_score
                self._update_quality_metrics(result.quality_score)

                return True

            return False

        except Exception as e:
            self.logger.error(f"Image processing failed for {item.input_path}: {e}")
            raise

    def _process_video(self, item: BatchItem) -> bool:
        """Process a video file."""
        try:
            # Create output directory
            output_path = Path(item.output_path)
            output_path.parent.mkdir(parents=True, exist_ok=True)

            # Process video
            stats = self.video_processor.process_video(
                item.input_path,
                str(output_path),
                item.background
            )

            # Store statistics
            item.metadata['video_stats'] = {
                'frames_processed': stats.frames_processed,
                'frames_dropped': stats.frames_dropped,
                'processing_fps': stats.processing_fps,
                'avg_quality': stats.avg_quality_score
            }

            self._update_quality_metrics(stats.avg_quality_score)

            return stats.frames_processed > 0

        except Exception as e:
            self.logger.error(f"Video processing failed for {item.input_path}: {e}")
            raise

    def _detect_file_type(self, file_path: str) -> FileType:
        """Detect file type from path."""
        mime_type, _ = mimetypes.guess_type(file_path)

        if mime_type:
            if mime_type.startswith('image/'):
                return FileType.IMAGE
            elif mime_type.startswith('video/'):
                return FileType.VIDEO

        # Fallback to extension
        ext = Path(file_path).suffix.lower()
        if ext in ['.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp']:
            return FileType.IMAGE
        elif ext in ['.mp4', '.avi', '.mov', '.mkv', '.webm', '.flv']:
            return FileType.VIDEO

        return FileType.UNKNOWN

    def _generate_item_id(self, file_path: Path) -> str:
        """Generate unique ID for batch item."""
        # Combine path and timestamp for uniqueness
        content = f"{file_path}{time.time()}"
        return hashlib.md5(content.encode()).hexdigest()[:16]

    def _deduplicate_items(self, items: List[BatchItem]) -> List[BatchItem]:
        """Remove duplicate items based on file content hash."""
        seen_hashes = set()
        unique_items = []

        for item in items:
            try:
                file_hash = self._calculate_file_hash(item.input_path)

                if file_hash not in seen_hashes:
                    seen_hashes.add(file_hash)
                    unique_items.append(item)
                else:
                    self.logger.info(f"Skipping duplicate: {item.input_path}")
                    self.report.skipped_items += 1

            except Exception as e:
                self.logger.warning(f"Cannot calculate hash for {item.input_path}: {e}")
                unique_items.append(item)

        return unique_items

    def _calculate_file_hash(self, file_path: str, chunk_size: int = 8192) -> str:
        """Calculate MD5 hash of file."""
        hasher = hashlib.md5()
        with open(file_path, 'rb') as f:
            while chunk := f.read(chunk_size):
                hasher.update(chunk)
        return hasher.hexdigest()

    def _check_cache(self, item: BatchItem) -> Optional[Any]:
        """Check if item result is cached."""
        cache_key = self._get_cache_key(item)
        cache_file = self.cache_dir / f"{cache_key}.pkl"

        if cache_file.exists():
            try:
                with open(cache_file, 'rb') as f:
                    cached_data = pickle.load(f)

                # Verify cache validity
                if cached_data.get('input_hash') == self._calculate_file_hash(item.input_path):
                    self.logger.info(f"Using cached result for {item.id}")
                    return cached_data['result']

            except Exception as e:
                self.logger.warning(f"Cache read failed: {e}")

        return None

    def _cache_result(self, item: BatchItem):
        """Cache processing result."""
        try:
            cache_key = self._get_cache_key(item)
            cache_file = self.cache_dir / f"{cache_key}.pkl"

            # Read processed file
            with open(item.output_path, 'rb') as f:
                result_data = f.read()

            # Cache data
            cache_data = {
                'input_hash': self._calculate_file_hash(item.input_path),
                'result': result_data,
                'metadata': item.metadata,
                'timestamp': time.time()
            }

            with open(cache_file, 'wb') as f:
                pickle.dump(cache_data, f)

        except Exception as e:
            self.logger.warning(f"Cache write failed: {e}")

    def _save_cached_result(self, item: BatchItem, cached_data: bytes):
        """Save cached result to output file."""
        output_path = Path(item.output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        with open(output_path, 'wb') as f:
            f.write(cached_data)

    def _get_cache_key(self, item: BatchItem) -> str:
        """Generate cache key for item."""
        # Include relevant parameters in cache key
        key_parts = [
            item.input_path,
            str(item.background) if item.background is not None else "none",
            json.dumps(item.config_overrides, sort_keys=True)
        ]
        key_string = "|".join(key_parts)
        return hashlib.md5(key_string.encode()).hexdigest()

    def _wait_for_resources(self):
        """Wait for sufficient resources before processing."""
        while True:
            # Check CPU usage
            cpu_percent = psutil.cpu_percent(interval=1)
            if cpu_percent > self.config.cpu_limit_percent:
                self.logger.debug(f"CPU usage high ({cpu_percent}%), waiting...")
                time.sleep(2)
                continue

            # Check memory
            memory = psutil.virtual_memory()
            memory_gb = (memory.total - memory.available) / (1024**3)
            if memory_gb > self.config.max_memory_gb:
                self.logger.debug(f"Memory usage high ({memory_gb:.1f}GB), waiting...")
                time.sleep(2)
                continue

            # Resources available
            break

    def _update_size_stats(self, item: BatchItem):
        """Update file size statistics."""
        try:
            input_size = os.path.getsize(item.input_path) / (1024**2)    # MB
            output_size = os.path.getsize(item.output_path) / (1024**2)  # MB

            self.report.total_input_size_mb += input_size
            self.report.total_output_size_mb += output_size

            if self.report.total_input_size_mb > 0:
                self.report.compression_ratio = (
                    self.report.total_output_size_mb / self.report.total_input_size_mb
                )

        except Exception as e:
            self.logger.warning(f"Cannot update size stats: {e}")

    def _update_quality_metrics(self, quality_score: float):
        """Update quality metrics in report."""
        if 'scores' not in self.report.quality_metrics:
            self.report.quality_metrics['scores'] = []

        self.report.quality_metrics['scores'].append(quality_score)

        # Cast numpy scalars to plain floats so the report stays JSON-serializable
        scores = self.report.quality_metrics['scores']
        self.report.quality_metrics['avg_quality'] = float(np.mean(scores))
        self.report.quality_metrics['min_quality'] = float(np.min(scores))
        self.report.quality_metrics['max_quality'] = float(np.max(scores))
        self.report.quality_metrics['std_quality'] = float(np.std(scores))

    def _save_report(self):
        """Save processing report to file."""
        try:
            report_path = self.config.report_path
            if not report_path:
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                report_path = f"batch_report_{timestamp}.json"

            report_dict = {
                'start_time': self.report.start_time.isoformat(),
                'end_time': self.report.end_time.isoformat() if self.report.end_time else None,
                'total_items': self.report.total_items,
                'processed_items': self.report.processed_items,
                'successful_items': self.report.successful_items,
                'failed_items': self.report.failed_items,
                'skipped_items': self.report.skipped_items,
                'total_processing_time': self.report.total_processing_time,
                'avg_processing_time': self.report.avg_processing_time,
                'total_input_size_mb': self.report.total_input_size_mb,
                'total_output_size_mb': self.report.total_output_size_mb,
                'compression_ratio': self.report.compression_ratio,
                'quality_metrics': self.report.quality_metrics,
                'errors': self.report.errors,
                'warnings': self.report.warnings
            }

            with open(report_path, 'w') as f:
                json.dump(report_dict, f, indent=2)

            self.logger.info(f"Report saved to {report_path}")

        except Exception as e:
            self.logger.error(f"Failed to save report: {e}")

    def process_with_pattern(self,
                             pattern: str,
                             output_template: str,
                             background: Optional[Union[str, np.ndarray]] = None) -> BatchReport:
        """
        Process files matching a pattern with template-based output.

        Args:
            pattern: File pattern (e.g., "images/*.jpg")
            output_template: Output path template (e.g., "output/{name}_bg.{ext}")
            background: Background for processing

        Returns:
            Batch processing report
        """
        items = []

        for file_path in Path().glob(pattern):
            if file_path.is_file():
                # Parse template
                output_path = output_template.format(
                    name=file_path.stem,
                    ext=file_path.suffix[1:],
                    dir=file_path.parent,
                    date=datetime.now().strftime("%Y%m%d")
                )

                item = BatchItem(
                    id=self._generate_item_id(file_path),
                    input_path=str(file_path),
                    output_path=output_path,
                    file_type=self._detect_file_type(str(file_path)),
                    background=background
                )
                items.append(item)

        return self.process_batch(items)

    def stop_processing(self):
        """Stop batch processing."""
        self.should_stop = True
        self.logger.info("Stopping batch processing...")

    def cleanup(self):
        """Clean up resources."""
        self.stop_processing()
        self.executor.shutdown(wait=True)

        # Clean cache if temporary
        if self.config.cache_dir is None:
            shutil.rmtree(self.cache_dir, ignore_errors=True)

        self.logger.info("Batch processor cleanup complete")

    def get_status(self) -> Dict[str, Any]:
        """Get current processing status."""
        return {
            'is_processing': self.is_processing,
            'total_items': self.report.total_items,
            'processed_items': self.report.processed_items,
            'successful_items': self.report.successful_items,
            'failed_items': self.report.failed_items,
            'skipped_items': self.report.skipped_items,
            'current_item': self.current_item.id if self.current_item else None,
            'progress': (self.report.processed_items / self.report.total_items * 100
                         if self.report.total_items > 0 else 0),
            'estimated_time_remaining': self.time_estimator.estimate_remaining(
                self.report.processed_items,
                self.report.total_items
            ) if self.is_processing else None
        }
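

# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the public API). It shows how the
# batch processor above is typically driven; the directory and background
# paths are placeholder assumptions, and it only runs when the module is
# executed with `python -m` from the package root so the relative imports
# resolve.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    config = BatchConfig(
        max_workers=4,
        skip_existing=True,
        save_report=True,
        # Simple progress reporting through the configured callback
        progress_callback=lambda progress, info: logger.info(
            f"{progress:.0%} - {info['processed']}/{info['total']} files"
        ),
    )

    processor = BatchProcessor(config)
    try:
        # Hypothetical paths; replace with real directories and a background image
        report = processor.process_directory(
            input_dir="input_media",
            output_dir="processed_media",
            background="backgrounds/studio.jpg",
        )
        logger.info(
            f"Processed {report.successful_items}/{report.total_items} items "
            f"in {report.total_processing_time:.1f}s"
        )
    finally:
        processor.cleanup()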