"""
schema_tester.py - Official Schema Testing System

This script iterates over all schemas in schemas/, prompts the trained model,
validates output with jsonschema, and prints comprehensive pass/fail results.

Matches the exact specification from the user's requirements.
"""
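
# Example of the assumed schemas/*.json layout (the function name, parameters, and
# question below are illustrative only, not taken from the real schema files):
#
#   {
#     "name": "get_weather",
#     "parameters": {
#       "type": "object",
#       "properties": {"location": {"type": "string"}},
#       "required": ["location"]
#     },
#     "test_questions": ["What's the weather in Paris?"]
#   }
#
# Run the full suite with:  python schema_tester.py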

import json
from datetime import datetime
from pathlib import Path

import torch
from jsonschema import validate, ValidationError
from peft import PeftModel
from transformers import AutoTokenizer, AutoModelForCausalLM


class SchemaValidator:
    """Handles JSON schema validation."""

    @staticmethod
    def validate_function_call(response, schema):
        """Validate if response matches expected function call structure."""
        try:
            call_data = json.loads(response)

            if not isinstance(call_data, dict):
                return False, "Response is not a JSON object"

            if "name" not in call_data:
                return False, "Missing 'name' field"

            if "arguments" not in call_data:
                return False, "Missing 'arguments' field"

            if call_data["name"] != schema["name"]:
                return False, f"Function name mismatch: expected '{schema['name']}', got '{call_data['name']}'"

            try:
                validate(instance=call_data["arguments"], schema=schema["parameters"])
                return True, "Valid function call"
            except ValidationError as e:
                return False, f"Argument validation failed: {e.message}"

        except json.JSONDecodeError as e:
            return False, f"Invalid JSON: {e}"


class ModelTester:
    """Handles model loading and testing."""

    def __init__(self, model_path="./smollm3_robust"):
        self.model_path = model_path
        self.model = None
        self.tokenizer = None
        self.device = None
        self._load_model()

    def _load_model(self):
        """Load the trained model."""
        print("🚀 Loading trained SmolLM3-3B model...")

        base_model_name = "HuggingFaceTB/SmolLM3-3B"

        # Tokenizer: fall back to the EOS token when no pad token is defined.
        self.tokenizer = AutoTokenizer.from_pretrained(base_model_name)
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

        # Load the base model, then attach the trained LoRA adapter from model_path.
        base_model = AutoModelForCausalLM.from_pretrained(
            base_model_name,
            torch_dtype=torch.float32,
            trust_remote_code=True
        )
        self.model = PeftModel.from_pretrained(base_model, self.model_path)

        # Prefer Apple Silicon (MPS) when available, otherwise run on CPU.
        if torch.backends.mps.is_available():
            self.model = self.model.to("mps")
            self.device = "mps"
        else:
            self.device = "cpu"

        print(f"✅ Model loaded on {self.device}")

    def test_schema(self, schema, question):
        """Test the model on a specific schema and question."""
        prompt = f"""<|im_start|>system
You are a helpful assistant that calls functions by responding with valid JSON when given a schema. Always respond with JSON function calls only, never prose.<|im_end|>

<schema>
{json.dumps(schema, indent=2)}
</schema>

<|im_start|>user
{question}<|im_end|>
<|im_start|>assistant
"""

        inputs = self.tokenizer(prompt, return_tensors="pt")
        if self.device == "mps":
            inputs = {k: v.to(self.device) for k, v in inputs.items()}

        # Near-deterministic decoding: sampling with a very low temperature.
        self.model.eval()
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=150,
                temperature=0.1,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id,
                eos_token_id=self.tokenizer.eos_token_id
            )

        # Decode only the newly generated tokens, not the echoed prompt.
        input_length = inputs["input_ids"].shape[1]
        response = self.tokenizer.decode(outputs[0][input_length:], skip_special_tokens=True)

        # Heuristic cleanup: strip a stray trailing quote/brace the model sometimes appends.
        response = response.strip()
        if response.endswith('}"}'):
            response = response[:-2]
        if response.endswith('}}'):
            response = response[:-1]

        return response
def load_schemas(schemas_dir="schemas"): |
|
"""Load all schema files from the schemas directory.""" |
|
schemas = {} |
|
schema_files = Path(schemas_dir).glob("*.json") |
|
|
|
for schema_file in schema_files: |
|
try: |
|
with open(schema_file, 'r') as f: |
|
schema_data = json.load(f) |
|
schemas[schema_file.stem] = schema_data |
|
except Exception as e: |
|
print(f"β οΈ Error loading {schema_file}: {e}") |
|
|
|
return schemas |
|
|
|


def run_comprehensive_test():
    """Run the complete schema testing suite."""
    print("🧪 Official Schema Testing System")
    print("=" * 50)

    print("📂 Loading evaluation schemas...")
    schemas = load_schemas()

    if not schemas:
        print("❌ No schemas found in schemas/ directory")
        # Return an empty result set so main() can still unpack the tuple.
        return {}, 0.0

    print(f"✅ Loaded {len(schemas)} schemas: {', '.join(schemas.keys())}")

    tester = ModelTester()
    validator = SchemaValidator()

    results = {}
    total_tests = 0
    total_passed = 0

    print("\n🎯 Running tests on all schemas...")
    print("-" * 50)

    for schema_name, schema_data in schemas.items():
        print(f"\n📋 Testing Schema: {schema_name}")
        print(f"🔧 Function: {schema_data['name']}")

        test_questions = schema_data.get('test_questions', [])
        if not test_questions:
            print("⚠️ No test questions found, skipping")
            continue

        schema_results = []

        for i, question in enumerate(test_questions, 1):
            print(f"\n❓ Test {i}: {question}")

            response = tester.test_schema(schema_data, question)
            print(f"🤖 Response: {response}")

            is_valid, error_msg = validator.validate_function_call(response, schema_data)

            if is_valid:
                print(f"✅ PASS - {error_msg}")
                schema_results.append(True)
                total_passed += 1
            else:
                print(f"❌ FAIL - {error_msg}")
                schema_results.append(False)

            total_tests += 1

        schema_passed = sum(schema_results)
        schema_total = len(schema_results)
        schema_rate = schema_passed / schema_total * 100

        results[schema_name] = {
            'passed': schema_passed,
            'total': schema_total,
            'rate': schema_rate,
            'results': schema_results
        }

        print(f"📊 Schema Summary: {schema_passed}/{schema_total} ({schema_rate:.1f}%)")
print(f"\n" + "=" * 50) |
|
print(f"π OVERALL RESULTS") |
|
print(f"=" * 50) |
|
|
|
overall_rate = total_passed / total_tests * 100 |
|
print(f"β
Total passed: {total_passed}/{total_tests} ({overall_rate:.1f}%)") |
|
print(f"π― Target: β₯80% valid calls") |
|
|
|
|
|
print(f"\nπ Detailed Breakdown:") |
|
for schema_name, result in results.items(): |
|
status = "β
PASS" if result['rate'] >= 80 else "β FAIL" |
|
print(f" {schema_name}: {result['passed']}/{result['total']} ({result['rate']:.1f}%) {status}") |
|
|
|
|
|
if overall_rate >= 80: |
|
print(f"\nπ SUCCESS! Model meets the β₯80% target") |
|
print(f"π Ready for enterprise deployment") |
|
else: |
|
print(f"\nπ IMPROVEMENT NEEDED") |
|
print(f"π Current: {overall_rate:.1f}% | Target: β₯80%") |
|
print(f"π‘ Suggestions:") |
|
|
|
|
|
failed_schemas = [name for name, result in results.items() if result['rate'] < 80] |
|
|
|
if failed_schemas: |
|
print(f" 1. Focus training on: {', '.join(failed_schemas)}") |
|
print(f" 2. Add more examples for complex parameter schemas") |
|
print(f" 3. Increase training epochs or learning rate") |
|
|
|
print(f" 4. Consider using larger LoRA rank (r=16)") |
|
print(f" 5. Generate more diverse training examples") |
|
|
|
return results, overall_rate |
|
|
|


def main():
    """Main entry point."""
    try:
        results, rate = run_comprehensive_test()

        # Persist the results along with an ISO-8601 timestamp.
        with open("test_results.json", "w") as f:
            json.dump({
                "overall_rate": rate,
                "results": results,
                "timestamp": datetime.now().isoformat()
            }, f, indent=2)

        print("\n💾 Results saved to test_results.json")

    except Exception as e:
        print(f"❌ Testing failed: {e}")
        raise


if __name__ == "__main__":
    main()