|
|
|
|
|
|
|
import cv2 |
|
import numpy as np |
|
from tqdm import tqdm |
|
import pycolmap |
|
import os |
|
import time |
|
import tempfile |
|
from moviepy import VideoFileClip |
|
from matplotlib import pyplot as plt |
|
from PIL import Image |
|
import cv2 |
|
from tqdm import tqdm |
|
|
|
# Relative path of the outputs directory. It is not referenced by any function
# in the visible part of this file.
WORKDIR = "../outputs/"
|
|
|
|
|
def get_rotation_moviepy(video_path):
    """
    Read the display rotation of a video from the metadata parsed by moviepy.

    Every stream of every input reported by the moviepy reader is inspected
    for a 'displaymatrix' metadata entry of the form
    '... rotation of <angle> degrees'; the first such entry is used.

    Args:
        video_path (str): Path to the video file.

    Returns:
        int: Rotation in degrees normalised to the range [0, 360), or 0 when
            no displaymatrix rotation is present or it cannot be parsed.
    """
    rotation = 0
    clip = VideoFileClip(video_path)

    try:
        # The displaymatrix side data is not guaranteed to live at a fixed
        # stream index, so scan all streams instead of hard-coding one.
        streams = (
            stream
            for source in clip.reader.infos['inputs']
            for stream in source['streams']
        )
        for stream in streams:
            displaymatrix = str(stream.get('metadata', {}).get('displaymatrix', ''))
            if 'rotation of' in displaymatrix:
                angle = float(displaymatrix.strip().split('rotation of')[-1].split('degrees')[0])
                # Python's modulo maps negative angles (e.g. -90) to [0, 360).
                rotation = int(angle) % 360
                break
    except (AttributeError, IndexError, KeyError, TypeError, ValueError) as e:
        print(f"No displaymatrix rotation found: {e}")
    finally:
        # Close the whole clip (not just the video reader) even on failure.
        clip.close()

    return rotation
|
|
|
def resize_max_side(frame, max_size):
    """
    Shrink `frame` so that its longest side fits within `max_size`.

    The aspect ratio is preserved and frames that already fit are returned
    untouched (no upscaling).

    NOTE: this file defines another function named `resize_max_side` further
    down; that later definition replaces this one when the module is loaded.
    """
    height, width = frame.shape[:2]
    ratio = max_size / max(height, width)

    # Nothing to do unless the frame is strictly larger than the limit.
    if not ratio < 1:
        return frame

    target_size = (int(width * ratio), int(height * ratio))  # cv2 wants (w, h)
    return cv2.resize(frame, target_size)
|
|
|
_VIDEO_EXTENSIONS = ('.mp4', '.avi', '.mov')


def _input_path(item):
    """Return the filesystem path behind a path-like value or an object exposing `.name`."""
    # Path-likes are checked first: pathlib.Path also has a `.name` attribute,
    # but that is only the basename and would drop the directory part.
    if isinstance(item, (str, os.PathLike)):
        return str(item)
    if hasattr(item, 'name'):
        return item.name
    raise ValueError("Unsupported video input type. Must be a filepath, file-like object, or list of images.")


def read_video_frames(video_input, k=1, max_size=1024):
    """
    Extracts every k-th frame from a video or loads a list of images, downsizes them to max size,
    and returns the frames as a list.

    Parameters:
        video_input (str, path-like, file-like, or list): Path to a video file, an object exposing
            the path as `.name`, or a list of image files (paths or objects with `.name`). A list
            holding a single .mp4/.avi/.mov entry is treated as a video.
        k (int): Interval for frame extraction (every k-th frame). Only used for videos.
        max_size (int): Maximum size for width or height after resizing. Smaller inputs are kept as is.

    Returns:
        frames (list): List of resized frames (numpy arrays in BGR channel order).

    Raises:
        ValueError: If the input type is unsupported, k is smaller than 1, or the video cannot be opened.
    """
    if isinstance(video_input, list):
        is_single_video = (
            len(video_input) == 1
            and _input_path(video_input[0]).lower().endswith(_VIDEO_EXTENSIONS)
        )
        if is_single_video:
            video_input = video_input[0]
        else:
            frames = []
            for img_file in video_input:
                # convert() loads the pixel data, so the file can be closed right away.
                with Image.open(_input_path(img_file)) as img:
                    rgb = img.convert("RGB")
                rgb.thumbnail((max_size, max_size))
                # Reverse channels (RGB -> BGR) and materialise a contiguous copy.
                frames.append(np.ascontiguousarray(np.array(rgb)[..., ::-1]))
            return frames

    video_path = _input_path(video_input)
    if k < 1:
        raise ValueError(f"k must be a positive integer, got {k}.")

    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        if not cap.isOpened():
            raise ValueError(f"Error: Could not open video {video_path}.")

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Frames 0, k, 2k, ... are kept, i.e. ceil(total / k) of them. Some
        # containers report no usable frame count; let tqdm run without a total.
        expected = -(-total_frames // k) if total_frames > 0 else None

        with tqdm(total=expected, desc="Processing Video", unit="frame") as pbar:
            frame_count = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_count % k == 0:
                    h, w = frame.shape[:2]
                    scale = max(h, w) / max_size
                    if scale > 1:
                        # Clamp to 1 px so extreme aspect ratios never yield a zero-sized target.
                        new_size = (max(1, int(w / scale)), max(1, int(h / scale)))
                        frame = cv2.resize(frame, new_size)
                    # cap.read() already delivers BGR, so no channel shuffling is needed.
                    frames.append(frame)
                    pbar.update(1)
                frame_count += 1
    finally:
        cap.release()

    return frames
|
|
|
def resize_max_side(frame, max_size):
    """
    Downsizes the frame so that its largest side is at most max_size, maintaining aspect ratio.

    Frames whose largest side already fits are returned unchanged (no upscaling).

    Args:
        frame (np.ndarray): Image array whose first two axes are (height, width).
        max_size (int): Upper bound for the largest side; must be at least 1 when a resize is needed.

    Returns:
        np.ndarray: The original frame, or a downsized copy produced with INTER_AREA interpolation.

    Raises:
        ValueError: If the frame has to be shrunk but max_size is smaller than 1.
    """
    height, width = frame.shape[:2]
    max_dim = max(height, width)

    if max_dim <= max_size:
        return frame

    if max_size < 1:
        raise ValueError(f"max_size must be at least 1, got {max_size}.")

    scale = max_size / max_dim
    # int() truncates, so the short side of a very elongated frame could hit 0,
    # which cv2.resize rejects; keep every side at least one pixel.
    new_width = max(1, int(width * scale))
    new_height = max(1, int(height * scale))

    return cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_AREA)
|
|
|
|
|
|
|
def variance_of_laplacian(image):
    """
    Return the variance of the Laplacian of `image` (computed in 64-bit float).

    NOTE: a function with the same name and body is defined again further
    down in this file and replaces this one when the module is loaded.
    """
    laplacian = cv2.Laplacian(image, cv2.CV_64F)
    return laplacian.var()
|
|
|
def process_all_frames(IMG_FOLDER = '/scratch/datasets/hq_data/night2_all_frames',
                       to_visualize=False,
                       save_images=True):
    """
    Score every PNG image in a folder with a multi-scale Laplacian-variance measure.

    Images are processed in sorted filename order. The score is the sum of the
    Laplacian variance of the grayscale image at scales 1.0, 0.75, 0.5 and 0.25.

    Args:
        IMG_FOLDER (str): Directory containing the '.png' frames.
        to_visualize (bool): If True, show each image with its score via matplotlib.
        save_images (bool): If True, keep the loaded image in the result under the "img" key.

    Returns:
        dict: Maps the position of each image in the sorted file list to a dict with
            the keys "idx", "img_name", "score" and, optionally, "img". Files that
            cannot be decoded are skipped and therefore have no entry.
    """
    # Match on the extension only, so names such as 'a.png.bak' are not picked up.
    img_names = sorted(x for x in os.listdir(IMG_FOLDER) if x.endswith('.png'))

    dict_scores = {}
    # Wrapping the list (not the enumerate object) lets tqdm know the total.
    for idx, img_name in enumerate(tqdm(img_names)):
        img = cv2.imread(os.path.join(IMG_FOLDER, img_name))
        if img is None:
            # cv2.imread signals an unreadable file by returning None.
            print(f"Warning: could not read image {img_name}, skipping.")
            continue

        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        fm = (
            variance_of_laplacian(gray)
            + variance_of_laplacian(cv2.resize(gray, (0, 0), fx=0.75, fy=0.75))
            + variance_of_laplacian(cv2.resize(gray, (0, 0), fx=0.5, fy=0.5))
            + variance_of_laplacian(cv2.resize(gray, (0, 0), fx=0.25, fy=0.25))
        )

        if to_visualize:
            plt.figure()
            plt.title(f"Laplacian score: {fm:.2f}")
            plt.imshow(img[..., [2, 1, 0]])  # BGR -> RGB for matplotlib
            plt.show()

        dict_scores[idx] = {"idx": idx,
                            "img_name": img_name,
                            "score": fm}
        if save_images:
            dict_scores[idx]["img"] = img

    return dict_scores
|
|
|
def select_optimal_frames(scores, k):
    """
    Greedily selects high-scoring frames while bounding the gap between picks.

    The first and last frames are always kept. Starting at frame 0, the best
    frame inside a window of k + 1 frames is picked and the next window starts
    k + 1 frames after that pick. Consecutive picks can therefore be up to
    2 * k + 1 frames apart, while the first pick is within k frames of the
    start and the last pick within k frames of the end.

    NOTE: this file defines another `select_optimal_frames` further down; that
    later definition replaces this one when the module is loaded.

    Args:
        scores (list of float): List of scores where index represents frame number.
        k (int): Window parameter controlling the spacing of selected frames.

    Returns:
        list of int: Sorted, duplicate-free indices of selected frames
            (empty for empty input).
    """
    n = len(scores)
    if n == 0:
        return []

    # A set keeps the forced endpoints from being reported twice when the
    # greedy pass also picks them.
    selected = {0, n - 1}
    i = 0

    while i < n:
        window = range(i, min(i + k + 1, n))
        if not window:
            break

        best_idx = max(window, key=lambda x: scores[x])
        selected.add(best_idx)
        i = best_idx + k + 1

    return sorted(selected)
|
|
|
|
|
def variance_of_laplacian(image):
    """
    Focus measure: variance of the image's Laplacian, evaluated in 64-bit float.
    """
    response = cv2.Laplacian(image, cv2.CV_64F)
    focus_measure = response.var()
    return focus_measure
|
|
|
def preprocess_frames(frames, verbose=False):
    """
    Score the sharpness of each frame with a multi-scale Laplacian variance.

    For every frame the Laplacian variance of the grayscale image is summed
    over the full resolution and three downscaled copies (0.75, 0.5, 0.25).

    Args:
        frames (list of np.ndarray): List of frames (BGR images).
        verbose (bool): If True, print the score of every frame.

    Returns:
        list of float: One sharpness score per frame, in input order.
    """
    downscale_factors = (0.75, 0.5, 0.25)
    scores = []

    progress = tqdm(frames, desc="Scoring frames")
    for idx, frame in enumerate(progress):
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Full-resolution term first, then the downscaled ones in order.
        fm = variance_of_laplacian(gray)
        for factor in downscale_factors:
            shrunk = cv2.resize(gray, (0, 0), fx=factor, fy=factor)
            fm = fm + variance_of_laplacian(shrunk)

        if verbose:
            print(f"Frame {idx}: Sharpness Score = {fm:.2f}")

        scores.append(fm)

    return scores
|
|
|
def select_optimal_frames(scores, k):
    """
    Selects up to k frames by splitting into k segments and picking the sharpest frame from each.

    The sequence is cut into k consecutive segments of len(scores) // k frames;
    the last segment additionally absorbs any remainder. When k exceeds the
    number of frames, every frame is selected.

    Args:
        scores (list of float): List of sharpness scores.
        k (int): Number of frames to select.

    Returns:
        list of int: Ascending indices of selected frames (empty when there
            are no scores or k is not positive).
    """
    n = len(scores)
    if n == 0 or k <= 0:
        return []

    # With more segments than frames the segment size would be 0 and only the
    # final catch-all segment would yield a frame; clamp so each frame is kept.
    k = min(k, n)
    segment_size = n // k

    selected_indices = []
    for i in range(k):
        start = i * segment_size
        end = start + segment_size if i < k - 1 else n
        # int() turns numpy's integer type into a plain Python int.
        best_in_segment = start + int(np.argmax(scores[start:end]))
        selected_indices.append(best_in_segment)

    # Segments are visited left to right, so the indices are already ascending.
    return selected_indices
|
|
|
def save_frames_to_scene_dir(frames, scene_dir):
    """
    Saves a list of frames into the target scene directory under 'images/' subfolder.

    Frames are written as zero-padded, sequentially numbered PNG files
    ('00000000.png', '00000001.png', ...). A frame that OpenCV fails to write
    is reported and skipped; the final message counts only successful writes.

    Args:
        frames (list of np.ndarray): List of frames (BGR images) to save.
        scene_dir (str): Target path where 'images/' subfolder will be created.
    """
    images_dir = os.path.join(scene_dir, "images")
    os.makedirs(images_dir, exist_ok=True)

    saved = 0
    for idx, frame in enumerate(frames):
        filename = os.path.join(images_dir, f"{idx:08d}.png")
        # cv2.imwrite reports failure through its return value.
        if cv2.imwrite(filename, frame):
            saved += 1
        else:
            print(f"Warning: failed to write {filename}")

    print(f"Saved {saved} frames to {images_dir}")
|
|
|
|
|
def run_colmap_on_scene(scene_dir):
    """
    Runs feature extraction, matching, and mapping on all images inside scene_dir/images using pycolmap.

    The database is written to scene_dir/database.db and the sparse models to
    scene_dir/sparse. Afterwards the model in scene_dir/sparse/0 is reloaded,
    its cameras are switched to SIMPLE_PINHOLE and the model is written back.

    Args:
        scene_dir (str): Path to scene directory containing 'images' folder.

    Raises:
        RuntimeError: If no model exists at scene_dir/sparse/0 after mapping.

    TODO: if the function hasn't managed to match all the frames either increase image size,
    increase number of features or just remove those frames from the folder scene_dir/images
    """
    start_time = time.time()
    print(f"Running COLMAP pipeline on all images inside {scene_dir}")

    database_path = os.path.join(scene_dir, "database.db")
    sparse_path = os.path.join(scene_dir, "sparse")
    image_dir = os.path.join(scene_dir, "images")

    os.makedirs(sparse_path, exist_ok=True)

    # --- Feature extraction ---
    stage_start = time.time()
    pycolmap.extract_features(
        database_path,
        image_dir,
        sift_options={
            "max_num_features": 512 * 2,
            "max_image_size": 512 * 1,
        }
    )
    print(f"Finished feature extraction in {(time.time() - stage_start):.2f}s.")

    # --- Exhaustive matching ---
    stage_start = time.time()
    pycolmap.match_exhaustive(database_path)
    print(f"Finished feature matching in {(time.time() - stage_start):.2f}s.")

    # --- Incremental mapping ---
    stage_start = time.time()
    pipeline_options = pycolmap.IncrementalPipelineOptions()
    pipeline_options.min_num_matches = 15
    pipeline_options.multiple_models = True
    pipeline_options.max_num_models = 50
    pipeline_options.max_model_overlap = 20
    pipeline_options.min_model_size = 10
    pipeline_options.extract_colors = True
    pipeline_options.num_threads = 8
    pipeline_options.mapper.init_min_num_inliers = 30
    pipeline_options.mapper.init_max_error = 8.0
    pipeline_options.mapper.init_min_tri_angle = 5.0

    # The returned models are not used directly; the first model is reloaded
    # from disk below.
    pycolmap.incremental_mapping(
        database_path=database_path,
        image_path=image_dir,
        output_path=sparse_path,
        options=pipeline_options,
    )
    print(f"Finished incremental mapping in {(time.time() - stage_start):.2f}s.")

    # --- Post-process the first model ---
    recon_path = os.path.join(sparse_path, "0")
    if not os.path.isdir(recon_path):
        raise RuntimeError(
            f"COLMAP did not produce a reconstruction at {recon_path}; "
            "see the TODO in the docstring for possible remedies."
        )
    reconstruction = pycolmap.Reconstruction(recon_path)

    # NOTE(review): keeping the first three parameters presumably assumes the
    # cameras were estimated as SIMPLE_RADIAL (f, cx, cy, k), so dropping the
    # last value yields SIMPLE_PINHOLE (f, cx, cy) — confirm the camera model.
    for cam in reconstruction.cameras.values():
        cam.model = 'SIMPLE_PINHOLE'
        cam.params = cam.params[:3]

    reconstruction.write(recon_path)

    print(f"Total pipeline time: {(time.time() - start_time):.2f}s.")
|
|
|
|