|
from fastapi import FastAPI, Request, HTTPException,Depends,File, UploadFile, Response |
|
from fastapi.middleware.cors import CORSMiddleware |
|
from fastapi.responses import JSONResponse |
|
from fastapi.staticfiles import StaticFiles |
|
from huggingface_hub import InferenceClient |
|
import secrets |
|
from typing import Optional |
|
from sentence_transformers import SentenceTransformer |
|
from bson.objectid import ObjectId |
|
from datetime import datetime, timedelta |
|
from fastapi import Request |
|
import requests |
|
import numpy as np |
|
import argparse |
|
import os |
|
from pymongo import MongoClient |
|
from datetime import datetime |
|
from passlib.hash import bcrypt |
|
import PyPDF2 |
|
from io import BytesIO |
|
import uuid |
|
|
|
from langchain_community.embeddings import HuggingFaceEmbeddings |
|
from sklearn.metrics.pairwise import cosine_similarity |
|
import time |
|
|
|
from fastapi.responses import StreamingResponse |
|
import json |
|
import asyncio |
|
|
|
|
|
from langchain_community.document_loaders import PyPDFDirectoryLoader |
|
from langchain_text_splitters import RecursiveCharacterTextSplitter |
|
from langchain_community.embeddings import HuggingFaceEmbeddings |
|
|
|
|
|
SECRET_KEY = secrets.token_hex(32) |
|
|
|
HOST = os.environ.get("API_URL", "0.0.0.0") |
|
PORT = os.environ.get("PORT", 7860) |
|
parser = argparse.ArgumentParser() |
|
parser.add_argument("--host", default=HOST) |
|
parser.add_argument("--port", type=int, default=PORT) |
|
parser.add_argument("--reload", action="store_true", default=True) |
|
parser.add_argument("--ssl_certfile") |
|
parser.add_argument("--ssl_keyfile") |
|
args = parser.parse_args() |
|
|
|
|
|
mongo_uri = os.environ.get("MONGODB_URI", "mongodb+srv://giffardaxel95:[email protected]/") |
|
db_name = os.environ.get("DB_NAME", "chatmed_schizo") |
|
mongo_client = MongoClient(mongo_uri) |
|
db = mongo_client[db_name] |
|
|
|
SAVE_FOLDER = "files" |
|
COLLECTION_NAME="connaissances" |
|
os.makedirs(SAVE_FOLDER, exist_ok=True) |
|
|
|
|
|
app = FastAPI() |
|
app.add_middleware( |
|
CORSMiddleware, |
|
|
|
allow_origins=[ |
|
"https://axl95-medically.hf.space", |
|
"https://huggingface.co", |
|
"http://localhost:3000", |
|
"http://localhost:7860", |
|
"http://0.0.0.0:7860" |
|
], |
|
allow_credentials=True, |
|
allow_methods=["*"], |
|
allow_headers=["*"], |
|
) |
|
|
|
def download_pdf(url, save_path, retries=2, delay=3): |
|
for attempt in range(retries): |
|
try: |
|
req = Request(url, headers={'User-Agent': 'Mozilla/5.0'}) |
|
with urlopen(req) as response, open(save_path, 'wb') as f: |
|
f.write(response.read()) |
|
print(f"Téléchargé : {save_path}") |
|
return |
|
except (HTTPError, URLError) as e: |
|
print(f"Erreur ({e}) pour {url}, tentative {attempt+1}/{retries}") |
|
time.sleep(delay) |
|
print(f"Échec du téléchargement : {url}") |
|
|
|
''' |
|
Le chargement automatique des PDFs est désactivé. La base de données utilise les embeddings existants. |
|
for url in PDF_URLS: |
|
file_name = url.split("/")[-1] |
|
file_path = os.path.join(SAVE_FOLDER, file_name) |
|
if not os.path.exists(file_path): |
|
download_pdf(url, file_path) |
|
|
|
loader = PyPDFDirectoryLoader(SAVE_FOLDER) |
|
docs = loader.load() |
|
|
|
splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=100) |
|
chunks = splitter.split_documents(docs) |
|
print(f"{len(chunks)} morceaux extraits.") |
|
|
|
embedding_model = HuggingFaceEmbeddings(model_name="shtilev/medical_embedded_v2") |
|
|
|
client = MongoClient(MONGO_URI) |
|
collection = client[DB_NAME][COLLECTION_NAME] |
|
|
|
collection.delete_many({}) |
|
|
|
for chunk in chunks: |
|
text = chunk.page_content |
|
embedding = embedding_model.embed_query(text) |
|
collection.insert_one({ |
|
"text": text, |
|
"embedding": embedding |
|
}) |
|
|
|
print("Tous les embeddings ont été insérés dans la base MongoDB.") |
|
''' |
|
|
|
|
|
|
|
|
|
def retrieve_relevant_context(query, embedding_model, mongo_collection, k=5): |
|
query_embedding = embedding_model.embed_query(query) |
|
|
|
docs = list(mongo_collection.find({}, {"text": 1, "embedding": 1})) |
|
|
|
print(f"[DEBUG] Recherche de contexte pour: '{query}'") |
|
print(f"[DEBUG] {len(docs)} documents trouvés dans la base de données") |
|
|
|
if not docs: |
|
print("[DEBUG] Aucun document dans la collection. RAG désactivé.") |
|
return "" |
|
|
|
|
|
similarities = [] |
|
for i, doc in enumerate(docs): |
|
if "embedding" not in doc or not doc["embedding"]: |
|
print(f"[DEBUG] Document {i} sans embedding") |
|
continue |
|
|
|
sim = cosine_similarity([query_embedding], [doc["embedding"]])[0][0] |
|
similarities.append((sim, i, doc["text"])) |
|
|
|
similarities.sort(reverse=True) |
|
|
|
|
|
print("\n=== CONTEXTE SÉLECTIONNÉ ===") |
|
top_k_docs = [] |
|
for i, (score, idx, text) in enumerate(similarities[:k]): |
|
doc_preview = text[:100] + "..." if len(text) > 100 else text |
|
print(f"Document #{i+1} (score: {score:.4f}): {doc_preview}") |
|
top_k_docs.append(text) |
|
print("==========================\n") |
|
|
|
return "\n\n".join(top_k_docs) |
|
|
|
|
|
|
|
async def get_admin_user(request: Request): |
|
user = await get_current_user(request) |
|
if user["role"] != "Administrateur": |
|
raise HTTPException(status_code=403, detail="Accès interdit: Droits d'administrateur requis") |
|
return user |
|
|
|
|
|
try: |
|
embedding_model = HuggingFaceEmbeddings(model_name="shtilev/medical_embedded_v2") |
|
print("✅ Modèle d'embedding médical chargé avec succès") |
|
|
|
except Exception as e: |
|
print(f"Erreur lors du chargement du modèle d'embedding: {str(e)}") |
|
embedding_model = None |
|
|
|
doc_count = db.connaissances.count_documents({}) |
|
print(f"\n[DIAGNOSTIC] Collection 'connaissances': {doc_count} documents trouvés") |
|
if doc_count == 0: |
|
print("[AVERTISSEMENT] La collection est vide. Le système RAG ne fonctionnera pas!") |
|
print("[AVERTISSEMENT] Veuillez charger des documents via l'API admin ou exécuter le script d'initialisation.") |
|
else: |
|
sample_doc = db.connaissances.find_one({}) |
|
has_embeddings = "embedding" in sample_doc and sample_doc["embedding"] is not None |
|
print(f"[DIAGNOSTIC] Les documents ont des embeddings: {'✅ Oui' if has_embeddings else '❌ Non'}") |
|
if not has_embeddings: |
|
print("[AVERTISSEMENT] Les documents n'ont pas d'embeddings valides!") |
|
@app.post("/api/admin/knowledge/upload") |
|
async def upload_pdf( |
|
file: UploadFile = File(...), |
|
title: str = None, |
|
tags: str = None, |
|
current_user: dict = Depends(get_admin_user) |
|
): |
|
try: |
|
if not file.filename.endswith('.pdf'): |
|
raise HTTPException(status_code=400, detail="Le fichier doit être un PDF") |
|
|
|
contents = await file.read() |
|
pdf_file = BytesIO(contents) |
|
|
|
pdf_reader = PyPDF2.PdfReader(pdf_file) |
|
text_content = "" |
|
for page_num in range(len(pdf_reader.pages)): |
|
text_content += pdf_reader.pages[page_num].extract_text() + "\n" |
|
|
|
embedding = None |
|
if embedding_model: |
|
try: |
|
|
|
max_length = 5000 |
|
truncated_text = text_content[:max_length] |
|
embedding = embedding_model.embed_query(truncated_text) |
|
except Exception as e: |
|
print(f"Erreur lors de la génération de l'embedding: {str(e)}") |
|
|
|
doc_id = ObjectId() |
|
|
|
pdf_path = f"files/{str(doc_id)}.pdf" |
|
os.makedirs("files", exist_ok=True) |
|
with open(pdf_path, "wb") as f: |
|
pdf_file.seek(0) |
|
f.write(contents) |
|
|
|
document = { |
|
"_id": doc_id, |
|
"text": text_content, |
|
"embedding": embedding, |
|
"title": title or file.filename, |
|
"tags": tags.split(",") if tags else [], |
|
"uploaded_by": str(current_user["_id"]), |
|
"upload_date": datetime.utcnow() |
|
} |
|
|
|
print(f"Tentative d'insertion du document avec ID: {doc_id}") |
|
result = db.connaissances.insert_one(document) |
|
print(f"Document inséré avec ID: {result.inserted_id}") |
|
|
|
|
|
verification = db.connaissances.find_one({"_id": doc_id}) |
|
if verification: |
|
print(f"Document vérifié et trouvé dans la base de données") |
|
return {"success": True, "document_id": str(doc_id)} |
|
else: |
|
print(f"ERREUR: Document non trouvé après insertion") |
|
return {"success": False, "error": "Document non trouvé après insertion"} |
|
|
|
except Exception as e: |
|
import traceback |
|
print(f"Erreur lors de l'upload du PDF: {traceback.format_exc()}") |
|
raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}") |
|
|
|
@app.get("/api/admin/knowledge") |
|
async def list_documents(current_user: dict = Depends(get_admin_user)): |
|
try: |
|
documents = list(db.connaissances.find().sort("upload_date", -1)) |
|
|
|
result = [] |
|
for doc in documents: |
|
doc_safe = { |
|
"id": str(doc["_id"]), |
|
"title": doc.get("title", "Sans titre"), |
|
"tags": doc.get("tags", []), |
|
"date": doc.get("upload_date").isoformat() if "upload_date" in doc else None, |
|
"text_preview": doc.get("text", "")[:100] + "..." if len(doc.get("text", "")) > 100 else doc.get("text", "") |
|
} |
|
result.append(doc_safe) |
|
|
|
return {"documents": result} |
|
except Exception as e: |
|
print(f"Erreur lors de la liste des documents: {str(e)}") |
|
raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}") |
|
|
|
|
|
|
|
@app.delete("/api/admin/knowledge/{document_id}") |
|
async def delete_document(document_id: str, current_user: dict = Depends(get_admin_user)): |
|
try: |
|
try: |
|
doc_id = ObjectId(document_id) |
|
except Exception: |
|
raise HTTPException(status_code=400, detail="ID de document invalide") |
|
|
|
|
|
document = db.connaissances.find_one({"_id": doc_id}) |
|
if not document: |
|
raise HTTPException(status_code=404, detail="Document non trouvé") |
|
|
|
|
|
result = db.connaissances.delete_one({"_id": doc_id}) |
|
|
|
if result.deleted_count == 0: |
|
raise HTTPException(status_code=500, detail="Échec de la suppression du document") |
|
|
|
|
|
pdf_path = f"files/{document_id}.pdf" |
|
if os.path.exists(pdf_path): |
|
try: |
|
os.remove(pdf_path) |
|
print(f"Fichier supprimé: {pdf_path}") |
|
except Exception as e: |
|
print(f"Erreur lors de la suppression du fichier: {str(e)}") |
|
|
|
return {"success": True, "message": "Document supprimé avec succès"} |
|
|
|
except HTTPException as he: |
|
raise he |
|
except Exception as e: |
|
raise HTTPException(status_code=500, detail=f"Erreur lors de la suppression: {str(e)}") |
|
|
|
|
|
@app.post("/api/login") |
|
async def login(request: Request, response: Response): |
|
try: |
|
data = await request.json() |
|
email = data.get("email") |
|
password = data.get("password") |
|
|
|
user = db.users.find_one({"email": email}) |
|
if not user or not bcrypt.verify(password, user["password"]): |
|
raise HTTPException(status_code=401, detail="Email ou mot de passe incorrect") |
|
|
|
session_id = secrets.token_hex(16) |
|
user_id = str(user["_id"]) |
|
username = f"{user['prenom']} {user['nom']}" |
|
|
|
db.sessions.insert_one({ |
|
"session_id": session_id, |
|
"user_id": user_id, |
|
"created_at": datetime.utcnow(), |
|
"expires_at": datetime.utcnow() + timedelta(days=7) |
|
}) |
|
|
|
response.set_cookie( |
|
key="session_id", |
|
value=session_id, |
|
httponly=False, |
|
max_age=7*24*60*60, |
|
samesite="none", |
|
secure=True, |
|
path="/" |
|
) |
|
|
|
|
|
print(f"Session créée: {session_id} pour l'utilisateur {user_id}") |
|
|
|
return { |
|
"success": True, |
|
"username": username, |
|
"user_id": user_id, |
|
"session_id": session_id, |
|
"role": user.get("role", "user") |
|
|
|
} |
|
|
|
except Exception as e: |
|
print(f"Erreur login: {str(e)}") |
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
async def get_current_user(request: Request): |
|
session_id = request.cookies.get("session_id") |
|
print(f"Cookie de session reçu: {session_id[:5] if session_id else 'None'}") |
|
|
|
if not session_id: |
|
auth_header = request.headers.get("Authorization") |
|
if auth_header and auth_header.startswith("Bearer "): |
|
session_id = auth_header.replace("Bearer ", "") |
|
print(f"Session d'autorisation reçue: {session_id[:5]}...") |
|
|
|
if not session_id: |
|
session_id = request.query_params.get("session_id") |
|
if session_id: |
|
print(f"Session des paramètres de requête: {session_id[:5]}...") |
|
|
|
if not session_id: |
|
raise HTTPException(status_code=401, detail="Non authentifié - Aucune session trouvée") |
|
|
|
session = db.sessions.find_one({ |
|
"session_id": session_id, |
|
"expires_at": {"$gt": datetime.utcnow()} |
|
}) |
|
|
|
if not session: |
|
raise HTTPException(status_code=401, detail="Session expirée ou invalide") |
|
|
|
user = db.users.find_one({"_id": ObjectId(session["user_id"])}) |
|
if not user: |
|
raise HTTPException(status_code=401, detail="Utilisateur non trouvé") |
|
|
|
return user |
|
|
|
@app.post("/api/logout") |
|
async def logout(request: Request, response: Response): |
|
session_id = request.cookies.get("session_id") |
|
if session_id: |
|
db.sessions.delete_one({"session_id": session_id}) |
|
|
|
response.delete_cookie(key="session_id") |
|
return {"success": True} |
|
@app.post("/api/register") |
|
async def register(request: Request): |
|
try: |
|
data = await request.json() |
|
|
|
required_fields = ["prenom", "nom", "email", "password"] |
|
for field in required_fields: |
|
if not data.get(field): |
|
raise HTTPException(status_code=400, detail=f"Le champ {field} est requis") |
|
|
|
existing_user = db.users.find_one({"email": data["email"]}) |
|
if existing_user: |
|
raise HTTPException(status_code=409, detail="Cet email est déjà utilisé") |
|
|
|
hashed_password = bcrypt.hash(data["password"]) |
|
|
|
user = { |
|
"prenom": data["prenom"], |
|
"nom": data["nom"], |
|
"email": data["email"], |
|
"password": hashed_password, |
|
"createdAt": datetime.utcnow(), |
|
"role": data.get("role", "user"), |
|
|
|
} |
|
|
|
result = db.users.insert_one(user) |
|
|
|
return {"message": "Utilisateur créé avec succès", "userId": str(result.inserted_id)} |
|
|
|
except HTTPException as he: |
|
raise he |
|
|
|
except Exception as e: |
|
import traceback |
|
print(f"Erreur lors de l'inscription: {str(e)}") |
|
print(traceback.format_exc()) |
|
raise HTTPException(status_code=500, detail=f"Erreur serveur: {str(e)}") |
|
@app.post("/api/embed") |
|
async def embed(request: Request): |
|
data = await request.json() |
|
texts = data.get("texts", []) |
|
|
|
try: |
|
|
|
dummy_embedding = [[0.1, 0.2, 0.3] for _ in range(len(texts))] |
|
|
|
return {"embeddings": dummy_embedding} |
|
except Exception as e: |
|
return {"error": str(e)} |
|
|
|
@app.get("/invert") |
|
async def invert(text: str): |
|
return { |
|
"original": text, |
|
"inverted": text[::-1], |
|
} |
|
|
|
HF_TOKEN = os.getenv('REACT_APP_HF_TOKEN') |
|
if not HF_TOKEN: |
|
raise RuntimeError("Le token Hugging Face (HF_TOKEN) n'est pas défini dans les variables d'environnement.") |
|
conversation_history = {} |
|
hf_client = InferenceClient(token=HF_TOKEN) |
|
@app.post("/api/chat") |
|
async def chat(request: Request): |
|
global conversation_history |
|
|
|
|
|
data = await request.json() |
|
user_message = data.get("message", "").strip() |
|
conversation_id = data.get("conversation_id") |
|
|
|
if not user_message: |
|
raise HTTPException(status_code=400, detail="Le champ 'message' est requis.") |
|
|
|
current_user = None |
|
try: |
|
current_user = await get_current_user(request) |
|
except HTTPException: |
|
pass |
|
|
|
current_tokens = 0 |
|
message_tokens = 0 |
|
if current_user and conversation_id: |
|
conv = db.conversations.find_one({ |
|
"_id": ObjectId(conversation_id), |
|
"user_id": str(current_user["_id"]) |
|
}) |
|
if conv: |
|
current_tokens = conv.get("token_count", 0) |
|
message_tokens = int(len(user_message.split()) * 1.3) |
|
MAX_TOKENS = 2000 |
|
if current_tokens + message_tokens > MAX_TOKENS: |
|
return JSONResponse({ |
|
"error": "token_limit_exceeded", |
|
"message": "Cette conversation a atteint sa limite de taille. Veuillez en créer une nouvelle.", |
|
"tokens_used": current_tokens, |
|
"tokens_limit": MAX_TOKENS |
|
}, status_code=403) |
|
|
|
if conversation_id and current_user: |
|
db.messages.insert_one({ |
|
"conversation_id": conversation_id, |
|
"user_id": str(current_user["_id"]), |
|
"sender": "user", |
|
"text": user_message, |
|
"timestamp": datetime.utcnow() |
|
}) |
|
|
|
is_history_question = any( |
|
phrase in user_message.lower() |
|
for phrase in [ |
|
"ma première question", "ma précédente question", "ma dernière question", |
|
"ce que j'ai demandé", "j'ai dit quoi", "quelles questions", |
|
"c'était quoi ma", "quelle était ma", "mes questions" |
|
] |
|
) |
|
|
|
if conversation_id not in conversation_history: |
|
conversation_history[conversation_id] = [] |
|
|
|
if current_user and conversation_id: |
|
previous_messages = list(db.messages.find( |
|
{"conversation_id": conversation_id} |
|
).sort("timestamp", 1)) |
|
|
|
for msg in previous_messages: |
|
if msg["sender"] == "user": |
|
conversation_history[conversation_id].append(f"Question : {msg['text']}") |
|
else: |
|
conversation_history[conversation_id].append(f"Réponse : {msg['text']}") |
|
|
|
if is_history_question: |
|
actual_questions = [] |
|
|
|
if conversation_id in conversation_history: |
|
for msg in conversation_history[conversation_id]: |
|
if msg.startswith("Question : "): |
|
q_text = msg.replace("Question : ", "") |
|
|
|
is_meta = any(phrase in q_text.lower() for phrase in [ |
|
"ma première question", "ma précédente question", "ma dernière question", |
|
"ce que j'ai demandé", "j'ai dit quoi", "quelles questions", |
|
"c'était quoi ma", "quelle était ma", "mes questions" |
|
]) |
|
if not is_meta: |
|
actual_questions.append(q_text) |
|
|
|
if not actual_questions: |
|
return JSONResponse({ |
|
"response": "Vous n'avez pas encore posé de question dans cette conversation. C'est notre premier échange." |
|
}) |
|
|
|
question_number = None |
|
|
|
if any(p in user_message.lower() for p in ["première question", "1ère question", "1ere question"]): |
|
question_number = 1 |
|
elif any(p in user_message.lower() for p in ["deuxième question", "2ème question", "2eme question", "seconde question"]): |
|
question_number = 2 |
|
else: |
|
import re |
|
match = re.search(r'(\d+)[eèiéê]*m*e* question', user_message.lower()) |
|
if match: |
|
try: |
|
question_number = int(match.group(1)) |
|
except: |
|
pass |
|
|
|
if question_number is not None: |
|
if 0 < question_number <= len(actual_questions): |
|
suffix = "ère" if question_number == 1 else "ème" |
|
return JSONResponse({ |
|
"response": f"Votre {question_number}{suffix} question était : \"{actual_questions[question_number-1]}\"" |
|
}) |
|
else: |
|
return JSONResponse({ |
|
"response": f"Vous n'avez pas encore posé {question_number} questions dans cette conversation." |
|
}) |
|
|
|
else: |
|
if len(actual_questions) == 1: |
|
return JSONResponse({ |
|
"response": f"Vous avez posé une seule question jusqu'à présent : \"{actual_questions[0]}\"" |
|
}) |
|
else: |
|
question_list = "\n".join([f"{i+1}. {q}" for i, q in enumerate(actual_questions)]) |
|
return JSONResponse({ |
|
"response": f"Voici les questions que vous avez posées dans cette conversation :\n\n{question_list}" |
|
}) |
|
|
|
context = None |
|
if not is_history_question and embedding_model: |
|
context = retrieve_relevant_context(user_message, embedding_model, db.connaissances, k=5) |
|
if context and conversation_id: |
|
conversation_history[conversation_id].append(f"Contexte : {context}") |
|
|
|
if conversation_id: |
|
conversation_history[conversation_id].append(f"Question : {user_message}") |
|
|
|
system_prompt = ( |
|
"Tu es un chatbot spécialisé dans la santé mentale, et plus particulièrement la schizophrénie. " |
|
"Tu réponds de façon fiable, claire et empathique, en t'appuyant uniquement sur des sources médicales et en français. " |
|
) |
|
|
|
enriched_context = "" |
|
|
|
if conversation_id in conversation_history: |
|
actual_questions = [] |
|
for msg in conversation_history[conversation_id]: |
|
if msg.startswith("Question : "): |
|
q_text = msg.replace("Question : ", "") |
|
|
|
is_meta = any(phrase in q_text.lower() for phrase in [ |
|
"ma première question", "ma précédente question", "ma dernière question", |
|
"ce que j'ai demandé", "j'ai dit quoi", "quelles questions", |
|
"c'était quoi ma", "quelle était ma", "mes questions" |
|
]) |
|
if not is_meta and q_text != user_message: |
|
actual_questions.append(q_text) |
|
|
|
if actual_questions: |
|
recent_questions = actual_questions[-5:] |
|
enriched_context += "Historique récent des questions:\n" |
|
for i, q in enumerate(recent_questions): |
|
enriched_context += f"- Question précédente {len(recent_questions)-i}: {q}\n" |
|
enriched_context += "\n" |
|
|
|
if context: |
|
enriched_context += "Contexte médical pertinent:\n" |
|
enriched_context += context |
|
enriched_context += "\n\n" |
|
|
|
if enriched_context: |
|
system_prompt += ( |
|
f"\n\n{enriched_context}\n\n" |
|
"Utilise ces informations pour répondre de manière plus précise et contextuelle. " |
|
"Ne pas inventer d'informations. Si tu ne sais pas, redirige vers un professionnel de santé." |
|
) |
|
else: |
|
system_prompt += ( |
|
"Tu dois répondre uniquement à partir de connaissances médicales factuelles. " |
|
"Si tu ne sais pas répondre, indique-le clairement et suggère de consulter un professionnel de santé." |
|
) |
|
|
|
messages = [{"role": "system", "content": system_prompt}] |
|
|
|
if conversation_id and len(conversation_history.get(conversation_id, [])) > 0: |
|
history = conversation_history[conversation_id] |
|
for i in range(0, min(20, len(history)-1), 2): |
|
if i+1 < len(history): |
|
if history[i].startswith("Question :"): |
|
user_text = history[i].replace("Question : ", "") |
|
messages.append({"role": "user", "content": user_text}) |
|
|
|
if history[i+1].startswith("Réponse :"): |
|
assistant_text = history[i+1].replace("Réponse : ", "") |
|
messages.append({"role": "assistant", "content": assistant_text}) |
|
|
|
messages.append({"role": "user", "content": user_message}) |
|
|
|
try: |
|
completion = hf_client.chat.completions.create( |
|
model="mistralai/Mistral-7B-Instruct-v0.3", |
|
messages=messages, |
|
max_tokens=400, |
|
temperature=0.7, |
|
timeout=15, |
|
) |
|
bot_response = completion.choices[0].message["content"].strip() |
|
except Exception: |
|
fallback = hf_client.text_generation( |
|
model="mistralai/Mistral-7B-Instruct-v0.3", |
|
prompt=f"<s>[INST] {system_prompt}\n\nQuestion: {user_message} [/INST]", |
|
max_new_tokens=512, |
|
temperature=0.7 |
|
) |
|
bot_response = fallback |
|
|
|
if conversation_id: |
|
conversation_history[conversation_id].append(f"Réponse : {bot_response}") |
|
|
|
if len(conversation_history[conversation_id]) > 50: |
|
conversation_history[conversation_id] = conversation_history[conversation_id][-50:] |
|
|
|
if conversation_id and current_user: |
|
db.messages.insert_one({ |
|
"conversation_id": conversation_id, |
|
"user_id": str(current_user["_id"]), |
|
"sender": "assistant", |
|
"text": bot_response, |
|
"timestamp": datetime.utcnow() |
|
}) |
|
response_tokens = int(len(bot_response.split()) * 1.3) |
|
total_tokens = current_tokens + message_tokens + response_tokens |
|
db.conversations.update_one( |
|
{"_id": ObjectId(conversation_id)}, |
|
{"$set": { |
|
"last_message": bot_response, |
|
"updated_at": datetime.utcnow(), |
|
"token_count": total_tokens |
|
}} |
|
) |
|
|
|
return {"response": bot_response} |
|
|
|
|
|
def simulate_token_count(text): |
|
""" |
|
Simule le comptage de tokens sans appeler d'API externe. |
|
""" |
|
if not text: |
|
return 0 |
|
|
|
text = text.replace('\n', ' \n ') |
|
|
|
spaces_and_punct = sum(1 for c in text if c.isspace() or c in ',.;:!?()[]{}"\'`-_=+<>/@#$%^&*|\\') |
|
|
|
digits = sum(1 for c in text if c.isdigit()) |
|
|
|
words = text.split() |
|
short_words = sum(1 for w in words if len(w) <= 2) |
|
|
|
|
|
code_blocks = len(re.findall(r'```[\s\S]*?```', text)) |
|
urls = len(re.findall(r'https?://\S+', text)) |
|
|
|
adjusted_length = len(text) - spaces_and_punct - digits - short_words |
|
|
|
token_count = ( |
|
adjusted_length / 4 + |
|
spaces_and_punct * 0.25 + |
|
digits * 0.5 + |
|
short_words * 0.5 + |
|
code_blocks * 5 + |
|
urls * 4 |
|
) |
|
|
|
return int(token_count * 1.1) + 1 |
|
@app.get("/data") |
|
async def get_data(): |
|
data = {"data": np.random.rand(100).tolist()} |
|
return JSONResponse(data) |
|
|
|
@app.get("/api/conversations") |
|
async def get_conversations(current_user: dict = Depends(get_current_user)): |
|
try: |
|
user_id = str(current_user["_id"]) |
|
conversations = list(db.conversations.find( |
|
{"user_id": user_id}, |
|
{"_id": 1, "title": 1, "date": 1, "time": 1, "last_message": 1, "created_at": 1} |
|
).sort("created_at", -1)) |
|
|
|
for conv in conversations: |
|
conv["_id"] = str(conv["_id"]) |
|
|
|
return {"conversations": conversations} |
|
except Exception as e: |
|
raise HTTPException(status_code=500, detail=f"Erreur serveur: {str(e)}") |
|
|
|
@app.post("/api/conversations") |
|
async def create_conversation(request: Request, current_user: dict = Depends(get_current_user)): |
|
try: |
|
data = await request.json() |
|
user_id = str(current_user["_id"]) |
|
|
|
conversation = { |
|
"user_id": user_id, |
|
"title": data.get("title", "Nouvelle conversation"), |
|
"date": data.get("date"), |
|
"time": data.get("time"), |
|
"last_message": data.get("message", ""), |
|
"created_at": datetime.utcnow() |
|
} |
|
|
|
result = db.conversations.insert_one(conversation) |
|
|
|
return {"conversation_id": str(result.inserted_id)} |
|
except Exception as e: |
|
raise HTTPException(status_code=500, detail=f"Erreur serveur: {str(e)}") |
|
|
|
@app.post("/api/conversations/{conversation_id}/messages") |
|
async def add_message(conversation_id: str, request: Request, current_user: dict = Depends(get_current_user)): |
|
try: |
|
data = await request.json() |
|
user_id = str(current_user["_id"]) |
|
|
|
print(f"Ajout message: conversation_id={conversation_id}, sender={data.get('sender')}, text={data.get('text')[:20]}...") |
|
|
|
conversation = db.conversations.find_one({ |
|
"_id": ObjectId(conversation_id), |
|
"user_id": user_id |
|
}) |
|
|
|
if not conversation: |
|
raise HTTPException(status_code=404, detail="Conversation non trouvée") |
|
|
|
message = { |
|
"conversation_id": conversation_id, |
|
"user_id": user_id, |
|
"sender": data.get("sender", "user"), |
|
"text": data.get("text", ""), |
|
"timestamp": datetime.utcnow() |
|
} |
|
|
|
db.messages.insert_one(message) |
|
|
|
db.conversations.update_one( |
|
{"_id": ObjectId(conversation_id)}, |
|
{"$set": {"last_message": data.get("text", ""), "updated_at": datetime.utcnow()}} |
|
) |
|
|
|
return {"success": True} |
|
except Exception as e: |
|
print(f"Erreur lors de l'ajout d'un message: {str(e)}") |
|
raise HTTPException(status_code=500, detail=f"Erreur serveur: {str(e)}") |
|
|
|
@app.get("/api/conversations/{conversation_id}/messages") |
|
async def get_messages(conversation_id: str, current_user: dict = Depends(get_current_user)): |
|
try: |
|
user_id = str(current_user["_id"]) |
|
|
|
conversation = db.conversations.find_one({ |
|
"_id": ObjectId(conversation_id), |
|
"user_id": user_id |
|
}) |
|
|
|
if not conversation: |
|
raise HTTPException(status_code=404, detail="Conversation non trouvée") |
|
|
|
messages = list(db.messages.find( |
|
{"conversation_id": conversation_id} |
|
).sort("timestamp", 1)) |
|
|
|
for msg in messages: |
|
msg["_id"] = str(msg["_id"]) |
|
if "timestamp" in msg: |
|
msg["timestamp"] = msg["timestamp"].isoformat() |
|
|
|
return {"messages": messages} |
|
except Exception as e: |
|
raise HTTPException(status_code=500, detail=f"Erreur serveur: {str(e)}") |
|
|
|
@app.delete("/api/conversations/{conversation_id}") |
|
async def delete_conversation(conversation_id: str, current_user: dict = Depends(get_current_user)): |
|
try: |
|
user_id = str(current_user["_id"]) |
|
|
|
result = db.conversations.delete_one({ |
|
"_id": ObjectId(conversation_id), |
|
"user_id": user_id |
|
}) |
|
|
|
if result.deleted_count == 0: |
|
raise HTTPException(status_code=404, detail="Conversation non trouvée") |
|
|
|
db.messages.delete_many({"conversation_id": conversation_id}) |
|
|
|
return {"success": True} |
|
except Exception as e: |
|
raise HTTPException(status_code=500, detail=f"Erreur serveur: {str(e)}") |
|
|
|
|
|
app.mount("/", StaticFiles(directory="static", html=True), name="static") |
|
|
|
if __name__ == "__main__": |
|
import uvicorn |
|
|
|
print(args) |
|
uvicorn.run( |
|
"app:app", |
|
host=args.host, |
|
port=args.port, |
|
reload=args.reload, |
|
|
|
ssl_certfile=args.ssl_certfile, |
|
ssl_keyfile=args.ssl_keyfile, |
|
) |
|
|
|
|