Spaces:
Running
on
Zero
Running
on
Zero
File size: 33,047 Bytes
1f76612 8c4a54c 4012fb1 8c4a54c 4012fb1 61455bf 8c4a54c 4012fb1 8c4a54c 4012fb1 8c4a54c 4012fb1 8c4a54c a01b264 447adea a01b264 447adea 3a251d5 447adea 50db534 4791a72 447adea a01b264 a0d45ba a01b264 447adea a01b264 447adea a01b264 447adea 4012fb1 447adea a01b264 447adea a01b264 447adea a01b264 447adea 1f76612 8faa71f 447adea 50db534 447adea 50db534 447adea a01b264 447adea a0d45ba 3a251d5 4791a72 3a251d5 4791a72 3a251d5 4791a72 3a251d5 a0d45ba 3a251d5 4791a72 3a251d5 4791a72 3a251d5 4791a72 3a251d5 4791a72 3a251d5 4791a72 a0d45ba 4791a72 3a251d5 447adea a01b264 447adea a01b264 447adea a01b264 447adea a01b264 447adea 3a251d5 447adea 07feaa3 447adea 07feaa3 447adea 5f8f9fa 3a251d5 a0d45ba 5f8f9fa 447adea 3a251d5 447adea 4791a72 a01b264 447adea ad58b10 447adea a01b264 447adea a01b264 447adea a01b264 447adea a01b264 447adea 4791a72 447adea a01b264 447adea a0d45ba a01b264 447adea a01b264 447adea 4791a72 447adea 4791a72 447adea 4791a72 447adea 4791a72 447adea a0d45ba 3a251d5 4791a72 3a251d5 4791a72 a0d45ba 3a251d5 4791a72 3a251d5 4791a72 3a251d5 447adea 4791a72 447adea a01b264 8755604 ce98ad0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 |
import spaces
import importlib
import importlib.util
import subprocess
import sys
import os
def ensure_mmpose_installed(local_path='/data/wheelhouse/mmpose-0.24.0-py2.py3-none-any.whl'):
"""
Check if 'mmpose' can be imported; if not, attempt to install it from local_path.
local_path should contain setup.py or pyproject.toml so that pip can install it.
"""
package_name = 'mmpose'
# Try to find the spec for import
spec = importlib.util.find_spec(package_name)
if spec is not None:
try:
module = importlib.import_module(package_name)
print(f"'{package_name}' is already installed, version: {getattr(module, '__version__', 'unknown')}")
return True
except Exception as e:
print(f"Found '{package_name}', but import failed: {e}. Will attempt re-install.")
else:
print(f"'{package_name}' not found, attempting installation...")
# If we reach here, we need to install or reinstall
# Check that the directory exists
#if not os.path.isdir(local_path):
# raise FileNotFoundError(f"Specified install directory does not exist: {local_path}")
# Construct pip install command using the current Python executable
cmd = [sys.executable, "-m", "pip", "install", local_path]
print("Running command:", " ".join(cmd))
try:
subprocess.check_call(cmd)
except subprocess.CalledProcessError as e:
raise RuntimeError(f"Failed to install mmpose: {e}")
# After installation, try importing again
try:
module = importlib.import_module(package_name)
print(f"'{package_name}' installed and imported successfully, version: {getattr(module, '__version__', 'unknown')}")
return True
except Exception as e:
raise RuntimeError(f"Installed but still cannot import '{package_name}': {e}")
ensure_mmpose_installed('/data/wheelhouse/mmpose-0.24.0-py2.py3-none-any.whl')
import gradio as gr
import numpy as np
import cv2
import json
import subprocess
import os
from typing import Tuple, List, Dict, Any
import tempfile
from transformers import AutoTokenizer, AutoModelForCausalLM, AutoProcessor
import torch
from PIL import Image
import datetime
import uuid
from PIL import Image
import io, base64
from datasets import load_dataset, Dataset
from huggingface_hub import HfApi, Repository, upload_file
sys.path.append("/data/WHAM")
from demo import wham_execute
HF_DATASET_ID = "qihfang/sportscoaching"
# Use HuggingFace remote inference
try:
from huggingface_hub import InferenceClient
except ImportError:
InferenceClient = None
def ensure_compatible_video_and_show(path):
# 验证是否是有效视频(用 ffmpeg probe)
try:
result = subprocess.run(
["ffprobe", "-v", "error", "-show_entries", "format=format_name", "-of", "default=noprint_wrappers=1:nokey=1", path],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True,
text=True
)
format_name = result.stdout.strip()
if not format_name: # 空结果说明不是视频
raise ValueError("不是有效视频文件")
except Exception:
raise gr.Error("上传的不是有效视频文件,请上传 .mp4、.avi、.webm 等格式的视频。")
# 转码逻辑
ext = os.path.splitext(path)[1].lower()
if ext not in [".mp4", ".webm"]:
out_dir = tempfile.mkdtemp()
target = os.path.join(out_dir, "converted.mp4")
subprocess.run([
"ffmpeg", "-y", "-i", path,
"-c:v", "libx264", "-preset", "fast",
"-c:a", "aac", "-movflags", "+faststart",
target
], check=True)
return gr.update(value=target, visible=True)
return gr.update(value=path, visible=True)
class PoseEstimationApp:
def __init__(self, model_name: str = "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8", use_remote: bool = True, max_history_turns: int = 50):
self.processing_steps = [
"Video upload completed",
"Starting video downsampling...",
"Executing Pose Estimation...",
"Running Stage1 prompt...",
"Running Stage2 prompt...",
"Running Evaluator...",
"Running Stage3 prompt...",
"Generating final result"
]
self.use_remote = use_remote
self.model_name = model_name
self.device = "cuda" if torch.cuda.is_available() else "cpu"
if not use_remote:
raise RuntimeError("Remote inference only supported, please set use_remote=True and provide HF_TOKEN environment variable.")
if InferenceClient is None:
raise RuntimeError("huggingface_hub not installed, please install to use remote inference.")
token = os.getenv("HF_TOKEN")
if not token:
raise RuntimeError("HF_TOKEN environment variable not set, please set the access token in deployment environment.")
try:
self.client = InferenceClient(model=model_name, token=token, provider="novita")
except Exception as e:
raise RuntimeError(f"Failed to initialize remote inference client: {e}")
# Conversation history management
# Use a list to store several rounds of conversation, each item is a dict containing 'role' ('user' or 'assistant') and 'content'
self.conversation_history: List[Dict[str, str]] = []
# Keep the most recent number of rounds (user+assistant), truncate when exceeded
self.max_history_turns = max_history_turns
def reset_history(self):
"""
Clear conversation history, call when starting a new multi-turn conversation scenario.
"""
self.conversation_history = []
def add_user_message(self, message: str):
self.conversation_history.append({"role": "user", "content": message})
# If exceeding maximum rounds (here a round refers to user+assistant), remove earliest rounds
# Calculate current entries, if len > 2 * max_history_turns, truncate the earliest two entries
max_items = 2 * self.max_history_turns
if len(self.conversation_history) > max_items:
# Discard the earliest two entries
self.conversation_history = self.conversation_history[-max_items:]
def add_assistant_message(self, message: str):
self.conversation_history.append({"role": "assistant", "content": message})
# Similarly truncate history
max_items = 2 * self.max_history_turns
if len(self.conversation_history) > max_items:
self.conversation_history = self.conversation_history[-max_items:]
def build_prompt_with_history(self, new_user_input: str) -> str:
"""
Concatenate history rounds with current user input into a prompt string.
Example:
User: ...
Assistant: ...
User: new_user_input
Assistant:
"""
prompt_parts = []
for turn in self.conversation_history:
if turn["role"] == "user":
prompt_parts.append(f"User: {turn['content']}")
else:
prompt_parts.append(f"Assistant: {turn['content']}")
# Add new user input, model reply will be generated at the end
prompt_parts.append(f"User: {new_user_input}")
prompt_parts.append("Assistant:") # Guide model generation
full_prompt = "\n".join(prompt_parts)
return full_prompt
def image_to_datauri(self, img: Image.Image, max_size=640, jpeg_quality=70):
# 先按最长边缩放到 max_size
w, h = img.size
scale = max_size / max(w, h)
if scale < 1.0:
new_w, new_h = int(w*scale), int(h*scale)
img = img.resize((new_w, new_h), Image.BILINEAR)
# 转 JPEG
buffered = io.BytesIO()
img.save(buffered, format="JPEG", quality=jpeg_quality)
img_b64 = base64.b64encode(buffered.getvalue()).decode('utf-8')
return f"data:image/jpeg;base64,{img_b64}"
def query_llm_multimodal(self, text: str, indexed_images: list, use_history: bool = True, max_tokens: int = 1024):
messages = []
if use_history:
for turn in self.conversation_history:
messages.append({"role": turn['role'], "content": turn['content']})
# 先处理文字部分(如果有)
if text:
messages.append({"role": "user", "content": [{"type": "text", "text": text}]})
self.add_user_message(text)
# 逐帧添加图片输入
for clip_idx, (video_idx, img) in enumerate(indexed_images):
uri = self.image_to_datauri(img, max_size=512, jpeg_quality=60)
# 只对最关键的少量帧调用,或在调用前筛选
msg_content = [
{"type":"image_url","image_url": {"url": uri}},
{"type":"text","text":f"The {clip_idx}th image is the {video_idx+1}th frame in the video, please analyze and summarize the content. Use the original frame index ({video_idx+1}) for reminder."}
]
messages.append({"role":"user","content":msg_content})
self.add_user_message(f"[IMAGE frame {video_idx}]")
# 如帧过多,可在此处 break,或只处理前 K 帧
try:
response = self.client.chat.completions.create(messages=messages, max_tokens=max_tokens)
reply = response.choices[0].message.content
except Exception as e:
raise RuntimeError(f"Multimodal inference error: {e}")
self.add_assistant_message(reply)
return reply
def query_llm(self, prompt: str, max_length: int = 2048, use_history: bool = True) -> str:
if use_history:
messages = []
for turn in self.conversation_history:
messages.append({"role": turn['role'], "content": turn['content']})
messages.append({"role": "user", "content": prompt})
self.add_user_message(prompt)
else:
messages = [{"role": "user", "content": prompt}]
try:
response = self.client.chat.completions.create(messages=messages, max_tokens=max_length)
reply = response.choices[0].message.content
except Exception as e:
raise RuntimeError(f"Remote inference error: {e}")
self.add_assistant_message(reply)
return reply
# Other methods remain unchanged...
def downsample_video(self, input_path: str, output_path: str, downsample_rate: int) -> Tuple[str, int]:
cap = cv2.VideoCapture(input_path)
if not cap.isOpened():
raise RuntimeError(f"Cannot open video file {input_path}")
fps = cap.get(cv2.CAP_PROP_FPS)
new_fps = max(1, int(fps / downsample_rate))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
out = cv2.VideoWriter(output_path, fourcc, new_fps, (width, height))
frame_count = 0
frames = []
while True:
ret, frame = cap.read()
if not ret:
break
if frame_count % downsample_rate == 0:
out.write(frame)
frames.append((frame_count, Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))))
frame_count += 1
cap.release()
out.release()
return output_path, new_fps, frames
@spaces.GPU()
def run_pose_estimation(self, tmp_dir, video_path: str) -> str:
base_name = os.path.splitext(os.path.basename(video_path))[0]
out_dir = os.path.join(tmp_dir, "output")
os.makedirs(out_dir, exist_ok=True)
# cmd = [
# "python", "/data/WHAM/demo.py",
# "--video", video_path,
# "--save_pkl",
# "--output_pth", out_dir
# ]
wham_execute(video_path, out_dir, True, True, False)
out_dir = os.path.join(out_dir, base_name)
# result = subprocess.run(cmd, capture_output=True, text=True)
# if result.returncode != 0:
# raise RuntimeError(f"Pose Estimation failed: {result.stderr}")
result_path = os.path.join(out_dir, "wham_output.pkl")
if not os.path.exists(result_path):
raise FileNotFoundError(f"Result file not found: {result_path}")
return result_path
def load_pose_data(self, pth_path: str):
try:
data = torch.load(pth_path, map_location="cpu")
return data
except Exception as e:
raise RuntimeError(f"Failed to load Pose data: {e}")
def extract_frames(self, video_path: str, frame_skip: int = 1) -> List[Tuple[int, Image.Image]]:
"""
Eagerly read and return all frames as a list of (frame_index, PIL.Image).
"""
frames = []
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
return frames
idx = 0
while True:
ret, frame = cap.read()
if not ret:
break
if idx % frame_skip == 0:
img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
frames.append((idx, img))
idx += 1
cap.release()
return frames
def upload_initial_entry(self, video_path: str, instruction: str, downsample_rate: int, token: str):
"""
上传视频文件和 instruction 到 HF Dataset,并追加一条没有 rating 的记录。
"""
api = HfApi()
# 生成唯一 ID
entry_id = uuid.uuid4().hex
# 上传视频到 HF
filename = os.path.basename(video_path)
remote_video_path = f"videos/{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}_{entry_id}_{filename}"
# 加载或初始化 Dataset
try:
ds = load_dataset(HF_DATASET_ID, token=token)
if isinstance(ds, dict):
ds = ds["train"]
except:
ds = Dataset.from_dict({
"entry_id": [], "timestamp": [], "video_path": [],
"instruction": [], "downsample_rate": [], "rating": []
})
# 追加一条带 entry_id 的记录(rating 留空)
new = {
"entry_id": entry_id,
"timestamp": datetime.datetime.now().isoformat(),
"video_path": remote_video_path,
"instruction": instruction,
"downsample_rate": downsample_rate,
"rating": None
}
ds = ds.add_item(new)
ds.push_to_hub(HF_DATASET_ID, token=token)
api.upload_file(
path_or_fileobj=video_path,
path_in_repo=remote_video_path,
repo_id=HF_DATASET_ID,
token=token,
repo_type="dataset"
)
# 返回 entry_id 给前端
return entry_id
def process_video(self, video_file, downsample_rate, progress=gr.Progress()):
"""
修改:process_video 返回 (result_text, downsampled_video_path, downsample_rate) 三元组
以便界面显示视频并存储。
"""
if video_file is None:
return "Please upload a video file first", None, None
try:
self.reset_history()
progress(0.1, desc=self.processing_steps[0])
tmp_dir = tempfile.mkdtemp()
if hasattr(video_file, 'name'):
orig_ext = os.path.splitext(video_file.name)[1] # e.g. ".avi"
else:
orig_ext = os.path.splitext(video_file)[1]
input_path = os.path.join(tmp_dir, f"input{orig_ext}")
os.replace(video_file, input_path)
progress(0.2, desc=self.processing_steps[1])
downsampled_tmp = os.path.join(tmp_dir, "downsample.mp4")
downsampled_path, new_fps, frames = self.downsample_video(input_path, downsampled_tmp, downsample_rate)
frr = self.extract_frames(downsampled_path)
# results_dir = "results"
# os.makedirs(results_dir, exist_ok=True)
# unique_name = datetime.datetime.now().strftime("%Y%m%d%H%M%S") + "_" + str(uuid.uuid4())[:8] + ".mp4"
# persistent_path = os.path.join(results_dir, unique_name)
# # 复制文件
# import shutil
# shutil.copyfile(downsampled_path, persistent_path)
# 不再复制到持久目录,直接使用 downsampled_path(在 tmp_dir 中)进行上传
persistent_path = input_path
progress(0.3, desc=self.processing_steps[2])
pth_path = self.run_pose_estimation(tmp_dir, input_path)
with open(pth_path, "rb") as f:
import joblib
pkl_file = joblib.load(f)
subjs = len(pkl_file.keys())
if subjs < 1:
return "Failed to detect characters from the video, please update a new video with higher frame rate and .", None, None
#pth_path = "wham_output.pth"
# Stage1
progress(0.4, desc=self.processing_steps[3])
stage1_path = os.path.join("prompts", "stage1.txt")
if not os.path.exists(stage1_path):
raise RuntimeError("Missing prompts/stage1.txt prompt file")
with open(stage1_path, 'r', encoding='utf-8') as f:
prompt1 = f.read()
prompt1_1 = prompt1.split("[IMAGEFLAG]")[0].strip()
prompt1_2 = prompt1.split("[IMAGEFLAG]")[1].strip()
out_stage1_1 = self.query_llm(prompt1_1, use_history=True)
out_images = self.query_llm_multimodal(text="", indexed_images=frames, use_history=True)
out_stage1_part2 = self.query_llm(prompt1_2, use_history=True)
# Stage2
progress(0.5, desc=self.processing_steps[4])
stage2_path = os.path.join("prompts", "stage2.txt")
if not os.path.exists(stage2_path):
raise RuntimeError("Missing prompts/stage2.txt prompt file")
with open(stage2_path, 'r', encoding='utf-8') as f:
prompt2 = f.read()
prompt2 = prompt2.replace("[FRAMERATE]", str(new_fps))
max_retries = 3
out_stage2 = ""
temp_json_path = os.path.join(tmp_dir, "temp_json.json")
for attempt in range(max_retries):
out_stage2 = self.query_llm(prompt2, use_history=True)
try:
parsed = json.loads(out_stage2)
with open(temp_json_path, 'w', encoding='utf-8') as f:
json.dump(parsed, f, ensure_ascii=False, indent=2)
break
except json.JSONDecodeError:
prompt2 = "The previous output was not valid JSON. Please output only valid JSON without any extra content." + "\n" + out_stage2
if attempt == max_retries - 1:
with open(temp_json_path, 'w', encoding='utf-8') as f:
f.write(out_stage2)
# Evaluator
progress(0.6, desc=self.processing_steps[5])
evaluator_cmd = ["python", "estimator.py", pth_path, temp_json_path]
result = subprocess.run(evaluator_cmd, capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(f"Evaluator error: {result.stderr}")
output_txt_path = os.path.join(tmp_dir, "temp_json_output.txt")
with open(output_txt_path, 'r', encoding='utf-8') as f:
evaluator_output = f.read()
# Stage3
progress(0.7, desc=self.processing_steps[6])
stage3_path = os.path.join("prompts", "stage3.txt")
if not os.path.exists(stage3_path):
raise RuntimeError("Missing prompts/stage3.txt prompt file")
with open(stage3_path, 'r', encoding='utf-8') as f:
prompt3 = f.read()
prompt3 = prompt3.replace("[RESULTS]", evaluator_output)
out_stage3 = self.query_llm(prompt3, use_history=True)
stage4_path = os.path.join("prompts", "stage4.txt")
if not os.path.exists(stage4_path):
raise RuntimeError("Missing prompts/stage4.txt prompt file")
with open(stage4_path, 'r', encoding='utf-8') as f:
prompt4 = f.read()
prompt4 = prompt4.replace("[RESULTS]", evaluator_output)
out_stage4 = self.query_llm(prompt4, use_history=True)
hf_token = os.getenv("HF_TOKEN")
entry_id = self.upload_initial_entry(persistent_path, out_stage4, downsample_rate, hf_token)
progress(1.0, desc=self.processing_steps[7])
# 返回最终文本、持久化保存的视频路径、下采样率
return out_stage4, persistent_path, downsample_rate, entry_id
except Exception as e:
# 出错返回三个值,其中视频路径和下采样率为 None
return "Processing error: " + str(e), None, None, None
app = PoseEstimationApp()
def create_interface():
# 预定义两种语言下的文本
texts = {
"en": {
"title_md": "# 🎬 Video Pose Estimation Processing Platform",
"description_md": "Upload a video to downsample and perform pose estimation, combine multimodal LLM analysis to generate intelligent insights",
"input_settings": "## 📤 Input Settings",
"video_label": "Upload video file",
"downsample_label": "Temporal downsampling rate",
"downsample_info": "Take 1 frame every N frames and reduce frame rate. Higher rate runs faster, lower rate yields more accurate results.",
"process_btn": "🚀 Start Processing",
"clear_btn": "🔄 Clear",
"results_md": "## 📊 Processing Results",
"final_tab": "Final Result",
"final_label": "Final Comprehensive Result",
"rating_label": "Please rate the result (1–5):",
"submit_rating_btn": "Submit Rating",
"thankyou_msg": "Thank you for your feedback!",
"instructions_md": """
## 💡 Instructions
1. After uploading a video, the system will generate downsample.mp4 based on the downsampling rate.
2. Run WHAM/demo.py for Pose Estimation; results are saved in output/<video_name>/wham_output.pth.
3. The system will automatically read prompts/stage1.txt, stage2.txt, stage3.txt; user custom prompts are not accepted.
4. Stage1: prompts/stage1.txt can include [POSE_SUMMARY] placeholder, auto-replaced with pose summary.
5. Stage2: prompts/stage2.txt can include [FRAMERATE] and [STAGE1_RESULT] placeholders, auto-replaced.
6. Prompts will be forced to output JSON format for Evaluator use.
7. After Evaluator runs, it generates output.txt; content is automatically passed to Stage3.
8. Deployment requires HF_TOKEN environment variable set for HuggingFace access token; code uses it automatically.
9. Ensure project root contains prompts/stage1.txt, stage2.txt, stage3.txt and WHAM/demo.py, evaluator.py.
"""
},
"zh": {
"title_md": "# 🎬 视频姿态估计处理平台",
"description_md": "上传视频进行降采样和姿态估计,结合多模态 LLM 分析生成智能化见解",
"input_settings": "## 📤 输入设置",
"video_label": "上传视频文件",
"downsample_label": "时间降采样率",
"downsample_info": "每隔 N 帧取 1 帧并降低帧率。更高的采样率速度更快,但精度可能下降;更低采样率更准确。",
"process_btn": "🚀 开始处理",
"clear_btn": "🔄 清除",
"results_md": "## 📊 处理结果",
"final_tab": "最终结果",
"final_label": "最终综合结果",
"rating_label": "请对结果进行评分 (1–5):",
"submit_rating_btn": "提交评分",
"thankyou_msg": "感谢您的反馈!",
"instructions_md": """
## 💡 使用说明
1. 上传视频后,系统会根据降采样率生成 downsample.mp4。
2. 运行 WHAM/demo.py 进行姿态估计;结果保存在 output/<video_name>/wham_output.pth。
3. 系统会自动读取 prompts/stage1.txt、stage2.txt、stage3.txt;不接受用户自定义提示。
4. Stage1: prompts/stage1.txt 可包含 [POSE_SUMMARY] 占位符,将被自动替换。
5. Stage2: prompts/stage2.txt 可包含 [FRAMERATE] 和 [STAGE1_RESULT] 占位符,将被自动替换。
6. 提示将被强制输出 JSON 格式以供 Evaluator 使用。
7. Evaluator 运行后会生成 output.txt;内容会自动传递到 Stage3。
8. 部署需要设置 HF_TOKEN 环境变量以获得 HuggingFace 访问令牌;代码会自动使用。
9. 确保项目根目录下包含 prompts/stage1.txt、stage2.txt、stage3.txt 以及 WHAM/demo.py、evaluator.py。
"""
}
}
with gr.Blocks(
theme=gr.themes.Soft(),
title="Video Pose Estimation Processing Platform",
css="""
.gradio-container { max-width: 1200px !important; }
.tab-nav { background: linear-gradient(90deg, #667eea, #764ba2) !important; }
"""
) as demo:
# 语言状态
lang_state = gr.State("en")
# 隐藏状态:存储最近处理的视频路径和下采样率
last_video_path = gr.State(None)
last_downsample_rate = gr.State(None)
last_entry_id = gr.State(None)
# 语言切换按钮
lang_btn = gr.Button("中文") # 初始语言 en,所以按钮文字为“中文”
# 头部 Markdown
header_md = gr.Markdown(texts["en"]["title_md"])
desc_md = gr.Markdown(texts["en"]["description_md"])
with gr.Row():
with gr.Column(scale=1):
input_md = gr.Markdown(texts["en"]["input_settings"])
video_upload = gr.File(
label=texts["en"]["video_label"],
file_count="single",
type="filepath"
)
video_input = gr.Video(
label="preview",
height=300,
visible=False
)
video_upload.upload(
fn=ensure_compatible_video_and_show,
inputs=video_upload,
outputs=video_input
)
with gr.Row():
downsample_rate = gr.Slider(minimum=1, maximum=30, value=10, step=1,
label=texts["en"]["downsample_label"],
info=texts["en"]["downsample_info"])
with gr.Row():
process_btn = gr.Button(texts["en"]["process_btn"], variant="primary", size="lg")
clear_btn = gr.Button(texts["en"]["clear_btn"], variant="secondary")
with gr.Column(scale=2):
results_md = gr.Markdown(texts["en"]["results_md"])
with gr.Tabs() as tabs:
with gr.TabItem(texts["en"]["final_tab"]):
final_output = gr.Textbox(label=texts["en"]["final_label"], lines=12, max_lines=20)
# 新增:评分滑块和按钮
rating_slider = gr.Slider(minimum=1, maximum=5, step=1,
label=texts["en"]["rating_label"])
submit_rating_btn = gr.Button(value=texts["en"]["submit_rating_btn"])
# 用于显示提交后的感谢信息
thankyou_text = gr.Markdown("") # 初始为空
# 语言切换回调
def toggle_language(current_lang):
# current_lang: "en" 或 "zh",返回新的 current_lang 以及一系列组件更新
new_lang = "zh" if current_lang == "en" else "en"
t = texts[new_lang]
# 更新各个组件文本
updates = {
lang_state: new_lang,
header_md: gr.update(value=t["title_md"]),
desc_md: gr.update(value=t["description_md"]),
input_md: gr.update(value=t["input_settings"]),
video_input: gr.update(label=t["video_label"]),
downsample_rate: gr.update(label=t["downsample_label"], info=t["downsample_info"]),
process_btn: gr.update(value=t["process_btn"]),
clear_btn: gr.update(value=t["clear_btn"]),
results_md: gr.update(value=t["results_md"]),
final_output: gr.update(label=t["final_label"]),
rating_slider: gr.update(label=t["rating_label"]),
submit_rating_btn: gr.update(value=t["submit_rating_btn"]),
thankyou_text: gr.update(value="") # 切换语言时清空感谢信息
}
# 语言切换按钮文字也需更新:若当前是英文,则按钮显示“中文”,反之显示“English”
btn_text = "English" if new_lang == "zh" else "中文"
updates[lang_btn] = gr.update(value=btn_text)
return updates
lang_btn.click(fn=toggle_language,
inputs=[lang_state],
outputs=[lang_state,
header_md, desc_md,
input_md, video_input, downsample_rate, process_btn, clear_btn,
results_md, final_output, rating_slider, submit_rating_btn, thankyou_text,
lang_btn])
# 处理视频的回调:process_video 返回 (result_text, video_path, downsample_rate)
def on_process(video, rate):
result_text, video_path, dr, process_video = app.process_video(video, rate)
# 更新状态
# 如果成功,video_path 不为 None
return result_text, video_path, dr, process_video, gr.update(value=None), gr.update(value="")
# 注意:outputs 顺序对应 on_process 返回值
# outputs: final_output (文本), last_video_path (state), last_downsample_rate (state), last_entry_id (state), rating_slider (复位), thankyou_text (清空)
process_btn.click(fn=on_process,
inputs=[video_input, downsample_rate],
outputs=[final_output, last_video_path,
last_downsample_rate, last_entry_id,
rating_slider, thankyou_text])
# 清除按钮:重置所有
def on_clear():
return None, 10, None, None, gr.update(value=10), gr.update(value=""), "中文" if lang_state.value=="en" else "English"
# 返回顺序:video_input, downsample_rate, last_video_path, last_downsample_rate, rating_slider, thankyou_text, lang_btn
clear_btn.click(fn=on_clear,
outputs=[video_input, downsample_rate,
last_video_path, last_downsample_rate,
rating_slider, thankyou_text, lang_btn])
# 提交评分回调:读取 last_video_path, last_downsample_rate, rating_slider.value
def save_rating(rating, entry_id, current_lang):
hf_token = os.getenv("HF_TOKEN")
if not hf_token or entry_id is None:
return texts[current_lang]["thankyou_msg"]
# 加载 Dataset
ds = load_dataset(HF_DATASET_ID, token=hf_token)
ds = ds["train"] if isinstance(ds, dict) else ds
# 把对应 entry_id 的那行的 rating 更新
records = ds.to_list()
for rec in records:
if rec["entry_id"] == entry_id:
rec["rating"] = int(rating)
break
# 重新推到 Hub
new_ds = Dataset.from_list(records)
new_ds.push_to_hub(HF_DATASET_ID, token=hf_token)
return texts[current_lang]["thankyou_msg"]
# 绑定评分提交按钮
submit_rating_btn.click(fn=save_rating,
inputs=[rating_slider, last_entry_id, lang_state],
outputs=[thankyou_text])
# 底部说明
instructions_md = gr.Markdown(texts["en"]["instructions_md"])
# 当切换语言时,上面 toggle_language 已更新 instructions_md
return demo
if __name__ == "__main__":
demo = create_interface()
#port = int(os.environ.get("GRADIO_SERVER_PORT", os.environ.get("PORT", 7860)))
#server_name = str(os.environ.get("GRADIO_SERVER_NAME", "0.0.0.0"))
demo.launch(show_error=True, mcp_server=True) |