Spaces:
Runtime error
Runtime error
""" | |
Quantum Physics Problem Generator | |
Shlomo Kashani | |
Description: | |
------------ | |
This module is part of the QuantumLLMInstruct system, designed to generate and solve quantum physics problems | |
using advanced Large Language Models (LLMs). It utilizes a multi-stage pipeline for problem generation, | |
solution generation, and database management. | |
Core Functionalities: | |
--------------------- | |
1. **Problem Generation**: | |
- Generates quantum physics problems in LaTeX format using LLMs. | |
- Supports domain-specific problem generation across multiple quantum fields. | |
2. **Solution Generation**: | |
- Provides step-by-step LaTeX solutions for the generated problems using a second LLM. | |
3. **Data Management**: | |
- Stores generated problems and solutions in DuckDB and Parquet files. | |
- Enables exporting data in Parquet format for scalability and compatibility. | |
4. **Gradio Interface**: | |
- A user-friendly interface to interact with the system, including problem generation, | |
solution generation, and database exploration. | |
5. **Hugging Face Integration**: | |
- Supports visualization and interaction with the dataset on the Hugging Face platform. | |
Main Components: | |
---------------- | |
- **initialize_duckdb() / initialize_parquet()**: Initializes the database schema. | |
- **generate_multiple_problems()**: Generates multiple problems for the selected quantum domains. | |
- **generate_solutions()**: Solves unsolved problems in the database. | |
- **export_parquet()**: Exports the database to a Parquet file for external use. | |
Dependencies: | |
------------- | |
- Python 3.7+ | |
- Transformers: `transformers` | |
- DuckDB: `duckdb` | |
- Gradio: `gradio` | |
- Pandas: `pandas` | |
""" | |
import numpy as np | |
import random | |
import io | |
import duckdb | |
import math | |
from datetime import datetime | |
import PIL | |
from PIL import Image | |
import pennylane as qml | |
import base64 | |
import platform | |
from math import pi | |
import pandas as pd | |
import os | |
from transformers import AutoModelForCausalLM, AutoTokenizer | |
import tqdm | |
import duckdb | |
from tqdm import tqdm | |
import uuid | |
import random | |
import sympy | |
from datetime import datetime | |
from Q_llm_prompts import * | |
# Predefined Qwen models | |
# Qwen2.5 offers multiple model sizes, including 72B, 32B, 14B, 7B, 3B, 1.5B, 0.5B, etc. | |
# You can choose the appropriate model based on your needs and GPU memory size | |
model_options = [ | |
"Qwen/Qwen2.5-Coder-1.5B-Instruct", | |
"Qwen/Qwen2.5-Coder-3B-Instruct", | |
"Qwen/Qwen2.5-Coder-7B-Instruct", | |
"Qwen/Qwen2.5-Math-7B-Instruct", | |
"Qwen/Qwen2.5-Coder-32B-Instruct", | |
"meta-llama/Llama-3.2-3B-Instruct" | |
# "unsloth/Qwen2.5-Math-7B-Instruct", | |
# "unsloth/Llama-3.2-3B-Instruct-bnb-4bit", | |
# "nvidia/OpenMath-CodeLlama-7b-Python-hf" tokenizer.chat_template is not set and no template argument was passed! | |
] | |
solutions_model_options = model_options | |
# Load default model and tokenizer | |
selected_model = model_options[0] | |
model = AutoModelForCausalLM.from_pretrained( | |
selected_model, | |
torch_dtype="auto", | |
device_map="auto" | |
) | |
tokenizer = AutoTokenizer.from_pretrained(selected_model) | |
solution_model = selected_model | |
solution_tokenizer =tokenizer | |
solution_model_instance =model | |
# Function to reload the model when selection changes | |
def reload_model(model_name): | |
global model, tokenizer | |
model = AutoModelForCausalLM.from_pretrained( | |
model_name, | |
torch_dtype="auto", | |
device_map="auto" | |
) | |
tokenizer = AutoTokenizer.from_pretrained(model_name) | |
return f"Model loaded: {model_name}" | |
# Define a Pennylane device | |
dev = qml.device('default.qubit', wires=10) | |
# Detect platform-specific device | |
def is_mac_os(): | |
return platform.system() == 'Darwin' | |
device = 'cpu' if is_mac_os() else 'cuda' | |
RESPONSE_SOLUTION_LLM_SYS_PROMPT = "You are an expert in quantum physics and provide detailed solutions in plain text. All mathematical equations and symbols must strictly be in LaTeX." | |
RESPONSE_SOLUTION_LLM_USR_PROMPT = """ | |
Provide a complete solution to the following quantum physics problem in plain text format: | |
{problem} | |
""" | |
# Parquet file setup | |
PARQUET_FILE = 'quantum_problems.parquet' | |
def initialize_parquet(): | |
"""Initialize Parquet file with the required schema if it doesn't exist.""" | |
if not os.path.exists(PARQUET_FILE): | |
data = { | |
"uuid": [], | |
"timestamp": [], | |
"problem": [], | |
"sub_domain": [], | |
"main_domain": [], | |
"model_name": [], | |
"solution": [], | |
"solution_model_name": [] | |
} | |
df = pd.DataFrame(data) | |
df.to_parquet(PARQUET_FILE, index=False) | |
print("Initialized Parquet file with schema.") | |
def load_parquet(): | |
"""Load data from the Parquet file.""" | |
if os.path.exists(PARQUET_FILE): | |
return pd.read_parquet(PARQUET_FILE) | |
else: | |
initialize_parquet() | |
return pd.read_parquet(PARQUET_FILE) | |
def save_parquet(df): | |
"""Save DataFrame to Parquet file.""" | |
df.to_parquet(PARQUET_FILE, index=False) | |
def insert_problem_pqt(uuid, timestamp, problem, main_domain, sub_domain, model_name, solution=None, solution_model_name=None): | |
"""Insert a new problem into the Parquet file.""" | |
df = load_parquet() | |
new_row = { | |
"uuid": uuid, | |
"timestamp": timestamp, | |
"problem": problem, | |
"sub_domain": sub_domain, | |
"main_domain": main_domain, | |
"model_name": model_name, | |
"solution": solution, | |
"solution_model_name": solution_model_name | |
} | |
df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True) | |
save_parquet(df) | |
def update_solution_pqt(uuid, solution, solution_model_name): | |
"""Update the solution for a given problem UUID.""" | |
df = load_parquet() | |
df.loc[df["uuid"] == uuid, ["solution", "solution_model_name"]] = solution, solution_model_name | |
save_parquet(df) | |
# DuckDB setup | |
DB_FILE = 'quantum_problems.duckdb' | |
def initialize_duckdb(): | |
conn = duckdb.connect(database=DB_FILE) | |
conn.execute(""" | |
CREATE TABLE IF NOT EXISTS problems ( | |
uuid TEXT UNIQUE NOT NULL, | |
timestamp TEXT, | |
problem TEXT, | |
sub_domain TEXT, | |
main_domain TEXT, | |
model_name TEXT, | |
solution TEXT, | |
solution_model_name TEXT | |
) | |
""") | |
# print ("Created schema") | |
# df = conn.execute("SELECT * FROM problems").df() | |
# print (df.count) | |
conn.close() | |
# Function to buffer the plot and return as PIL image | |
def buffer_plot_and_get(fig): | |
buf = io.BytesIO() | |
fig.savefig(buf, format='png') | |
buf.seek(0) | |
return PIL.Image.open(buf) | |
# Store image in bytes for DuckDB | |
def pil_image_to_bytes(image): | |
img_byte_arr = io.BytesIO() | |
image.save(img_byte_arr, format='PNG') | |
return img_byte_arr.getvalue() | |
# Encode the image in base64 to display in HTML | |
def encode_image_from_blob(blob): | |
img_buffer = io.BytesIO(blob) | |
image = Image.open(img_buffer) | |
img_str = base64.b64encode(img_buffer.getvalue()).decode("utf-8") | |
return f'<img src="data:image/png;base64,{img_str}" style="max-width:500px;"/>' | |
# Function to generate a random Hamiltonian | |
def generate_random_hamiltonian(num_qubits): | |
terms = [] | |
for _ in range(random.randint(1, 5)): | |
coeff = round(random.uniform(-1, 1), 2) | |
pauli_ops = [random.choice(['I', 'X', 'Y', 'Z']) for _ in range(num_qubits)] | |
term = f"{coeff} * {' '.join(pauli_ops)}" | |
terms.append(term) | |
return " + ".join(terms) | |
# Function to convert Hamiltonian to QASM code | |
def hamiltonian_to_qasm(hamiltonian, num_qubits): | |
qasm_code = f"OPENQASM 2.0;\ninclude \"qelib1.inc\";\nqreg q[{num_qubits}];\n" | |
rotations = {i: 0.0 for i in range(num_qubits)} | |
terms = hamiltonian.split(" + ") | |
for term in terms: | |
coeff, paulis = term.split(" * ") | |
paulis = paulis.split() | |
coeff = float(coeff) | |
for i, pauli in enumerate(paulis): | |
if pauli == "X": | |
qasm_code += f"x q[{i}];\n" | |
elif pauli == "Y": | |
qasm_code += f"ry(pi/2) q[{i}];\n" | |
elif pauli == "Z": | |
rotations[i] += coeff | |
for i, angle in rotations.items(): | |
if angle != 0: | |
angle_degrees = round(angle * 180 / math.pi, 2) | |
qasm_code += f"rz({angle_degrees}) q[{i}];\n" | |
return qasm_code | |
# Function to parse QASM code and create Pennylane circuit | |
def qasm_to_pennylane(qasm_code): | |
qasm_lines = qasm_code.split("\n") | |
num_qubits = int(qasm_lines[2].split('[')[1].split(']')[0]) # Extract number of qubits from QASM | |
def circuit(): | |
for line in qasm_lines: | |
if "x" in line: | |
qml.PauliX(int(line.split('q[')[1].split(']')[0])) | |
elif "rz" in line: | |
angle = float(line.split('(')[1].split(')')[0]) | |
qml.RZ(angle, int(line.split('q[')[1].split(']')[0])) | |
elif "ry" in line: | |
qml.RY(pi / 2, int(line.split('q[')[1].split(']')[0])) | |
return qml.state() | |
return circuit | |
# # Store data in DuckDB | |
# def store_in_duckdb(data, db_file='quantum_hamiltonians.duckdb'): | |
# conn = duckdb.connect(database=db_file) | |
# conn.execute("""CREATE TABLE IF NOT EXISTS hamiltonians ( | |
# id INTEGER, | |
# plot BLOB, | |
# hamiltonian VARCHAR, | |
# qasm_code VARCHAR, | |
# trotter_code VARCHAR, | |
# num_qubits INTEGER, | |
# trotter_order INTEGER, | |
# timestamp TIMESTAMP | |
# )""") | |
# conn.executemany("""INSERT INTO hamiltonians (id, plot, hamiltonian, qasm_code, trotter_code, num_qubits, trotter_order, timestamp) | |
# VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", data) | |
# conn.close() | |
# Function to load results from DuckDB | |
def load_from_duckdb(db_file='quantum_hamiltonians.duckdb'): | |
conn = duckdb.connect(database=db_file) | |
df = conn.execute("SELECT * FROM hamiltonians").df() | |
conn.close() | |
# Convert results to HTML with images | |
html_content = [] | |
for index, row in df.iterrows(): | |
plot_blob = row['plot'] | |
encoded_img = encode_image_from_blob(plot_blob) | |
html_content.append(f""" | |
<table style='width: 100%; border-collapse: collapse; margin: 10px;'> | |
<tr> | |
<td style='width: 30%; text-align: center;'> | |
<h3>Circuit {index + 1}</h3> | |
{encoded_img} <!-- Display the image --> | |
</td> | |
<td style='padding: 10px;'> | |
<table style='width: 100%; border-collapse: collapse;'> | |
<tr> | |
<td><strong>Hamiltonian:</strong></td><td>{row['hamiltonian']}</td> | |
</tr> | |
<tr> | |
<td><strong>QASM Representation:</strong></td><td>{row['qasm_code']}</td> | |
</tr> | |
<tr> | |
<td><strong>Trotter Decomposition:</strong></td><td>{row['trotter_code']}</td> | |
</tr> | |
<tr> | |
<td><strong>Number of Qubits:</strong></td><td>{row['num_qubits']}</td> | |
</tr> | |
<tr> | |
<td><strong>Trotter Order:</strong></td><td>{row['trotter_order']}</td> | |
</tr> | |
<tr> | |
<td><strong>Timestamp:</strong></td><td>{row['timestamp']}</td> | |
</tr> | |
</table> | |
</td> | |
</tr> | |
</table> | |
""") | |
return "".join(html_content) | |
# Function to generate Hamiltonians | |
def generate_hamiltonians(num_hamiltonians, selected_qubits, selected_order): | |
results_table = [] | |
timestamp = str(datetime.now()) | |
for i in range(num_hamiltonians): | |
num_qubits = random.choice(selected_qubits) | |
order = selected_order | |
hamiltonian = generate_random_hamiltonian(num_qubits) | |
qasm_code = hamiltonian_to_qasm(hamiltonian, num_qubits) | |
trotter_code = trotter_decomposition(hamiltonian, order) | |
# Generate Pennylane circuit from QASM code | |
circuit = qasm_to_pennylane(qasm_code) | |
# Draw the Pennylane circuit and save as an image | |
fig, ax = qml.draw_mpl(circuit)() | |
circuit_plot_image = buffer_plot_and_get(fig) | |
circuit_plot_bytes = pil_image_to_bytes(circuit_plot_image) | |
# Append data to results table | |
results_table.append((i + 1, circuit_plot_bytes, hamiltonian, qasm_code, trotter_code, num_qubits, order, timestamp)) | |
# Function for Trotter decomposition | |
def trotter_decomposition(hamiltonian, order): | |
terms = hamiltonian.split(" + ") | |
trotter_steps = [] | |
for term in terms: | |
coeff, *pauli_ops = term.split(" * ") | |
coeff = float(coeff) | |
for _ in range(order): | |
trotter_steps.append(f"exp({coeff / order}) * ({' * '.join(pauli_ops)})") | |
for _ in range(order): | |
trotter_steps.append(f"exp({-coeff / order}) * ({' * '.join(pauli_ops)})") | |
return " + ".join(trotter_steps) | |
# def export_parquet(db_file): | |
# """Export DuckDB table to a Parquet file using COPY.""" | |
# try: | |
# conn = duckdb.connect(database=db_file) | |
# parquet_file = f"quantum_problems_{datetime.now().strftime('%Y%m%d_%H%M%S')}.parquet" | |
# conn.execute(f"COPY problems TO '{parquet_file}' (FORMAT PARQUET);") | |
# conn.close() | |
# return f"Data successfully exported to Parquet file: {parquet_file}" | |
# except Exception as e: | |
# return f"Error exporting to Parquet: {e}" | |
def export_parquet(db_file): | |
"""Export DuckDB table to a Parquet file using COPY.""" | |
try: | |
conn = duckdb.connect(database=db_file) | |
parquet_file = f"quantum_problems_{datetime.now().strftime('%Y%m%d_%H%M%S')}.parquet" | |
conn.execute(f""" | |
COPY ( | |
SELECT | |
uuid, | |
CAST(timestamp AS VARCHAR) AS timestamp, | |
problem, | |
sub_domain, | |
main_domain, | |
model_name, | |
solution, | |
solution_model_name | |
FROM problems | |
) TO '{parquet_file}' (FORMAT PARQUET); | |
""") | |
conn.close() | |
df = pd.read_parquet(parquet_file) | |
df['timestamp'] = df['timestamp'].astype(str) | |
df.to_parquet(parquet_file, index=False) | |
return f"Data successfully exported to Parquet file: {parquet_file}" | |
except Exception as e: | |
return f"Error exporting to Parquet: {e}" | |
def generate_dynamic_prompt(selected_domains): | |
if not selected_domains: | |
raise ValueError("No domains selected. Please select at least one domain.") | |
# Select a single domain randomly | |
selected_domain = random.choice(selected_domains) | |
print("Selected Domain:", selected_domain) | |
# Retrieve the description and template | |
domain_details = quantum_problem_domains[selected_domain] | |
domain_description = domain_details["description"] | |
example_output = domain_details["template"] | |
RESPONSE_INSTRUCTION_LLM_PROMPT = f""" | |
Generate a single detailed quantum physics problem for an exam in LaTeX format. Do not solve the problem. | |
Do not include additional explanations or comments outside of LaTeX, and avoid unnecessary LaTeX imports (e.g., \\documentclass{{}}, \\usepackage{{}}, or \\begin{{document}}). | |
All mathematical equations and symbols must strictly be in LaTeX. | |
Your response must strictly follow this provided format: | |
1) {{Problem:}} Clearly define the quantum physics problem here, using mathematical precision and LaTeX formatting. Provide any equations or detailed descriptions necessary for students to understand and solve the problem. | |
2) {{Domain:}} Provide a concise two-word domain description in CAPS such as "ISING HAMILTONIAN". | |
Do not solve the problem!. The problem must strictly adhere to one and only one of the following domain types: | |
{domain_description} | |
Example Response Output: | |
{example_output} | |
""" | |
return RESPONSE_INSTRUCTION_LLM_PROMPT, selected_domain | |
# Function to generate a quantum physics problem | |
def generate_problem(pair_id, model_name, selected_domains): | |
try: | |
prompt, selected_domain = generate_dynamic_prompt(selected_domains) | |
messages = [ | |
{"role": "system", "content": "You are a quantum physics professor and an expert in quantum computing."}, | |
{"role": "user", "content": prompt} | |
] | |
text = tokenizer.apply_chat_template( | |
messages, | |
tokenize=False, | |
add_generation_prompt=True | |
) | |
model_inputs = tokenizer([text], return_tensors="pt").to(model.device) | |
generated_ids = model.generate( | |
**model_inputs, | |
max_new_tokens=10024 | |
) | |
generated_ids = [ | |
output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids) | |
] | |
response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0] | |
if "{Problem:}" not in response or "{Domain:}" not in response: | |
raise ValueError(f"Generated problem does not match the expected format. Response:\n{response}") | |
problem = response.split("{Problem:}")[1].split("{Domain:}")[0].strip() | |
sub_domain = response.split("{Domain:}")[1].strip() | |
# Insert the problem into DuckDB | |
conn = duckdb.connect(database=DB_FILE) | |
conn.execute(""" | |
INSERT INTO problems (uuid, timestamp, problem, main_domain, sub_domain, model_name) | |
VALUES (?, ?, ?, ?, ?, ?) | |
""", (str(uuid.uuid4()), datetime.now().isoformat(), problem, selected_domain, sub_domain, model_name.split("/")[-1])) | |
conn.close() | |
# print(response) | |
return response | |
except Exception as e: | |
print(f"Error generating problem {pair_id}: {e}") | |
return None | |
# Generate multiple problems with progress | |
def generate_multiple_problems(num_pairs, selected_domains): | |
if not selected_domains: | |
return "Please select at least one domain type." | |
conn = duckdb.connect(database=DB_FILE) | |
current_count = conn.execute("SELECT COUNT(*) FROM problems").fetchone()[0] | |
conn.close() | |
responses = [] | |
with tqdm(total=num_pairs, desc="Generating Problems", unit="problem") as pbar: | |
for i in range(num_pairs): | |
response = generate_problem(current_count + i + 1, selected_model, selected_domains) | |
if response: | |
responses.append(response) | |
pbar.update(1) | |
print ("Generated PRB:{}",i) | |
return "\n\n".join(responses) | |
def generate_solutions_pqt(solution_model_name): | |
df = load_parquet() | |
unsolved_problems = df[df["solution"].isna()] | |
if unsolved_problems.empty: | |
return "No unsolved problems found in the database." | |
with tqdm(total=len(unsolved_problems), desc="Generating Solutions", unit="solution") as pbar: | |
for _, row in unsolved_problems.iterrows(): | |
try: | |
solution_prompt = RESPONSE_SOLUTION_LLM_USR_PROMPT.format(problem=row["problem"]) | |
messages = [ | |
{"role": "system", "content": RESPONSE_SOLUTION_LLM_SYS_PROMPT}, | |
{"role": "user", "content": solution_prompt} | |
] | |
text = solution_tokenizer.apply_chat_template( | |
messages, | |
tokenize=False, | |
add_generation_prompt=True | |
) | |
model_inputs = solution_tokenizer([text], return_tensors="pt").to(solution_model_instance.device) | |
generated_ids = solution_model_instance.generate( | |
**model_inputs, | |
max_new_tokens=10024 | |
) | |
generated_ids = [ | |
output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids) | |
] | |
solution = solution_tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0] | |
# Update solution in Parquet | |
update_solution_pqt(row["uuid"], solution, solution_model_name.split("/")[-1]) | |
except Exception as e: | |
print(f"Error generating solution for problem {row['uuid']}: {e}") | |
pbar.update(1) | |
return "Solutions generated successfully!" | |
# Function to generate solutions for unsolved problems | |
def generate_solutions(solution_model_name): | |
conn = duckdb.connect(database=DB_FILE) | |
problems = conn.execute("SELECT uuid, problem FROM problems WHERE solution IS NULL").fetchall() | |
if not problems: | |
return "No unsolved problems found in the database." | |
with tqdm(total=len(problems), desc="Generating Solutions", unit="solution") as pbar: | |
for problem_id, problem_text in problems: | |
try: | |
solution_prompt = RESPONSE_SOLUTION_LLM_USR_PROMPT.format(problem=problem_text) | |
messages = [ | |
{"role": "system", "content": RESPONSE_SOLUTION_LLM_SYS_PROMPT}, | |
{"role": "user", "content": solution_prompt} | |
] | |
text = solution_tokenizer.apply_chat_template( | |
messages, | |
tokenize=False, | |
add_generation_prompt=True | |
) | |
model_inputs = solution_tokenizer([text], return_tensors="pt").to(solution_model_instance.device) | |
generated_ids = solution_model_instance.generate( | |
**model_inputs, | |
max_new_tokens=10024 | |
) | |
generated_ids = [ | |
output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids) | |
] | |
solution = solution_tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0] | |
conn.execute(""" | |
UPDATE problems | |
SET solution = ?, solution_model_name = ? | |
WHERE uuid = ? | |
""", (solution, solution_model_name.split("/")[-1], problem_id)) | |
except Exception as e: | |
print(f"Error generating solution for problem {problem_id}: {e}") | |
pbar.update(1) | |
conn.close() | |
return "Solutions generated successfully!" | |
# Load problems from DuckDB | |
def load_problems_from_duckdb(): | |
"""Load all problems and solutions from the DuckDB database.""" | |
conn = duckdb.connect(database=DB_FILE) | |
df = conn.execute("SELECT * FROM problems").df() | |
conn.close() | |
return df | |
# Load summary from DuckDB | |
def load_summary_from_duckdb(): | |
conn = duckdb.connect(database=DB_FILE) | |
# Total number of problems | |
total_problems = conn.execute("SELECT COUNT(*) FROM problems").fetchone()[0] | |
# Count of distinct domains | |
distinct_domains_count = conn.execute("SELECT COUNT(DISTINCT main_domain) FROM problems").fetchone()[0] | |
# Problems by model | |
problems_by_model = conn.execute("SELECT model_name, COUNT(*) as count FROM problems GROUP BY model_name").fetchall() | |
conn.close() | |
# Build the summary | |
summary = f"<h3>Total Problems: {total_problems}</h3>" | |
summary += f"<h4>Distinct Domains: {distinct_domains_count}</h4>" | |
summary += "<h4>Problems by Model:</h4><ul>" | |
for model_name, count in problems_by_model: | |
summary += f"<li>{model_name}: {count}</li>" | |
summary += "</ul>" | |
return summary | |