Spaces:
Sleeping
Sleeping
| import os | |
| import gc | |
| import io | |
| from llama_cpp import Llama | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from fastapi import FastAPI, Request, HTTPException | |
| from fastapi.responses import JSONResponse | |
| from tqdm import tqdm | |
| from dotenv import load_dotenv | |
| from pydantic import BaseModel | |
| from huggingface_hub import hf_hub_download, login | |
| from nltk.tokenize import word_tokenize | |
| from nltk.corpus import stopwords | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| import nltk | |
| import uvicorn | |
| import psutil | |
| import torch | |
| import io | |
# Fetch the NLTK data used when ranking responses: tokenizer models for
# word_tokenize and the English stop-word list.
nltk.download('punkt')
# Newer NLTK releases load word_tokenize's data from 'punkt_tab' rather than
# 'punkt'. Older releases are expected to just report the unknown resource
# and carry on, so requesting both keeps either version working.
nltk.download('punkt_tab')
nltk.download('stopwords')

# Pull configuration from a .env file into the environment before reading it.
load_dotenv()

app = FastAPI()

# Authenticate against the Hugging Face Hub only when a token is configured.
HUGGINGFACE_TOKEN = os.getenv("HUGGINGFACE_TOKEN")
if HUGGINGFACE_TOKEN:
    login(token=HUGGINGFACE_TOKEN)

# One dict per model; the loader reads the 'name', 'repo_id' and 'filename'
# keys of each entry.
model_configs = [
    # ... (Your model configurations remain the same) ...
]

# Shared module state: the model configurations plus an in-memory text buffer.
global_data = {'model_configs': model_configs, 'training_data': io.StringIO()}
class ModelManager:
    """Load every configured model once and hand them out by name.

    ``self.models`` maps a model name to its ``Llama`` instance, or to
    ``None`` when loading that model failed.
    """

    def __init__(self):
        self.models = {}
        self.load_models()

    def load_models(self):
        """Download and instantiate each entry of ``global_data['model_configs']``.

        Names already present in ``self.models`` are skipped. A failure for
        one model is printed and recorded as ``None`` so the remaining models
        still get a chance to load.
        """
        for config in tqdm(global_data['model_configs'], desc="Loading models"):
            model_name = config['name']
            if model_name in self.models:
                continue
            try:
                # hf_hub_download returns the local filesystem path of the
                # cached file, not its contents, and Llama's model_path takes
                # a path. Wrapping the path string in io.BytesIO raised
                # TypeError, so no model could ever load.
                model_path = hf_hub_download(
                    repo_id=config['repo_id'],
                    filename=config['filename'],
                    # `token` supersedes the deprecated `use_auth_token`.
                    token=HUGGINGFACE_TOKEN,
                )
                # NOTE(review): `n_gpu` is carried over unchanged; llama-cpp
                # documents `n_gpu_layers` for GPU offload, so this keyword is
                # presumably ignored - confirm the intended setting.
                model = Llama(model_path=model_path, n_ctx=512, n_gpu=1)
                self.models[model_name] = model
                print(f"Model '{model_name}' loaded successfully.")
            except Exception as e:
                print(f"Error loading model {model_name}: {e}")
                self.models[model_name] = None
            finally:
                # Reclaim temporaries between (potentially large) model loads.
                gc.collect()

    def get_model(self, model_name: str):
        """Return the model stored under ``model_name``, or ``None``.

        ``None`` covers both an unknown name and a model that failed to load.
        """
        return self.models.get(model_name)
# Module-level manager instance; constructing it runs load_models(), so all
# configured models are loaded when this module is imported.
model_manager = ModelManager()
# Pydantic schema for a chat request body.
class ChatRequest(BaseModel):
    # The user's message text.
    message: str
async def generate_model_response(model, inputs: str) -> str:
    """Produce a completion for ``inputs`` from a single model.

    Returns the stripped text of the first choice. When ``model`` is falsy
    the literal ``"Model not loaded"`` is returned, and any ``Exception``
    raised along the way is reported as an error string instead of
    propagating.
    """
    try:
        if not model:
            return "Model not loaded"
        completion = model(inputs, max_tokens=150)
        first_choice = completion['choices'][0]
        return first_choice['text'].strip()
    except Exception as exc:
        return f"Error: Could not generate a response. Details: {exc}"
def _run_model_in_thread(model, inputs: str) -> str:
    """Drive the generate_model_response coroutine to completion in a worker thread."""
    import asyncio  # local import: asyncio is not among the file-level imports

    # generate_model_response is a coroutine function; calling it only builds
    # a coroutine object, so it needs an event loop of its own to execute.
    return asyncio.run(generate_model_response(model, inputs))


async def process_message(message: str) -> dict:
    """Query every loaded model with ``message`` and pick the best reply.

    Each loaded model is run in a worker thread (at most four at a time).
    The replies are then ranked by TF-IDF cosine similarity against the
    original message.

    Args:
        message: The user's message; surrounding whitespace is stripped
            before it is sent to the models.

    Returns:
        A dict with ``"best_response"`` (``{"model": name, "text": reply}``)
        and ``"all_responses"`` (model name -> reply). When no model is
        loaded, ``best_response`` carries ``model=None`` and an explanatory
        text, and ``all_responses`` is empty.
    """
    import asyncio  # local import: asyncio is not among the file-level imports

    inputs = message.strip()
    responses = {}

    # Pair each name with its model up front so a reply can never be
    # attributed to the wrong model.
    loaded_models = []
    for config in global_data['model_configs']:
        model = model_manager.get_model(config['name'])
        if model:
            loaded_models.append((config['name'], model))

    # ThreadPoolExecutor rejects max_workers=0, so handle "nothing loaded"
    # before creating the pool.
    if not loaded_models:
        return {"best_response": {"model": None, "text": "No models loaded successfully."}, "all_responses": responses}

    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=min(len(loaded_models), 4)) as executor:
        # All jobs are submitted first and run concurrently; awaiting them in
        # submission order keeps the event loop free without losing the
        # name <-> result association.
        pending = [
            (name, loop.run_in_executor(executor, _run_model_in_thread, model, inputs))
            for name, model in loaded_models
        ]
        with tqdm(total=len(pending), desc="Generating responses") as progress:
            for model_name, future in pending:
                try:
                    responses[model_name] = await future
                except Exception as e:
                    responses[model_name] = f"Error processing {model_name}: {e}"
                progress.update(1)

    response_names = list(responses)
    response_texts = [responses[name] for name in response_names]

    # scikit-learn accepts a list (not a set) for stop_words.
    stop_words = stopwords.words('english')
    vectorizer = TfidfVectorizer(tokenizer=word_tokenize, stop_words=stop_words)
    try:
        tfidf_matrix = vectorizer.fit_transform([message] + response_texts)
        similarities = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:])
        best_response_index = int(similarities.argmax())
    except ValueError:
        # Raised e.g. when nothing but stop words remains (empty vocabulary);
        # fall back to the first reply rather than failing the request.
        best_response_index = 0

    best_response_model = response_names[best_response_index]
    best_response_text = response_texts[best_response_index]
    return {"best_response": {"model": best_response_model, "text": best_response_text}, "all_responses": responses}
async def api_generate_multimodel(request: Request):
    """Handle a multi-model generation request.

    Expects a JSON object body with a non-empty string ``"message"`` field.

    Returns:
        A ``JSONResponse`` with the result of ``process_message``, or a
        ``{"error": ...}`` payload with status 500 on an unexpected failure.

    Raises:
        HTTPException: status 400 when the body is not valid JSON, is not a
            JSON object, or lacks a usable ``"message"`` string.
    """
    # NOTE(review): no route decorator or registration for this handler is
    # visible in this file - confirm where it is attached to `app`.
    try:
        try:
            data = await request.json()
        except ValueError:
            # Undecodable or malformed JSON is a client error, not a 500.
            raise HTTPException(status_code=400, detail="Invalid JSON body") from None
        if not isinstance(data, dict):
            raise HTTPException(status_code=400, detail="JSON body must be an object")
        message = data.get("message")
        if not message:
            raise HTTPException(status_code=400, detail="Missing message")
        if not isinstance(message, str):
            raise HTTPException(status_code=400, detail="Message must be a string")
        response = await process_message(message)
        return JSONResponse(response)
    except HTTPException:
        # Let FastAPI render deliberate HTTP errors unchanged.
        raise
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
async def startup():
    """Startup hook; currently performs no work."""
    return None
async def shutdown():
    """Shutdown hook: run one garbage-collection pass."""
    gc.collect()
    return None
# Attach the lifecycle coroutines to the application's startup and shutdown events.
app.add_event_handler("startup", startup)
app.add_event_handler("shutdown", shutdown)
def release_resources():
    """Best-effort cleanup: empty the CUDA cache, then garbage-collect.

    Any ``Exception`` raised by a cleanup step is printed rather than
    propagated, and the remaining steps are skipped.
    """
    try:
        for cleanup_step in (torch.cuda.empty_cache, gc.collect):
            cleanup_step()
    except Exception as e:
        print(f"Failed to release resources: {e}")
def resource_manager(poll_interval: float = 1.0, stop_event=None):
    """Periodically check RAM, CPU and GPU usage and release resources.

    Runs until ``stop_event`` is set (forever when it is ``None``), so it is
    meant to be run in a background thread.

    Args:
        poll_interval: Seconds to pause between checks. Without a pause the
            loop spins a CPU core continuously.
        stop_event: Optional ``threading.Event``-like object; the loop exits
            once its ``is_set()`` returns true.
    """
    import time  # local import: time is not among the file-level imports

    MAX_RAM_PERCENT = 20
    MAX_CPU_PERCENT = 20
    MAX_GPU_PERCENT = 20
    MAX_RAM_MB = 2048

    while stop_event is None or not stop_event.is_set():
        try:
            virtual_mem = psutil.virtual_memory()
            current_ram_percent = virtual_mem.percent
            current_ram_mb = virtual_mem.used / (1024 * 1024)
            if current_ram_percent > MAX_RAM_PERCENT or current_ram_mb > MAX_RAM_MB:
                release_resources()

            # With no interval argument this reports usage since the previous
            # call, i.e. over the last poll interval.
            current_cpu_percent = psutil.cpu_percent()
            if current_cpu_percent > MAX_CPU_PERCENT:
                # NOTE(review): nice() without an argument only reads the
                # current niceness; if the intent is to lower priority a
                # value must be passed - confirm before changing.
                psutil.Process(os.getpid()).nice()

            if torch.cuda.is_available():
                gpu = torch.cuda.current_device()
                # torch.cuda has no memory_percent(); derive the used share
                # from the device's free/total byte counts instead.
                free_bytes, total_bytes = torch.cuda.mem_get_info(gpu)
                gpu_mem = 100.0 * (total_bytes - free_bytes) / total_bytes
                if gpu_mem > MAX_GPU_PERCENT:
                    release_resources()
        except Exception as e:
            print(f"Error in resource manager: {e}")

        # Pause between checks; waiting on the event lets a stop request
        # interrupt the pause immediately.
        if stop_event is not None:
            stop_event.wait(poll_interval)
        else:
            time.sleep(poll_interval)
if __name__ == "__main__":
    import threading

    # Run the resource monitor as a daemon thread so it never keeps the
    # process alive once the server exits.
    resource_thread = threading.Thread(target=resource_manager, daemon=True)
    resource_thread.start()

    # Listen on $PORT when provided, otherwise on 7860.
    port = int(os.environ.get("PORT", 7860))
    uvicorn.run(app, host="0.0.0.0", port=port)