Spaces:
Sleeping
Sleeping
| import cv2 | |
| import numpy as np | |
| import yaml | |
| from contour import Contour | |
class Clipper():
    """Cut a long source video into short clips that contain detected objects.

    The first frame read from the source is stored (in HSV) as the background.
    Every later frame is compared against it; whenever the comparison yields
    at least one sufficiently large contour, frames are written to a new
    numbered output file until the contours disappear, the source ends, or
    the configured maximum clip length is reached.

    All tunables are read from ``config.yaml`` in the current working
    directory (see ``load_config``).
    """

    def __init__(self):  # TODO: put this in config
        self.bg = None          # HSV background frame; set from the first frame read
        self.seen_fish = False  # not used by any method of this class
        self.vid_num = 1        # numeric suffix of the next output file
        self.load_config()

    def load_config(self):
        """Read ``config.yaml`` and set up the capture and output settings.

        Sets ``MAX_CHUNK_SIZE`` (frames per clip), ``BLUR_KERNEL`` (square
        all-ones kernel used for erosion), ``MIN_RAD``, ``cap`` (the opened
        ``cv2.VideoCapture``), and the ``dst_*`` / ``show_cnt`` output options.
        """
        with open("config.yaml") as params:
            config = yaml.safe_load(params)

        # Clip length limit expressed in frames: seconds * frames-per-second.
        self.MAX_CHUNK_SIZE = config.get("DST_VID_MAX_SECS") * config.get("DST_VID_FPS")
        k_size = config.get("KERNEL_BLUR_PX")
        self.BLUR_KERNEL = np.ones((k_size, k_size), np.uint8)
        self.MIN_RAD = config.get("MIN_RADIUS_CONTOUR_PX")
        self.cap = cv2.VideoCapture(config.get("SRC_VID_FP"))
        self.dst_name = config.get("DST_VID_NAME")
        self.dst_ftype = config.get("DST_VID_FILETYPE")
        self.dst_vid_fp = config.get("DST_VID_FP")
        self.dst_fps = config.get("DST_VID_FPS")
        self.show_cnt = config.get("SHOW_CONTOUR")

    def clip_vid(self):
        """
        Truncates long video files into segments and saves them.
        This prevents OutOfMemory exceptions caused by long videos.

        Reads the source until it is exhausted. Frames without any detected
        contour are skipped; a frame with contours starts a new clip via
        ``write_vid``. The capture is released when the source ends or an
        error occurs.
        """
        try:
            while True:
                ret, frame = self.cap.read()
                if not ret:
                    # End of stream (or read failure): nothing more to clip.
                    break
                if self.bg is None:
                    # The very first frame serves as the background reference.
                    self.bg = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
                cnt_list = Clipper.detect_contours(frame, self.bg, self.BLUR_KERNEL, self.MIN_RAD)
                if cnt_list:
                    # Hand over the triggering frame so it is not lost.
                    self.write_vid(cnt_list, first_frame=frame)
        finally:
            self.cap.release()

    def write_vid(self, cnt_list, first_frame=None):
        """Write one clip to a new numbered output file.

        Args:
            cnt_list: Contours detected on ``first_frame``; only used to
                annotate that frame.
            first_frame: Optional frame that triggered the clip. When given it
                is written as the first frame of the clip; when omitted the
                clip starts with the next frame read from the capture.

        Frames are read from ``self.cap`` and written until a frame has no
        contours (that frame is still written), the source ends, or
        ``MAX_CHUNK_SIZE`` frames have been written. ``self.vid_num`` is
        incremented afterwards so the next clip gets a fresh file name.
        """
        fourcc = cv2.VideoWriter_fourcc('F', 'M', 'P', '4')
        frame_size = (
            int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
            int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
        )
        video = cv2.VideoWriter(self._output_path(), fourcc, self.dst_fps, frame_size)
        frame_cnt = 0
        try:
            if first_frame is not None:
                video.write(self._annotate(first_frame, cnt_list))
                frame_cnt += 1
            while frame_cnt < self.MAX_CHUNK_SIZE:
                ret, frame = self.cap.read()
                if not ret:
                    # Source ended mid-clip; stop instead of processing None.
                    break
                # Detect on the untouched frame: drawing the circles first
                # would feed the overlay back into the detector.
                cnt_list = Clipper.detect_contours(frame, self.bg, self.BLUR_KERNEL, self.MIN_RAD)
                video.write(self._annotate(frame, cnt_list))
                frame_cnt += 1
                if not cnt_list:
                    break
        finally:
            # Always finalize the file, even if reading or detection raised.
            video.release()
            self.vid_num += 1

    def _output_path(self):
        """Return the file path of the clip currently numbered ``vid_num``."""
        return self.dst_vid_fp + self.dst_name + str(self.vid_num) + self.dst_ftype

    def _annotate(self, frame, cnt_list):
        """Draw a red circle per contour on ``frame`` if ``show_cnt`` is set."""
        if self.show_cnt:
            for cnt in cnt_list:
                frame = cv2.circle(frame, (cnt.x, cnt.y), cnt.rad, (0, 0, 255), 1)
        return frame

    @staticmethod
    def detect_contours(frame, bg, kernel, min_rad):
        """Return the contours in ``frame`` that differ from the background.

        Args:
            frame: BGR frame as returned by ``cv2.VideoCapture.read``.
            bg: Background frame already converted to HSV.
            kernel: Structuring element used to erode the thresholded mask.
            min_rad: Contours whose minimum enclosing circle has an integer
                radius not greater than this are discarded.

        Returns:
            A list of ``Contour(x, y, radius)`` objects with integer values,
            one per contour that passed the size filter.
        """
        # Use HSV colorspace to minimize lighting variance
        img = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

        # Absolute per-channel difference. A plain ``img - bg`` on uint8
        # arrays wraps around, turning small negative noise into values
        # near 255 that pass the threshold below.
        difference = cv2.absdiff(img, bg)
        # NOTE(review): the difference image is in H/S/V channel order, so
        # BGR2GRAY weights those channels with the B/G/R coefficients --
        # confirm this is intended.
        greyscale = cv2.cvtColor(difference, cv2.COLOR_BGR2GRAY)
        _, thresh = cv2.threshold(greyscale, 150, 255, cv2.THRESH_BINARY)
        # Erode to remove small specks from the binary mask.
        erosion = cv2.erode(thresh, kernel, iterations=1)

        # findContours returns (contours, hierarchy) in OpenCV 4.x and
        # (image, contours, hierarchy) in 3.x; [-2] is the contours in both.
        contours = cv2.findContours(image=erosion, mode=cv2.RETR_TREE, method=cv2.CHAIN_APPROX_NONE)[-2]

        # Keep contours whose enclosing circle is larger than min_rad
        cnt_list = []
        for cnt in contours:
            (x, y), radius = cv2.minEnclosingCircle(cnt)
            if int(radius) > min_rad:  # TODO Figure out smallest size of herring
                cnt_list.append(Contour(int(x), int(y), int(radius)))
        return cnt_list
if __name__ == "__main__":
    # Command-line entry point: construct a Clipper and process the source video.
    Clipper().clip_vid()