import os
from pathlib import Path

import cv2
import ffmpeg
import gradio as gr
import numpy as np
import torch
import uvicorn
from fastapi import FastAPI
from numpy import random

from models.experimental import attempt_load
from utils.datasets import LoadImages
from utils.general import check_img_size, non_max_suppression, scale_coords, set_logging, increment_path
from utils.plots import plot_one_box
from utils.torch_utils import select_device


# IoU and scanner-movement helpers
def compute_iou(box1, box2):
    """Intersection-over-union of two axis-aligned boxes in (x1, y1, x2, y2) form."""
    x1, y1, x2, y2 = box1
    x1_, y1_, x2_, y2_ = box2
    xi1 = max(x1, x1_)
    yi1 = max(y1, y1_)
    xi2 = min(x2, x2_)
    yi2 = min(y2, y2_)
    inter_width = max(0, xi2 - xi1)
    inter_height = max(0, yi2 - yi1)
    inter_area = inter_width * inter_height
    box1_area = (x2 - x1) * (y2 - y1)
    box2_area = (x2_ - x1_) * (y2_ - y1_)
    union_area = box1_area + box2_area - inter_area
    return inter_area / union_area if union_area != 0 else 0.0


def is_scanner_moving(prev_centroids, curr_box, scanner_id, threshold=5.0):
    """True if this scanner's centroid moved more than `threshold` pixels since the last frame."""
    x1, y1, x2, y2 = curr_box
    curr_centroid = ((x1 + x2) / 2, (y1 + y2) / 2)
    if scanner_id in prev_centroids:
        prev_x, prev_y = prev_centroids[scanner_id]
        distance = np.sqrt((curr_centroid[0] - prev_x)**2 + (curr_centroid[1] - prev_y)**2)
        return distance > threshold
    return False
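
# Worked example (illustrative, hand-computed values): for box1 = [0, 0, 10, 10]
# and box2 = [5, 5, 15, 15], the intersection is the 5x5 square from (5, 5) to
# (10, 10), so inter_area = 25, union_area = 100 + 100 - 25 = 175, and
# compute_iou(box1, box2) = 25 / 175 ≈ 0.143. Similarly, a scanner whose
# centroid moves from (100, 100) to (104, 104) has travelled sqrt(32) ≈ 5.66
# pixels, above the default 5.0-pixel threshold, so is_scanner_moving(...)
# reports True for it.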

def detect_video(video_path, weights, conf_thres=0.25, iou_thres=0.45, img_size=640, device='', save_dir='runs/detect/exp'):
    save_dir = Path(increment_path(Path(save_dir), exist_ok=True))
    save_dir.mkdir(parents=True, exist_ok=True)

    # Load the model once per call
    set_logging()
    device = select_device(device)
    half = device.type != 'cpu'  # half precision only supported on CUDA
    model = attempt_load(weights, map_location=device)
    stride = int(model.stride.max())
    imgsz = check_img_size(img_size, s=stride)
    if half:
        model.half()

    dataset = LoadImages(video_path, img_size=imgsz, stride=stride)
    names = model.module.names if hasattr(model, 'module') else model.names
    colors = [[random.randint(0, 255) for _ in range(3)] for _ in names]

    vid_path, vid_writer, save_path = None, None, None
    prev_centroids = {}
    scanner_id_counter = 0

    for path, img, im0s, vid_cap in dataset:
        img = torch.from_numpy(img).to(device)
        img = img.half() if half else img.float()
        img /= 255.0  # 0-255 to 0.0-1.0
        if img.ndimension() == 3:
            img = img.unsqueeze(0)

        with torch.no_grad():
            pred = model(img)[0]
        pred = non_max_suppression(pred, conf_thres, iou_thres)

        for det in pred:  # detections for this frame
            p = Path(path)
            save_path = str(save_dir / f"{p.stem}_output.mp4")
            im0 = im0s
            if len(det):
                # Rescale boxes from the letterboxed input back to the original frame
                det[:, :4] = scale_coords(img.shape[2:], det[:, :4], im0.shape).round()

            # Sort detections by class and draw every box
            item_boxes, scanner_data, phone_boxes = [], [], []
            curr_scanner_boxes = []
            for *xyxy, conf, cls in det:
                x1, y1, x2, y2 = map(int, xyxy)
                class_name = names[int(cls)]
                color = colors[int(cls)]
                if class_name.lower() == "item":
                    item_boxes.append([x1, y1, x2, y2])
                elif class_name.lower() == "phone":
                    phone_boxes.append([x1, y1, x2, y2])
                elif class_name.lower() == "scanner":
                    curr_scanner_boxes.append([x1, y1, x2, y2])
                plot_one_box(xyxy, im0, label=class_name, color=color, line_thickness=2)

            # Track scanners across frames: match each current box to the nearest
            # previous centroid, reusing its ID when the match is within 50 px.
            new_prev_centroids = {}
            if prev_centroids and curr_scanner_boxes:
                for curr_box in curr_scanner_boxes:
                    curr_centroid = ((curr_box[0] + curr_box[2]) / 2, (curr_box[1] + curr_box[3]) / 2)
                    best_match_id = min(
                        prev_centroids.keys(),
                        key=lambda k: np.sqrt((curr_centroid[0] - prev_centroids[k][0])**2
                                              + (curr_centroid[1] - prev_centroids[k][1])**2),
                        default=None,
                    )
                    if best_match_id is not None and np.sqrt(
                            (curr_centroid[0] - prev_centroids[best_match_id][0])**2
                            + (curr_centroid[1] - prev_centroids[best_match_id][1])**2) < 50:
                        scanner_id = best_match_id
                    else:
                        scanner_id = scanner_id_counter
                        scanner_id_counter += 1
                    is_moving = is_scanner_moving(prev_centroids, curr_box, scanner_id)
                    movement_status = "Scanning" if is_moving else "Idle"
                    scanner_data.append([curr_box, movement_status, scanner_id])
                    new_prev_centroids[scanner_id] = curr_centroid
            elif curr_scanner_boxes:
                # First frame containing scanners: assign fresh IDs, assume idle
                for curr_box in curr_scanner_boxes:
                    scanner_id = scanner_id_counter
                    scanner_id_counter += 1
                    movement_status = "Idle"
                    curr_centroid = ((curr_box[0] + curr_box[2]) / 2, (curr_box[1] + curr_box[3]) / 2)
                    scanner_data.append([curr_box, movement_status, scanner_id])
                    new_prev_centroids[scanner_id] = curr_centroid
            prev_centroids = new_prev_centroids

            for scanner_box, movement_status, scanner_id in scanner_data:
                x1, y1, x2, y2 = scanner_box
                label = f"scanner {movement_status} (ID: {scanner_id})"
                plot_one_box([x1, y1, x2, y2], im0, label=label,
                             color=colors[names.index("scanner")], line_thickness=2)

            # A scan event is a moving scanner overlapping an item (product scan)
            # or a phone (payment scan) with IoU above 0.1
            product_scanning_status = ""
            payment_scanning_status = ""
            for scanner_box, movement_status, _ in scanner_data:
                for item_box in item_boxes:
                    if movement_status == "Scanning" and compute_iou(scanner_box, item_box) > 0.1:
                        product_scanning_status = "Product scanning is finished"
                for phone_box in phone_boxes:
                    if movement_status == "Scanning" and compute_iou(scanner_box, phone_box) > 0.1:
                        payment_scanning_status = "Payment scanning is finished"

            if product_scanning_status:
                cv2.putText(im0, product_scanning_status, (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.9, colors[names.index("scanner")], 2)
            if payment_scanning_status:
                cv2.putText(im0, payment_scanning_status, (10, 60),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.9, colors[names.index("scanner")], 2)

            if vid_path != save_path:  # open a writer for a new output file
                vid_path = save_path
                if isinstance(vid_writer, cv2.VideoWriter):
                    vid_writer.release()
                fps = vid_cap.get(cv2.CAP_PROP_FPS) if vid_cap else 30
                w, h = im0.shape[1], im0.shape[0]
                vid_writer = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
            vid_writer.write(im0)

    if isinstance(vid_writer, cv2.VideoWriter):
        vid_writer.release()
    if save_path is None:  # no frames were processed
        return None

    # Re-encode to H.264 for browser compatibility (mp4v is not widely playable)
    output_h264 = str(Path(save_path).with_name(f"{Path(save_path).stem}_h264.mp4"))
    try:
        stream = ffmpeg.input(save_path)
        stream = ffmpeg.output(stream, output_h264, vcodec='libx264', acodec='aac',
                               format='mp4', pix_fmt='yuv420p')
        ffmpeg.run(stream, overwrite_output=True, capture_stderr=True)
        os.remove(save_path)  # remove the intermediate mp4v file
        return output_h264
    except ffmpeg.Error as e:
        print(f"FFmpeg error: {e.stderr.decode()}")
        return save_path


def gradio_interface(video, conf_thres, iou_thres):
    weights = "best.pt"
    img_size = 640
    output_video = detect_video(video, weights, conf_thres, iou_thres, img_size)
    return output_video if output_video else "Error processing video."


interface = gr.Interface(
    fn=gradio_interface,
    inputs=[
        gr.Video(label="Upload Video"),
        gr.Slider(0, 1, value=0.25, step=0.05, label="Confidence Threshold"),
        gr.Slider(0, 1, value=0.45, step=0.05, label="IoU Threshold"),
    ],
    outputs=gr.Video(label="Processed Video"),
    title="Retail Shop Monitoring System",
    # description="Upload a video to run YOLO detection with custom parameters."
)

# Serve the Gradio UI through FastAPI/uvicorn; calling interface.launch() here
# would block before the app below is ever mounted.
app = FastAPI()
app = gr.mount_gradio_app(app, interface, path="/")

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
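
# Usage sketch (assumptions: this file is saved as app.py, a trained checkpoint
# named "best.pt" sits in the working directory, and the YOLOv7 repo supplies
# the models/ and utils/ packages on the import path):
#
#   python app.py
#
# then open http://localhost:8000 in a browser. detect_video() can also be
# called directly, e.g. detect_video("input.mp4", "best.pt", conf_thres=0.3),
# which returns the path of the annotated output video.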