"""
test_constrained_model_spaces.py - Spaces-optimized constrained generation

Loads the SmolLM3-3B function-calling model (merging a LoRA adapter when one is
available) and generates schema-constrained JSON function calls with a short
token budget and aggressive timeouts suited to Hugging Face Spaces hardware.
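
Typical use (a minimal sketch of the functions defined below; prompt is a
chat-formatted string such as the one in the __main__ block):

    model, tokenizer = load_trained_model()
    schema = create_json_schema(create_test_schemas()["weather_forecast"])
    json_str, ok, error = constrained_json_generate(model, tokenizer, prompt, schema)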
"""

import torch
import json
import jsonschema
from transformers import AutoTokenizer, AutoModelForCausalLM
from typing import Dict
import time
import threading

class TimeoutException(Exception):
    """Signals that generation exceeded its time budget (the threading-based
    timeout below returns an error tuple instead of raising this)."""

def load_trained_model():
    """Load our model - SPACES OPTIMIZED"""
    print("πŸ”„ Loading SmolLM3-3B Function-Calling Agent...")
    
    base_model_name = "HuggingFaceTB/SmolLM3-3B"
    
    try:
        print("πŸ”„ Loading tokenizer...")
        tokenizer = AutoTokenizer.from_pretrained(base_model_name)
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token
        
        print("πŸ”„ Loading base model...")
        # SPACES OPTIMIZED: Memory efficient loading
        model = AutoModelForCausalLM.from_pretrained(
            base_model_name,
            torch_dtype=torch.float16,
            device_map="auto",
            low_cpu_mem_usage=True
        )
        
        # Try multiple paths for fine-tuned adapter
        adapter_paths = [
            "jlov7/SmolLM3-Function-Calling-LoRA",  # Hub (preferred)
            "./model_files",  # Local cleaned path
            "./smollm3_robust",  # Original training output
            "./hub_upload",  # Upload-ready files
        ]
        
        model_loaded = False
        for i, adapter_path in enumerate(adapter_paths):
            try:
                if i == 0:
                    print("πŸ”„ Loading fine-tuned adapter from Hugging Face Hub...")
                else:
                    print(f"πŸ”„ Trying local path: {adapter_path}")
                
                from peft import PeftModel
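                # peft is imported here, inside the per-path try, so a missing
                # dependency just falls through to the base-model fallback below.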
                model = PeftModel.from_pretrained(model, adapter_path)
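                # merge_and_unload() folds the LoRA weights into the base model,
                # so later inference runs without the PEFT wrapper.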
                model = model.merge_and_unload()
                
                if i == 0:
                    print("βœ… Fine-tuned model loaded successfully from Hub!")
                else:
                    print(f"βœ… Fine-tuned model loaded successfully from {adapter_path}!")
                model_loaded = True
                break
                
            except Exception as e:
                if i == 0:
                    print(f"⚠️ Hub adapter not found: {e}")
                else:
                    print(f"⚠️ Path {adapter_path} failed: {e}")
                continue
        
        if not model_loaded:
            print("πŸ”§ Using base model with optimized prompting")
        
        print("βœ… Model loaded successfully")
        return model, tokenizer
        
    except Exception as e:
        print(f"❌ Error loading model: {e}")
        raise

def constrained_json_generate(model, tokenizer, prompt: str, schema: Dict, max_attempts: int = 2):
    """SPACES-OPTIMIZED generation with aggressive timeouts"""
    device = next(model.parameters()).device
    
    for attempt in range(max_attempts):
        try:
            # VERY aggressive settings for Spaces
            temperature = 0.1 + (attempt * 0.2)  # Start low, increase if needed
            
            inputs = tokenizer(prompt, return_tensors="pt").to(device)
            
            # Use threading timeout (cross-platform)
            result = [None]
            error = [None]
            
            def generate_with_timeout():
                try:
                    with torch.no_grad():
                        outputs = model.generate(
                            **inputs,
                            max_new_tokens=25,  # VERY short for Spaces
                            temperature=temperature,
                            do_sample=True,
                            pad_token_id=tokenizer.eos_token_id,
                            eos_token_id=tokenizer.eos_token_id,
                            num_return_sequences=1,
                            use_cache=True,
                            repetition_penalty=1.2  # Strong repetition penalty
                        )
                    result[0] = outputs
                except Exception as e:
                    error[0] = str(e)
            
            # Start generation thread
            thread = threading.Thread(target=generate_with_timeout)
            thread.daemon = True
            thread.start()
            thread.join(timeout=4)  # 4-second timeout
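            # join() returning does not cancel the worker: a timed-out generation
            # keeps running in the background daemon thread until it finishes.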
            
            if thread.is_alive():
                return "", False, f"Generation timed out (attempt {attempt + 1})"
            
            if error[0]:
                return "", False, f"Generation error: {error[0]}"
            
            if result[0] is None:
                return "", False, f"Generation failed (attempt {attempt + 1})"
            
            outputs = result[0]
            
            # Extract generated text
            generated_ids = outputs[0][inputs['input_ids'].shape[1]:]
            response = tokenizer.decode(generated_ids, skip_special_tokens=True).strip()
            
            # Try to extract JSON from response
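            # Track brace depth so nested objects inside "arguments" do not close
            # the span early; the first balanced {...} block is taken as the call.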
            if "{" in response and "}" in response:
                start = response.find("{")
                bracket_count = 0
                end = start
                
                for i, char in enumerate(response[start:], start):
                    if char == "{":
                        bracket_count += 1
                    elif char == "}":
                        bracket_count -= 1
                        if bracket_count == 0:
                            end = i + 1
                            break
                
                json_str = response[start:end]
            else:
                json_str = response
            
            # Validate JSON and schema
            try:
                parsed = json.loads(json_str)
                jsonschema.validate(parsed, schema)
                return json_str, True, None
            except (json.JSONDecodeError, jsonschema.ValidationError) as e:
                if attempt == max_attempts - 1:
                    return json_str, False, f"JSON validation failed: {str(e)}"
                continue
                
        except Exception as e:
            if attempt == max_attempts - 1:
                return "", False, f"Generation error: {str(e)}"
            continue
    
    return "", False, "All generation attempts failed"

def create_json_schema(function_def: Dict) -> Dict:
    """Create JSON schema for function definition"""
    return {
        "type": "object",
        "properties": {
            "name": {
                "type": "string",
                "enum": [function_def["name"]]
            },
            "arguments": function_def["parameters"]
        },
        "required": ["name", "arguments"]
    }

def create_test_schemas():
    """Create simplified test schemas"""
    return {
        "weather_forecast": {
            "name": "get_weather_forecast",
            "description": "Get weather forecast",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {"type": "string"},
                    "days": {"type": "integer"}
                },
                "required": ["location", "days"]
            }
        }
    }

# Test if running directly
if __name__ == "__main__":
    print("πŸ§ͺ Testing SPACES-optimized model...")
    try:
        model, tokenizer = load_trained_model()
        
        test_schema = create_test_schemas()["weather_forecast"]
        schema = create_json_schema(test_schema)
        
        prompt = """<|im_start|>system
You are a helpful assistant that calls functions by responding with valid JSON when given a schema. Always respond with JSON function calls only, never prose.<|im_end|>

<schema>
{"name": "get_weather_forecast", "description": "Get weather forecast", "parameters": {"type": "object", "properties": {"location": {"type": "string"}, "days": {"type": "integer"}}, "required": ["location", "days"]}}
</schema>

<|im_start|>user
Get weather for Tokyo for 5 days<|im_end|>
<|im_start|>assistant
"""
        
        result, success, error = constrained_json_generate(model, tokenizer, prompt, schema)
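        # Expected shape (illustrative, not asserted):
        # {"name": "get_weather_forecast", "arguments": {"location": "Tokyo", "days": 5}}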
        print(f"βœ… Result: {result}")
        print(f"βœ… Success: {success}")
        if error:
            print(f"⚠️ Error: {error}")
            
    except Exception as e:
        print(f"❌ Test failed: {e}")