# (Page-extraction artifact, preserved as a comment — not part of the program.)
# Spaces: Running
# Running
import argparse
import base64
import json
import os
import re
import sys
from io import BytesIO
from typing import Optional, Union

import gradio as gr
import markdown
from openai import OpenAI
from PIL import Image

from vis_python_exe import PythonExecutor
def encode_image(image):
    """Convert a PIL.Image object or an image file path to a base64 string.

    Args:
        image: Either a PIL.Image instance or a filesystem path (str).

    Returns:
        The base64-encoded image bytes, decoded as a UTF-8 string.
    """
    if isinstance(image, str):
        # A string is treated as a file path: read the raw bytes directly.
        with open(image, "rb") as image_file:
            return base64.b64encode(image_file.read()).decode('utf-8')
    else:
        # PIL.Image case. BUGFIX: in-memory images (e.g. those produced by
        # Gradio) have the `format` attribute set to None even though it
        # exists, and Image.save(..., format=None) into a BytesIO raises
        # ValueError. Fall back to PNG when format is missing OR None.
        buffered = BytesIO()
        image.save(buffered, format=getattr(image, 'format', None) or 'PNG')
        return base64.b64encode(buffered.getvalue()).decode('utf-8')
def excute_codes(codes, messages, executor: PythonExecutor):
    """Run the non-empty code snippets and report which ones were empty.

    Returns:
        A tuple of (batch execution results for the non-empty snippets,
        list of indices whose snippet was the empty string).
    """
    no_code_idx = [idx for idx, snippet in enumerate(codes) if snippet == ""]
    codes_use = [snippet for snippet in codes if snippet != ""]
    return executor.batch_apply(codes_use, messages), no_code_idx
def process_prompt_init(question, image, prompt_template, prompt_type):
    """Build the initial user message: the base64 image wrapped in
    <image_clue_0> tags, followed by the templated question prompt."""
    template = prompt_template[prompt_type]
    encoded = encode_image(image)
    content = [
        {"type": "text", "text": "<image_clue_0>"},
        {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{encoded}"}},
        {"type": "text", "text": "</image_clue_0>\n\n"},
        {"type": "text", "text": template.format(query=question)},
    ]
    return [{"role": "user", "content": content}]
def update_messages_with_excu_content(messages, images_result, text_result, image_clue_idx):
    """Fold a code-execution result back into the conversation.

    The last message's content is extended with an <interpreter>...</interpreter>
    section holding the textual output and any image outputs, each image wrapped
    in numbered <image_clue_N> tags.

    Returns:
        (rebuilt message list, next unused image clue index)
    """
    prefix = [{"type": "text",
               "text": f"<interpreter>\nText Result:\n{text_result}\nImage Result:\n"}]
    suffix = [{"type": "text", "text": "</interpreter>\n"}]

    if images_result is None:
        # No images produced: record a literal "None" placeholder.
        image_content = [{"type": "text", "text": "None"}]
    else:
        image_content = []
        for encoded in images_result:
            image_content.extend([
                {"type": "text", "text": f"<image_clue_{image_clue_idx}>"},
                {"type": "image_url",
                 "image_url": {"url": f"data:image/jpeg;base64,{encoded}"}},
                {"type": "text", "text": f"</image_clue_{image_clue_idx}>"},
            ])
            image_clue_idx += 1

    combined = messages[-1]['content'] + prefix + image_content + suffix
    return list(messages[:-1]) + [{"role": "assistant", "content": combined}], image_clue_idx
def update_messages_with_code(messages, generated_content):
    """Append the model's code-bearing turn, re-adding the `</code>` tag that
    was consumed as the API stop sequence. Mutates and returns `messages`."""
    messages.append({
        "role": "assistant",
        "content": [{"type": "text", "text": f"{generated_content}</code>\n"}],
    })
    return messages
def update_messages_with_text(messages, generated_content):
    """Append a plain-text assistant turn. Mutates and returns `messages`."""
    messages.append({
        "role": "assistant",
        "content": [{"type": "text", "text": f"{generated_content}"}],
    })
    return messages
def call_chatgpt_api(messages, client, max_tokens=10000, stop=None, temperature=1.1):
    """Call the ChatGPT API with the given messages.

    Args:
        messages: OpenAI-style chat message list.
        client: OpenAI client instance.
        max_tokens: Completion token budget.
        stop: Optional list of stop strings; if one occurs in the response
            text it is reported as the stop reason.
        temperature: Sampling temperature.

    Returns:
        (response_text, stop_reason), or (None, None) on API error or when
        the model returned a message with no content.
    """
    try:
        response = client.chat.completions.create(
            model="gpt-4.1",  # vision-capable model
            messages=messages,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=1.0,
            stop=stop
        )
    except Exception as e:
        # Boundary-level catch: report and signal failure to the caller.
        print(f"API Error: {str(e)}")
        return None, None

    response_text = response.choices[0].message.content
    if response_text is None:
        # BUGFIX: content can legitimately be None; previously the stop-string
        # scan below raised TypeError, which the blanket except swallowed with
        # a misleading "API Error" message.
        print("API Error: empty response content")
        return None, None

    # Determine why generation stopped: prefer an explicit stop string found
    # in the text, otherwise fall back to the API's reported finish reason.
    stop_reason = None
    if stop and any(s in response_text for s in stop):
        for s in stop:
            if s in response_text:
                stop_reason = s
                break
    else:
        stop_reason = response.choices[0].finish_reason

    # An opened <code> block implies generation was cut at </code>.
    if "<code>" in response_text:
        stop_reason = "</code>"

    return response_text, stop_reason
def evaluate_single_data(data, client, executor, prompt_template, prompt_type):
    """Drive one question/image pair through a generate → execute-code →
    regenerate loop until the model produces a final answer with no code block.

    Args:
        data: Dict with "question" and "image" keys ("answer" is read only
            on the API-error path below).
        client: OpenAI client, passed through to call_chatgpt_api.
        executor: PythonExecutor used to run generated code snippets.
        prompt_template: Template table (dict) of prompt prefixes.
        prompt_type: Key selecting the prompt prefix from the table.

    Returns:
        The full conversation message list on success, or an error-summary
        dict when the very first API call fails.
    """
    messages = process_prompt_init(data["question"], data['image'], prompt_template, prompt_type)
    # Generate the initial response, stopping at </code> so any generated
    # code can be executed before generation continues.
    response_text, pred_stop_reason = call_chatgpt_api(
        messages,
        client,
        max_tokens=10000,
        stop=["</code>"]
    )
    if response_text is None:
        print("Failed to get response from API")
        # NOTE(review): unlike the success path (which returns the message
        # list), this failure path returns a result-summary dict.
        return {
            "input": data["question"],
            "output": data["answer"],
            "prediction": {
                "solution": "API Error",
                "correctness": False,
                "code_execution_count": 0,
            }
        }
    # Process the response.
    final_response = response_text  # NOTE(review): assigned but never used after the loop
    code_execution_count = 0
    image_clue_idx = 1  # clue 0 is the user's original image
    while True:
        # Did generation stop because the model emitted a code block?
        if pred_stop_reason == "</code>":
            # Record the code-bearing turn, then extract the snippet between
            # the ```python fences for execution.
            messages = update_messages_with_code(messages, response_text)
            code_to_execute = response_text.split("```python")[-1].split("```")[0].strip()
            # Execute the code; [0][0] takes the first result of the first batch.
            exe_result = excute_codes([code_to_execute], messages, executor)[0][0]
            if exe_result is None:
                text_result = "None"
                images_result = None
            else:
                output, report = exe_result  # presumably an (output, report) pair — TODO confirm against PythonExecutor
                try:
                    # NOTE(review): this indexes exe_result[0] (i.e. `output`)
                    # rather than using the unpacked names above — verify this
                    # matches PythonExecutor.batch_apply's return structure.
                    text_result = exe_result[0]['text']
                except:
                    text_result = None
                try:
                    images_result = exe_result[0]['images']
                except:
                    images_result = None
            # Fold the execution results back into the conversation.
            messages, new_image_clue_idx = update_messages_with_excu_content(messages, images_result, text_result, image_clue_idx)
            image_clue_idx = new_image_clue_idx
            code_execution_count += 1
            # Continue generation from the updated conversation.
            response_text, pred_stop_reason = call_chatgpt_api(
                messages,
                client,
                max_tokens=10000,
                stop=["</code>"]
            )
        else:
            # No further code to run: record the final text turn and stop.
            final_response = response_text
            messages = update_messages_with_text(messages, response_text)
            break
    return messages
def process_message(messages):
    """Render a chat message list as an HTML transcript.

    Text items are converted from Markdown to HTML; image_url items become
    inline <img> tags. All text is forced to black for readability.
    """
    # Background color and display label per role; anything else falls back
    # to a neutral background with the capitalized role name.
    role_styles = {
        "user": ("#f0f0f0", "User"),
        "human": ("#f0f0f0", "User"),
        "assistant": ("#e6f7ff", "Assistant"),
    }
    parts = ['<div style="color: black;">']  # wrapper div for the whole transcript
    for msg in messages:
        role = msg['role']
        bg, label = role_styles.get(role, ("#f9f9f9", role.capitalize()))
        parts.append(
            f'<div style="background-color: {bg}; padding: 10px; margin: 10px 0; '
            f'border-radius: 10px; color: black;"><strong>{label}:</strong><br>'
        )
        for item in msg['content']:
            if item['type'] == "text":
                # Convert Markdown to HTML (fenced code blocks + highlighting).
                rendered = markdown.markdown(item['text'],
                                             extensions=['fenced_code', 'codehilite'])
                parts.append(f'<div style="color: black;">{rendered}</div>')
            elif item['type'] == "image_url":
                # Base64 data URLs and plain URLs render the same way.
                url = item['image_url']['url']
                parts.append(f'<img src="{url}" style="max-width: 100%; margin: 10px 0;">')
        parts.append('</div>')
    parts.append('</div>')  # close the wrapper div
    return "".join(parts)
def o3_chat(api_key, base_url, question, image):
    """Run one question/image pair through the visual interpreter pipeline
    and return the conversation rendered as HTML.

    Args:
        api_key: OpenAI API key (from the UI textbox).
        base_url: API base URL (from the UI textbox).
        question: User's question about the image.
        image: PIL.Image provided by the Gradio image widget.

    Returns:
        HTML string rendering the full conversation.
    """
    client = OpenAI(api_key=api_key, base_url=base_url)
    executor = PythonExecutor()
    # BUGFIX: the template file was opened via json.load(open(...)) and never
    # closed; use a context manager so the handle is released deterministically.
    with open("./prompt_template_vis.json", "r", encoding="utf-8") as f:
        prompt_template = json.load(f)
    prompt_type = 'vistool'
    data = {
        "question": question,
        "image": image,
    }
    # Evaluate the single data point and render it.
    messages = evaluate_single_data(data, client, executor, prompt_template, prompt_type)
    return process_message(messages)
# Gradio UI
def create_demo():
    """Build the Gradio Blocks UI for the O3 visual Python interpreter."""
    with gr.Blocks(css="footer {visibility: hidden}") as demo:
        gr.Markdown("# O3 Visual Python Interpreter")
        gr.Markdown("Upload an image and ask a question to get a response with code execution capabilities.")

        with gr.Row():
            with gr.Column(scale=1):
                # SECURITY FIX: a live API key was hard-coded here as the
                # default value, leaking the credential to anyone reading the
                # source. Read defaults from the environment instead; the user
                # can still paste values into the textboxes.
                api_key = gr.Textbox(label="OpenAI API Key", type="password",
                                     value=os.environ.get("OPENAI_API_KEY", ""))
                base_url = gr.Textbox(label="Base URL (optional)",
                                      value=os.environ.get("OPENAI_BASE_URL",
                                                           "https://api.claudeshop.top/v1"))
                image_input = gr.Image(type="pil", label="Upload Image")
                question = gr.Textbox(label="Question", placeholder="Ask a question about the image...")
                submit_btn = gr.Button("Submit")
            with gr.Column(scale=2):
                output = gr.HTML(label="Response")

        submit_btn.click(
            fn=o3_chat,
            inputs=[api_key, base_url, question, image_input],
            outputs=[output]
        )

        gr.Markdown("""
        ## Examples
        Try asking questions like:
        - "What's in this image?"
        - "Can you analyze the data in this chart?"
        - "Generate a similar visualization with Python"
        """)
    return demo
# Create and launch the application.
if __name__ == "__main__":
    demo = create_demo()
    demo.launch()