# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import io
import json
import os
from os.path import join, isdir, isfile

import imageio.v3 as imageio
import numpy as np
from tqdm import tqdm

from api_types import CompressedSeedingRequest, SeedingRequest
from encoding import CompressionFormat


def srgb_to_linear(img):
    """Convert sRGB-encoded values in [0, 1] to linear color values."""
    limit = 0.04045
    mask = img > limit
    # Process the two cases in parallel using NumPy's vectorized operations
    result = np.empty_like(img)
    result[mask] = np.power((img[mask] + 0.055) / 1.055, 2.4)
    result[~mask] = img[~mask] / 12.92
    return result
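
# Illustrative usage (the file name is hypothetical): convert an 8-bit sRGB
# image to linear color in [0, 1].
#
#   srgb = imageio.imread("frame.jpg").astype(np.float32) / 255.0
#   linear = srgb_to_linear(srgb)
#
# The 0.04045 threshold and 2.4 exponent come from the standard piecewise
# sRGB electro-optical transfer function.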


def load_gen3c_seeding_data(data_directory: str, max_frames: int | None = None,
                            frames_stride: int = 1) -> CompressedSeedingRequest:
    """
    Example directory structure:
    ├── camera.npz
    ├── depth.npz
    ├── mask.npz
    ├── metadata.json
    └── rgb.mp4

    We keep the data as compressed as possible so that it can be uploaded
    to the inference server faster.

    Note: `max_frames` and `frames_stride` are currently not applied here,
    since the streams are kept compressed end-to-end.
    """
    bar = tqdm(range(6), desc="Seeding data loading")

    # [n_frames, height, width], float16
    depths = np.load(join(data_directory, "depth.npz"))['depth']
    assert depths.ndim == 3, depths.shape
    n_img = depths.shape[0]
    resolutions = np.tile([depths.shape[2], depths.shape[1]], reps=(n_img, 1))
    assert resolutions.shape == (n_img, 2)
    with io.BytesIO() as f:
        np.savez_compressed(f, depths)
        depths_compressed = f.getvalue()
    bar.update(1)

    # Intrinsics: [n_frames, 3, 3], float32
    # Organized as:
    # [[fx,  0, cx],
    #  [ 0, fy, cy],
    #  [ 0,  0,  1]]
    camera_data = np.load(join(data_directory, "camera.npz"))
    intrinsics = camera_data['intrinsics']
    # Absolute focal lengths
    focal_lengths = np.stack([intrinsics[:, 0, 0], intrinsics[:, 1, 1]], axis=1)
    assert focal_lengths.shape == (n_img, 2)
    # Principal points, relative to the resolution
    principal_points = (intrinsics[:, :2, 2] / resolutions).astype(np.float32)
    assert principal_points.shape == (n_img, 2)
    bar.update(1)

    # [n_frames, height, width], bool
    masks = np.load(join(data_directory, "mask.npz"))['mask']
    with io.BytesIO() as f:
        np.savez_compressed(f, masks)
        masks_compressed = f.getvalue()
    bar.update(1)

    # TODO: set the frontend's FPS slider based on `metadata["fps"]`
    # metadata = json.load(open(join(data_directory, "metadata.json")))
    bar.update(1)

    with open(join(data_directory, "rgb.mp4"), "rb") as f:
        images_compressed = f.read()
    bar.update(1)

    # World-to-camera transforms: [n_frames, 4, 4], float32
    w2c = camera_data['w2c']
    cameras_to_world = np.linalg.inv(w2c)[:, :3, :]
    assert cameras_to_world.shape == (n_img, 3, 4)
    bar.update(1)

    return CompressedSeedingRequest(
        request_id="__seeding_from_files",
        images=None,  # Will be auto-filled with placeholders
        depths=None,  # Will be auto-filled with placeholders
        masks=None,   # Will be auto-filled with placeholders
        cameras_to_world=cameras_to_world,
        focal_lengths=focal_lengths,
        principal_points=principal_points,
        resolutions=resolutions,
        images_compressed=[images_compressed],
        images_format=CompressionFormat.MP4,
        depths_compressed=[depths_compressed],
        depths_format=CompressionFormat.NPZ,
        masks_compressed=[masks_compressed],
        masks_format=CompressionFormat.NPZ,
    )


def load_v2v_seeding_data(data_directory: str, max_frames: int | None = None,
                          frames_stride: int = 1) -> SeedingRequest:
    """
    The seeding data would typically come from the client.
    For convenience during debugging, we also allow loading it from disk here.
    """
    if isdir(data_directory):
        # --- Load seeding data from a directory.
        if isfile(join(data_directory, "rgb.mp4")) and isfile(join(data_directory, "metadata.json")):
            return load_gen3c_seeding_data(data_directory, max_frames=max_frames,
                                           frames_stride=frames_stride)

        # Gen3C / INGP pre-processed format.
        # We assume depths, camera poses, etc. are included.

        # Load the seeding frames
        n_img = len([img for img in sorted(os.listdir(join(data_directory, 'rgb')))
                     if img.endswith('.jpg')])
        images = []
        depths = []
        for i_frame in range(n_img):
            # Load image data
            image = imageio.imread(join(data_directory, 'rgb', f'{i_frame:05d}.jpg'))
            image_np = image.astype(np.float32) / 255.0

            # Load depth data
            depth_np = np.load(join(data_directory, 'depth', f'{i_frame:05d}.npz'))['depth']

            images.append(image_np)
            depths.append(depth_np)
            del image_np, depth_np

        # Load camera trajectory (cameras-to-world matrices)
        with open(join(data_directory, 'cameras.json'), 'r') as f:
            cameras = json.load(f)
        cameras_to_world = np.asarray(cameras)[:n_img]

        # Apply frame striding and truncation, if requested.
        if (frames_stride > 1) or ((max_frames is not None) and (max_frames < len(images))):
            images = images[::frames_stride][:max_frames]
            depths = depths[::frames_stride][:max_frames]
            cameras_to_world = cameras_to_world[::frames_stride][:max_frames]

    else:
        # --- Load a single image.
        # We will have to assume camera poses, etc. and let depth be auto-estimated.
        image = imageio.imread(data_directory)
        images = [image.astype(np.float32) / 255.0]
        depths = None
        cameras_to_world = np.eye(4)[None, :3, :]

    # Shape: [batch, height, width, 3]
    images = np.stack(images, axis=0)
    if depths is not None:
        # Shape: [batch, height, width]
        depths = np.stack(depths, axis=0)
    # Recompute in case frames were strided or truncated above.
    n_img = images.shape[0]

    # Note: assumed based on how this data was generated
    resolutions = np.tile([images.shape[2], images.shape[1]], reps=(n_img, 1))
    fov_y_rad = np.pi * (50.625 / 180.0)
    f = 0.5 / np.tan(fov_y_rad / 2.0) * resolutions[:, 1]
    focal_lengths = np.stack([f, f], axis=-1)
    principal_points = np.full((n_img, 2), 0.5)

    return SeedingRequest(
        request_id="__seeding_from_files",
        images=images,
        depths=depths,
        cameras_to_world=cameras_to_world,
        focal_lengths=focal_lengths,
        principal_points=principal_points,
        resolutions=resolutions,
    )


def ensure_alpha_channel(image: np.ndarray):
    # Allow the alpha channel to be omitted for faster transfers
    assert image.shape[-1] in (3, 4)
    if image.shape[-1] == 3:
        image = np.concatenate([image, np.ones((*image.shape[:2], 1))], axis=-1)
    image = image.astype(np.float32)
    return image


def apply_to_pytree(pytree, cb):
    """Apply callback `cb` to every leaf of a nested tuple / list / dict."""
    tp = type(pytree)
    if pytree is None:
        return None
    elif isinstance(pytree, (tuple, list)):
        return tp([apply_to_pytree(v, cb) for v in pytree])
    elif isinstance(pytree, dict):
        return {k: apply_to_pytree(v, cb) for k, v in pytree.items()}
    else:
        return cb(pytree)
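
# Illustrative usage of `apply_to_pytree` (the structure below is made up
# for the example):
#
#   tree = {"images": np.ones((2, 4, 4, 3)), "meta": (np.zeros(3), None)}
#   shapes = apply_to_pytree(tree, lambda leaf: leaf.shape)
#   # -> {"images": (2, 4, 4, 3), "meta": ((3,), None)}
#
# Containers keep their original types and `None` leaves pass through unchanged.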


def move_to_device(pytree, device):
    import torch

    def move(leaf):
        if torch.is_tensor(leaf):
            return leaf.to(device)
        elif isinstance(leaf, np.ndarray):
            return torch.from_numpy(leaf).to(device)
        else:
            # Let's assume it's not something we need to move
            return leaf
            # raise NotImplementedError(f"move_to_device(): unsupported type {type(leaf)}")

    return apply_to_pytree(pytree, move)


def clone_tensors(pytree):
    import torch

    def clone(leaf):
        if torch.is_tensor(leaf):
            return leaf.clone()
        elif isinstance(leaf, np.ndarray):
            return leaf.copy()
        else:
            # Let's assume it's not something we need to copy
            return leaf
            # raise NotImplementedError(f"clone_tensors(): unsupported type {type(leaf)}")

    return apply_to_pytree(pytree, clone)
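

# A minimal smoke test of the pytree helpers above; a sketch assuming `torch`
# is installed. The dictionary contents are illustrative only.
if __name__ == "__main__":
    sample = {
        "images": np.zeros((2, 8, 8, 3), dtype=np.float32),
        "poses": [np.eye(4)[:3, :], None],
    }
    on_device = move_to_device(sample, "cpu")  # NumPy arrays become torch tensors
    copied = clone_tensors(on_device)          # Tensors and arrays are deep-copied
    print(type(on_device["images"]), type(copied["images"]))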