Spaces:
Paused
Paused
| import cv2 | |
| import numpy as np | |
| from gradio_utils.flow_utils import bivariate_Gaussian | |
# UI labels for the two ways a user can supply an object-motion trajectory.
OBJECT_MOTION_MODE = ["Provided Trajectory", "Custom Trajectory"]

# Display name -> path of a bundled trajectory file (one "x,y" point per line).
PROVIDED_TRAJS = {
    "horizon_1": "examples/trajectories/horizon_2.txt",
    "swaying_1": "examples/trajectories/shake_1.txt",
    "swaying_2": "examples/trajectories/shake_2.txt",
    "swaying_3": "examples/trajectories/shaking_10.txt",
    "curve_1": "examples/trajectories/curve_1.txt",
    "curve_2": "examples/trajectories/curve_2.txt",
    "curve_3": "examples/trajectories/curve_3.txt",
    "curve_4": "examples/trajectories/curve_4.txt",
}
def read_points(file, video_len=16, reverse=False):
    """Read a trajectory from *file* and subsample it to at most *video_len* points.

    Each non-empty line of the file holds one point as "x,y" (integer
    pixel coordinates).  Blank lines are skipped, so a trailing newline
    no longer crashes parsing (the original raised ValueError on it).

    Args:
        file: path of the trajectory text file.
        video_len: maximum number of points to return (default 16).
        reverse: if True, traverse the trajectory backwards before subsampling.

    Returns:
        A list of (x, y) int tuples with len(result) <= video_len.
    """
    points = []
    with open(file, 'r') as f:
        for line in f:
            line = line.strip()
            if not line:
                # robustness fix: ignore blank lines / trailing newline
                continue
            x, y = line.split(',')
            points.append((int(x), int(y)))
    if reverse:
        points = points[::-1]
    if len(points) > video_len:
        # Even stride subsample; skip >= 1 here because len > video_len.
        # The strided slice can still keep more than video_len points,
        # so truncate afterwards.
        skip = len(points) // video_len
        points = points[::skip]
        points = points[:video_len]
    return points
def get_provided_traj(traj_name):
    """Load one of the bundled example trajectories, rescaled for the UI.

    The stored points live in a 256x256 coordinate space; the returned
    points are mapped into the 1024x1024 range used elsewhere in this
    module (see process_traj's xy_range).

    Args:
        traj_name: a key of PROVIDED_TRAJS.

    Returns:
        List of [x, y] int pairs in the 0..1024 coordinate range.
    """
    raw = read_points(PROVIDED_TRAJS[traj_name])
    upscale = lambda v: int(1024 * v / 256)
    return [[upscale(x), upscale(y)] for x, y in raw]
# 99x99 Gaussian kernel (presumably sigma_x = sigma_y = 10 — confirm against
# bivariate_Gaussian's signature) used by get_flow to diffuse sparse per-pixel
# flow impulses into a smooth dense field via cv2.filter2D.
blur_kernel = bivariate_Gaussian(99, 10, 10, 0, grid=None, isotropic=True)
def process_points(points):
    """Normalize a user-drawn trajectory to exactly 16 points.

    Behavior by input length:
      - fewer than 2 points: fall back to a static trajectory pinned at
        the canvas center (512, 512);
      - 16 or more points: evenly subsample, always keeping the exact
        final point;
      - 2..15 points: linearly interpolate extra points between
        neighbors, distributing the insertions as evenly as possible
        (earlier gaps receive the remainder).

    Args:
        points: list of [x, y] pairs in the 1024x1024 canvas space.

    Returns:
        A list of exactly 16 [x, y] pairs of independent lists.
    """
    frames = 16
    if len(points) < 2:
        # Fix: build independent inner lists. The original `[[512, 512]] * 16`
        # aliased ONE list 16 times, so mutating result[0] changed every entry.
        return [[512, 512] for _ in range(frames)]
    if len(points) >= frames:
        skip = len(points) // frames
        # frames - 1 subsampled points plus the exact last point
        # (was a hard-coded 15, inconsistent with `frames`).
        return points[::skip][:frames - 1] + points[-1:]
    # 2 <= len(points) < frames: insert interpolated points into the gaps.
    insert_num = frames - len(points)
    interval = len(points) - 1            # number of gaps between points
    n, m = divmod(insert_num, interval)   # n per gap; first m gaps get one extra
    per_gap = [n + 1 if i < m else n for i in range(interval)]
    res = []
    for i in range(interval):
        x0, y0 = points[i]
        x1, y1 = points[i + 1]
        res.append(points[i])
        k = per_gap[i]
        for j in range(k):
            t = (j + 1) / (k + 1)
            res.append([int(x0 + t * (x1 - x0)), int(y0 + t * (y1 - y0))])
    res += points[-1:]
    return res
def get_flow(points, video_len=16):
    """Rasterize a point trajectory into a dense optical-flow volume.

    For every consecutive pair of trajectory points, the displacement to
    the next point is written as a single impulse at the current point's
    pixel in the NEXT frame; each frame (except frame 0) is then blurred
    with the module-level Gaussian kernel to spread the impulse into a
    smooth field.

    Args:
        points: list of [x, y] pixel coordinates; assumed to lie inside a
            256x256 grid and to contain at least video_len entries.
        video_len: number of frames (default 16).

    Returns:
        np.float32 array of shape (video_len, 256, 256, 2); frame 0 is zero.
    """
    flow = np.zeros((video_len, 256, 256, 2), dtype=np.float32)
    for frame in range(1, video_len):
        cur = points[frame - 1]
        nxt = points[frame]
        flow[frame, cur[1], cur[0], 0] = nxt[0] - cur[0]
        flow[frame, cur[1], cur[0], 1] = nxt[1] - cur[1]
    for frame in range(1, video_len):
        flow[frame] = cv2.filter2D(flow[frame], -1, blur_kernel)
    return flow
def process_traj(points, device='cpu'):
    """Convert a raw user trajectory into a dense optical-flow volume.

    The input points are in the 1024x1024 UI coordinate space; they are
    normalized to exactly 16 points, scaled into the 256x256 flow grid,
    and rasterized via get_flow.

    Args:
        points: list of [x, y] pairs in the 1024-pixel coordinate space.
        device: kept for interface compatibility; the tensor transfer
            below is commented out, so it is currently unused.

    Returns:
        np.float32 optical flow of shape (16, 256, 256, 2).
    """
    xy_range = 1024
    points = process_points(points)
    # Fix: clamp to 255 — a point exactly at x == 1024 (or y == 1024) would
    # map to index 256 and raise IndexError on the 256-wide grid in get_flow.
    points = [[min(255, int(256 * x / xy_range)),
               min(255, int(256 * y / xy_range))] for x, y in points]
    optical_flow = get_flow(points)
    # optical_flow = torch.tensor(optical_flow).to(device)
    return optical_flow