| |
| """ |
| utils.compositing |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| Handles frame-level compositing (foreground frame + mask + background). |
| Public API |
| ---------- |
| replace_background_hq(frame_bgr, mask, background_bgr, fallback_enabled=True) β np.ndarray |
| """ |
| from __future__ import annotations |
| import logging |
| from typing import Tuple |
| import cv2 |
| import numpy as np |
|
|
| log = logging.getLogger(__name__) |
|
|
| |
| class BackgroundReplacementError(Exception): |
| """Exception raised for background replacement errors""" |
| pass |
|
|
| __all__ = ["replace_background_hq", "BackgroundReplacementError"] |
|
|
| |
| |
| |
| def replace_background_hq( |
| frame_bgr: np.ndarray, |
| mask: np.ndarray, |
| background_bgr: np.ndarray, |
| fallback_enabled: bool = True, |
| ) -> np.ndarray: |
| """ |
| β’ Ensures background is resized to frame |
| β’ Accepts mask in {0,1} or {0,255} or float32 |
| β’ Tries edge-feathered advanced blend, else simple overlay |
| """ |
| if frame_bgr is None or mask is None or background_bgr is None: |
| raise ValueError("Invalid input to replace_background_hq") |
| |
| h, w = frame_bgr.shape[:2] |
| background = cv2.resize(background_bgr, (w, h), interpolation=cv2.INTER_LANCZOS4) |
| m = _process_mask(mask) |
| |
| try: |
| return _advanced_composite(frame_bgr, background, m) |
| except Exception as e: |
| log.warning(f"Advanced compositing failed: {e}") |
| if not fallback_enabled: |
| raise |
| return _simple_composite(frame_bgr, background, m) |
|
|
| |
| |
| |
| def _advanced_composite(fg, bg, mask_u8): |
| |
| mask = cv2.GaussianBlur(mask_u8.astype(np.float32), (5, 5), 1.0) / 255.0 |
| mask = np.power(mask, 0.8) |
| mask3 = mask[..., None] |
| |
| |
| fg_adj = _colour_match_edges(fg, bg, mask) |
| |
| |
| comp = fg_adj.astype(np.float32) * mask3 + bg.astype(np.float32) * (1 - mask3) |
| return np.clip(comp, 0, 255).astype(np.uint8) |
|
|
| def _colour_match_edges(fg, bg, alpha): |
| edge = cv2.Sobel(alpha, cv2.CV_32F, 1, 1, ksize=3) |
| edge = (np.abs(edge) > 0.05).astype(np.float32) |
| |
| if not np.any(edge): |
| return fg |
| |
| adj = fg.astype(np.float32).copy() |
| mix = 0.1 |
| adj[edge > 0] = adj[edge > 0] * (1 - mix) + bg[edge > 0] * mix |
| return adj.astype(np.uint8) |
|
|
| |
| |
| |
| def _simple_composite(fg, bg, mask_u8): |
| m = mask_u8.astype(np.float32) / 255.0 |
| m3 = m[..., None] |
| return (fg.astype(np.float32) * m3 + bg.astype(np.float32) * (1 - m3)).astype(np.uint8) |
|
|
| |
| |
| |
| def _process_mask(mask): |
| """Ensure uint8 0/255 single-channel""" |
| if mask.ndim == 3: |
| mask = cv2.cvtColor(mask, cv2.COLOR_BGR2GRAY) |
| |
| if mask.dtype != np.uint8: |
| mask = (mask * 255).astype(np.uint8) if mask.max() <= 1 else mask.astype(np.uint8) |
| |
| _, mask = cv2.threshold(mask, 127, 255, cv2.THRESH_BINARY) |
| return mask |