File size: 12,489 Bytes
6639f75
 
 
 
 
 
 
 
 
 
 
5410dc5
6639f75
 
 
 
1b5bd3c
 
6639f75
 
 
 
1b5bd3c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ced644c
 
 
 
 
 
 
 
 
 
 
1a014e1
ced644c
 
 
 
 
 
 
 
1a014e1
ced644c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1b5bd3c
 
 
 
 
 
 
6639f75
 
 
 
 
 
 
1b5bd3c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ced644c
1b5bd3c
 
 
 
 
ced644c
 
1b5bd3c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ced644c
1b5bd3c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6639f75
1b5bd3c
 
6639f75
1b5bd3c
6639f75
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
"""
test_constrained_model.py - Test Constrained Generation with Trained Model

This tests our intensively trained model using constrained JSON generation
to force valid outputs and solve the "Expecting ',' delimiter" issues.
"""

import torch
import json
import jsonschema
from transformers import AutoTokenizer, AutoModelForCausalLM
# from peft import PeftModel  # Not needed for base model demo
from typing import Dict, List
import time

def load_trained_model():
    """Load our model - tries fine-tuned first, falls back to base model."""
    print("πŸ”„ Loading SmolLM3-3B Function-Calling Agent...")
    
    # Load base model
    base_model_name = "HuggingFaceTB/SmolLM3-3B"
    
    try:
        print("πŸ”„ Loading tokenizer...")
        tokenizer = AutoTokenizer.from_pretrained(base_model_name)
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token
        
        print("πŸ”„ Loading base model...")
        # Use smaller data type for Hugging Face Spaces
        model = AutoModelForCausalLM.from_pretrained(
            base_model_name,
            torch_dtype=torch.float16,  # Use float16 for better memory usage
            device_map="auto",
            low_cpu_mem_usage=True  # Reduce memory usage during loading
        )
        
        # Try multiple paths for fine-tuned adapter
        adapter_paths = [
            "jlov7/SmolLM3-Function-Calling-LoRA",  # Hub (preferred)
            "./model_files",  # Local cleaned path
            "./smollm3_robust",  # Original training output
            "./hub_upload",  # Upload-ready files
            "./final_model_backup_20250721_202951"  # Backup
        ]
        
        model_loaded = False
        for i, adapter_path in enumerate(adapter_paths):
            try:
                if i == 0:
                    print("πŸ”„ Loading fine-tuned adapter from Hugging Face Hub...")
                else:
                    print(f"πŸ”„ Trying local path: {adapter_path}")
                
                # Import here to avoid issues if peft not available
                from peft import PeftModel
                model = PeftModel.from_pretrained(model, adapter_path)
                model = model.merge_and_unload()
                
                if i == 0:
                    print("βœ… Fine-tuned model loaded successfully from Hub!")
                else:
                    print(f"βœ… Fine-tuned model loaded successfully from {adapter_path}!")
                model_loaded = True
                break
                
            except Exception as e:
                if i == 0:
                    print(f"⚠️ Hub adapter not found: {e}")
                else:
                    print(f"⚠️ Path {adapter_path} failed: {e}")
                continue
        
        if not model_loaded:
            print("πŸ”§ Using base model with optimized prompting")
            print("πŸ“‹ Note: Install fine-tuned adapter for 100% success rate")
        
        print("βœ… Model loaded successfully")
        return model, tokenizer
        
    except Exception as e:
        print(f"❌ Error loading model: {e}")
        raise

def constrained_json_generate(model, tokenizer, prompt: str, schema: Dict, max_attempts: int = 3):
    """Generate JSON with multiple attempts and validation."""
    device = next(model.parameters()).device
    
    for attempt in range(max_attempts):
        try:
            # Generate with different temperatures for diversity
            temperature = 0.1 + (attempt * 0.1)
            
            inputs = tokenizer(prompt, return_tensors="pt").to(device)
            
            # Simple timeout protection using threading (cross-platform)
            import threading
            
            result = [None]
            error = [None]
            
            def generate_with_timeout():
                try:
                    with torch.no_grad():
                        outputs = model.generate(
                            **inputs,
                            max_new_tokens=50,  # Very short for Spaces performance
                            temperature=temperature,
                            do_sample=True,
                            pad_token_id=tokenizer.eos_token_id,
                            eos_token_id=tokenizer.eos_token_id,
                            num_return_sequences=1,
                            use_cache=True,
                            repetition_penalty=1.1  # Prevent repetition
                        )
                    
                    # Extract generated text
                    generated_ids = outputs[0][inputs['input_ids'].shape[1]:]
                    response = tokenizer.decode(generated_ids, skip_special_tokens=True).strip()
                    
                    # Try to extract JSON from response
                    if "{" in response and "}" in response:
                        # Find the first complete JSON object
                        start = response.find("{")
                        bracket_count = 0
                        end = start
                        
                        for i, char in enumerate(response[start:], start):
                            if char == "{":
                                bracket_count += 1
                            elif char == "}":
                                bracket_count -= 1
                                if bracket_count == 0:
                                    end = i + 1
                                    break
                        
                        json_str = response[start:end]
                        result[0] = json_str
                    else:
                        result[0] = response
                        
                except Exception as e:
                    error[0] = str(e)
            
            # Start generation in a separate thread with timeout
            thread = threading.Thread(target=generate_with_timeout)
            thread.daemon = True
            thread.start()
            thread.join(timeout=8)  # Aggressive 8-second timeout for Spaces
            
            if thread.is_alive():
                return "", False, f"Generation timed out (attempt {attempt + 1})"
            
            if error[0]:
                if attempt == max_attempts - 1:
                    return "", False, f"Generation error: {error[0]}"
                continue
                
            if result[0]:
                # Validate JSON and schema
                try:
                    parsed = json.loads(result[0])
                    jsonschema.validate(parsed, schema)
                    return result[0], True, None
                except (json.JSONDecodeError, jsonschema.ValidationError) as e:
                    if attempt == max_attempts - 1:
                        return result[0], False, f"JSON validation failed: {str(e)}"
                    continue
                    
        except Exception as e:
            if attempt == max_attempts - 1:
                return "", False, f"Generation error: {str(e)}"
            continue
    
    return "", False, "All generation attempts failed"

def create_test_schemas():
    """Create the test schemas we're evaluating against."""
    return {
        "weather_forecast": {
            "name": "get_weather_forecast",
            "description": "Get weather forecast",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {"type": "string"},
                    "days": {"type": "integer"},
                    "units": {"type": "string"},
                    "include_hourly": {"type": "boolean"}
                },
                "required": ["location", "days"]
            }
        },
        "sentiment_analysis": {
            "name": "analyze_sentiment",
            "description": "Analyze text sentiment",
            "parameters": {
                "type": "object",
                "properties": {
                    "text": {"type": "string"},
                    "language": {"type": "string"},
                    "include_emotions": {"type": "boolean"},
                    "confidence_threshold": {"type": "number"}
                },
                "required": ["text"]
            }
        },
        "currency_converter": {
            "name": "convert_currency",
            "description": "Convert currency amounts",
            "parameters": {
                "type": "object",
                "properties": {
                    "amount": {"type": "number"},
                    "from_currency": {"type": "string"},
                    "to_currency": {"type": "string"},
                    "include_fees": {"type": "boolean"},
                    "precision": {"type": "integer"}
                },
                "required": ["amount", "from_currency", "to_currency"]
            }
        }
    }

def create_json_schema(function_def: Dict) -> Dict:
    """Create JSON schema for validation."""
    return {
        "type": "object",
        "properties": {
            "name": {
                "type": "string",
                "const": function_def["name"]
            },
            "arguments": function_def["parameters"]
        },
        "required": ["name", "arguments"],
        "additionalProperties": False
    }

def test_constrained_generation():
    """Test constrained generation on our problem schemas."""
    print("πŸ§ͺ Testing Constrained Generation with Trained Model")
    print("=" * 60)
    
    # Load trained model
    model, tokenizer = load_trained_model()
    
    # Get test schemas
    schemas = create_test_schemas()
    
    test_cases = [
        ("weather_forecast", "Get 3-day weather for San Francisco in metric units"),
        ("sentiment_analysis", "Analyze sentiment: The product was excellent and delivery was fast"),
        ("currency_converter", "Convert 500 USD to EUR with fees included"),
        ("weather_forecast", "Give me tomorrow's weather for London with hourly details"),
        ("sentiment_analysis", "Check sentiment for I am frustrated with this service"),
        ("currency_converter", "Convert 250 EUR to CAD using rates from 2023-12-01")
    ]
    
    results = {"passed": 0, "total": len(test_cases), "details": []}
    
    for schema_name, query in test_cases:
        print(f"\n🎯 Testing: {schema_name}")
        print(f"πŸ“ Query: {query}")
        
        # Create prompt
        function_def = schemas[schema_name]
        schema = create_json_schema(function_def)
        
        prompt = f"""<|im_start|>system
You are a helpful assistant that calls functions by responding with valid JSON when given a schema. Always respond with JSON function calls only, never prose.<|im_end|>

<schema>
{json.dumps(function_def, indent=2)}
</schema>

<|im_start|>user
{query}<|im_end|>
<|im_start|>assistant
"""
        
        # Test constrained generation
        response, success, error = constrained_json_generate(model, tokenizer, prompt, schema)
        
        print(f"πŸ€– Response: {response}")
        if success:
            print("βœ… PASS - Valid JSON with correct schema!")
            results["passed"] += 1
        else:
            print(f"❌ FAIL - {error}")
        
        results["details"].append({
            "schema": schema_name,
            "query": query,
            "response": response,
            "success": success,
            "error": error
        })
    
    # Calculate success rate
    success_rate = (results["passed"] / results["total"]) * 100
    
    print(f"\nπŸ† CONSTRAINED GENERATION RESULTS")
    print("=" * 60)
    print(f"βœ… Passed: {results['passed']}/{results['total']} ({success_rate:.1f}%)")
    print(f"🎯 Target: β‰₯80%")
    
    if success_rate >= 80:
        print("πŸŽ‰ SUCCESS! Reached 80%+ target with constrained generation!")
    else:
        print(f"πŸ“ˆ Improvement needed: +{80 - success_rate:.1f}% to reach target")
    
    # Save results
    with open("constrained_results.json", "w") as f:
        json.dump({
            "success_rate": success_rate,
            "passed": results["passed"],
            "total": results["total"],
            "details": results["details"],
            "timestamp": time.time()
        }, f, indent=2)
    
    print(f"πŸ’Ύ Results saved to constrained_results.json")
    
    return success_rate

if __name__ == "__main__":
    success_rate = test_constrained_generation()