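# Hosting-page header residue ("Spaces: Sleeping / Sleeping") that was captured
# together with this file; it is not part of the program.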
import asyncio
import io
import json
import logging
import os
import re
import string
import subprocess
import tempfile
import traceback
import warnings
import zipfile
from concurrent.futures import ThreadPoolExecutor, as_completed

import nltk
import pandas as pd
import requests
from aiolimiter import AsyncLimiter
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from fastapi import FastAPI, BackgroundTasks, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from litellm.router import Router
from lxml import etree
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize

from schemas import *
# Load variables from a .env file into the environment before they are read.
load_dotenv()

logging.basicConfig(
    format='[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d]: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.INFO,
)

# NLTK data sets, requested in this order at import time.
for _nltk_resource in ('stopwords', 'punkt_tab', 'wordnet'):
    nltk.download(_nltk_resource)

# Every Python warning is silenced process-wide.
warnings.filterwarnings("ignore")

app = FastAPI(title="Requirements Extractor")

# Files under ./static are exposed below the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")

# NOTE(review): wildcard origins, methods and headers are combined with
# credentials here, which is very permissive - confirm this is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)
def _gemini_deployment(alias: str, model: str, rpm: int) -> dict:
    """Build one router deployment entry; only alias, model and rpm differ."""
    return {
        "model_name": alias,
        "litellm_params": {
            "model": model,
            "api_key": os.environ.get("GEMINI"),
            "max_retries": 10,
            "rpm": rpm,
            "allowed_fails": 1,
            "cooldown": 30,
        },
    }


# Two Gemini deployments; requests for "gemini-v2" may fall back to "gemini-v1".
llm_router = Router(
    model_list=[
        _gemini_deployment("gemini-v1", "gemini/gemini-2.0-flash", 15),
        _gemini_deployment("gemini-v2", "gemini/gemini-2.5-flash", 10),
    ],
    fallbacks=[{"gemini-v2": ["gemini-v1"]}],
    num_retries=10,
    retry_after=30,
)

# One limiter per deployment alias, sized from that deployment's "rpm" value
# over a 60-second period.
limiter_mapping = {
    deployment["model_name"]: AsyncLimiter(deployment["litellm_params"]["rpm"], 60)
    for deployment in llm_router.model_list
}

lemmatizer = WordNetLemmatizer()

# Namespace prefixes used by the XPath queries run against document.xml.
NSMAP = {
    'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main',
    'v': 'urn:schemas-microsoft-com:vml',
}
def lemma(text: str):
    """Return the lemmatized, stop-word-free tokens of *text*.

    Characters listed in ``string.punctuation`` are deleted first. The
    remainder is stripped, lower-cased and tokenized; English stop words are
    dropped and every surviving token is passed to the module-level
    lemmatizer.
    """
    english_stop_words = set(stopwords.words('english'))
    punctuation_remover = str.maketrans('', '', string.punctuation)
    cleaned = text.translate(punctuation_remover).strip()

    lemmas = []
    for word in word_tokenize(cleaned.lower()):
        if word in english_stop_words:
            continue
        lemmas.append(lemmatizer.lemmatize(word))
    return lemmas
def get_docx_archive(url: str) -> zipfile.ZipFile:
    """Download the ZIP at *url* and return the Word document inside it.

    The first archive member (in archive order) whose name ends in ``.docx``
    or ``.doc`` is used; the extension check ignores case. A ``.docx`` member
    is returned as-is, a ``.doc`` member is first converted to ``.docx`` with
    LibreOffice.

    Args:
        url: Address of the ZIP file; it must end in ``zip`` (any case).

    Returns:
        An in-memory ``zipfile.ZipFile`` opened on the ``.docx`` bytes.

    Raises:
        ValueError: If *url* does not end in ``zip`` or the archive holds no
            ``.docx``/``.doc`` member.
        requests.RequestException: On download failure, timeout or HTTP error.
        subprocess.CalledProcessError: If the LibreOffice conversion fails.
    """
    if not url.lower().endswith("zip"):
        raise ValueError("URL doit pointer vers un fichier ZIP")

    # NOTE(review): TLS certificate verification is disabled for this download.
    resp = requests.get(
        url,
        verify=False,
        headers={
            "User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        },
        # Without a timeout a stalled server would block the caller forever.
        timeout=60,
    )
    resp.raise_for_status()

    with zipfile.ZipFile(io.BytesIO(resp.content)) as archive:
        for member in archive.namelist():
            lowered = member.lower()
            if lowered.endswith(".docx"):
                return zipfile.ZipFile(io.BytesIO(archive.read(member)))
            if lowered.endswith(".doc"):
                docx_bytes = _convert_doc_to_docx(archive.read(member))
                return zipfile.ZipFile(io.BytesIO(docx_bytes))
    raise ValueError("Aucun fichier docx/doc trouvé dans l'archive")


def _convert_doc_to_docx(doc_bytes: bytes) -> bytes:
    """Convert legacy ``.doc`` bytes to ``.docx`` bytes with headless LibreOffice."""
    # A private temporary directory avoids name collisions between concurrent
    # conversions and is removed even when LibreOffice fails.
    with tempfile.TemporaryDirectory() as workdir:
        source_path = os.path.join(workdir, "source.doc")
        with open(source_path, "wb") as handle:
            handle.write(doc_bytes)
        subprocess.run([
            "libreoffice",
            "--headless",
            "--convert-to", "docx",
            "--outdir", workdir,
            source_path
        ], check=True)
        # LibreOffice writes <input stem>.docx into the --outdir directory.
        with open(os.path.join(workdir, "source.docx"), "rb") as handle:
            return handle.read()
def parse_document_xml(docx_zip: zipfile.ZipFile) -> etree._Element:
    """Parse the ``word/document.xml`` part and return its root element.

    Args:
        docx_zip: An opened ``.docx`` archive.

    Returns:
        The root element of the parsed part (``etree.fromstring`` returns an
        element, not an element tree).

    Raises:
        KeyError: If the archive has no ``word/document.xml`` member.
        etree.XMLSyntaxError: If the part is not well-formed XML.
    """
    xml_bytes = docx_zip.read('word/document.xml')
    # The document comes from a download, so entity expansion and network
    # access are turned off while parsing.
    parser = etree.XMLParser(
        remove_blank_text=True,
        resolve_entities=False,
        no_network=True,
    )
    return etree.fromstring(xml_bytes, parser=parser)
def clean_document_xml(root: etree._Element) -> None:
    """Strip tracked changes and comment anchors from the tree, in place.

    * ``<w:del>`` elements are removed together with their content.
    * ``<w:ins>`` wrappers are removed while their children are kept, in
      order, at the wrapper's former position.
    * ``<w:commentRangeStart>``, ``<w:commentRangeEnd>`` and
      ``<w:commentReference>`` elements are removed.

    Args:
        root: Root element of ``word/document.xml``; it is modified directly.
    """
    # Tracked deletions: drop the element and everything inside it.
    for deleted in root.xpath('//w:del', namespaces=NSMAP):
        holder = deleted.getparent()
        if holder is not None:
            holder.remove(deleted)

    # Tracked insertions: keep the content, drop the wrapper.
    for inserted in root.xpath('//w:ins', namespaces=NSMAP):
        holder = inserted.getparent()
        if holder is None:
            # Same guard as the other loops: a parentless element cannot be unwrapped.
            continue
        position = holder.index(inserted)
        # Snapshot the children first: inserting a child into the parent moves
        # it out of the wrapper that would otherwise be iterated.
        for offset, child in enumerate(list(inserted)):
            holder.insert(position + offset, child)
        holder.remove(inserted)

    # Comment anchors and references.
    for tag in ('w:commentRangeStart', 'w:commentRangeEnd', 'w:commentReference'):
        for elem in root.xpath(f'//{tag}', namespaces=NSMAP):
            holder = elem.getparent()
            if holder is not None:
                holder.remove(elem)
def create_modified_docx(original_zip: zipfile.ZipFile, modified_root: etree._Element) -> bytes:
    """Return the bytes of a new .docx that carries the modified document XML.

    Every member of *original_zip* except ``word/document.xml`` is copied
    unchanged. The serialized *modified_root* is then written under that name
    as the last member.
    """
    document_part = 'word/document.xml'
    serialized = etree.tostring(
        modified_root,
        encoding='UTF-8',
        xml_declaration=True,
        pretty_print=True,
    )

    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, 'w', compression=zipfile.ZIP_DEFLATED) as rebuilt:
        for entry in original_zip.infolist():
            if entry.filename == document_part:
                continue
            rebuilt.writestr(entry, original_zip.read(entry.filename))
        rebuilt.writestr(document_part, serialized)
    return buffer.getvalue()
def docx_to_txt(doc_id: str, url: str) -> list[str]:
    """Download a document, strip its tracked changes and return its text lines.

    The archive at *url* is fetched and its ``word/document.xml`` is cleaned.
    The rebuilt ``.docx`` is then converted to plain text with headless
    LibreOffice.

    Args:
        doc_id: Document identifier. Kept for interface compatibility; it is
            no longer used to build file paths, so a caller-supplied value
            cannot steer where files are written.
        url: Address of the ZIP archive that contains the document.

    Returns:
        The non-empty lines of the converted text, each stripped of
        surrounding whitespace.

    Raises:
        ValueError: Propagated from the archive download/lookup.
        subprocess.CalledProcessError: If the LibreOffice conversion fails.
    """
    # Close the in-memory archive as soon as the cleaned copy is built.
    with get_docx_archive(url) as docx_zip:
        root = parse_document_xml(docx_zip)
        clean_document_xml(root)
        modified_bytes = create_modified_docx(docx_zip, root)

    # A private temporary directory avoids collisions between concurrent
    # conversions and is removed even when the conversion fails.
    with tempfile.TemporaryDirectory() as workdir:
        input_path = os.path.join(workdir, "cleaned.docx")
        output_path = os.path.join(workdir, "cleaned.txt")
        with open(input_path, "wb") as f:
            f.write(modified_bytes)
        subprocess.run([
            "libreoffice",
            "--headless",
            "--convert-to", "txt",
            "--outdir", workdir,
            input_path
        ], check=True)
        # NOTE(review): the output is read as UTF-8; confirm LibreOffice's
        # plain-text export uses that encoding in the deployment environment.
        with open(output_path, "r", encoding="utf-8") as f:
            return [line.strip() for line in f if line.strip()]
def render_page():
    """Return ``index.html`` wrapped in a file response."""
    index_file = "index.html"
    return FileResponse(index_file)
def get_meetings(req: MeetingsRequest):
    """List the meeting folders of a working group from the 3GPP FTP listing.

    The working-group name in ``req.working_group`` is split into its letters
    (the TSG) and its first digit (the working-group number). The TSG listing
    is scanned for the first folder whose lower-cased name contains
    ``wg<number>``, and that folder's listing is filtered down to meeting
    folders.

    Args:
        req: Request carrying ``working_group``.

    Returns:
        A ``MeetingsResponse`` whose ``meetings`` maps a display label to the
        folder name. It is empty when no working-group folder is found.

    Raises:
        HTTPException: 400 when ``working_group`` contains no digit.
    """
    def meeting_label(folder: str) -> str:
        """Turn a meeting folder name into its display label."""
        if folder.startswith('TSG'):
            # Drop everything up to the first underscore; a folder without
            # one is used whole instead of raising IndexError.
            suffix = folder.split("_", 1)[-1]
            return working_group + "#" + suffix.replace("_", " ").replace("-", " ")
        return folder.replace("-", "#")

    working_group = req.working_group
    tsg = re.sub(r"\d+", "", working_group)
    digit = re.search(r"\d", working_group)
    if digit is None:
        raise HTTPException(
            status_code=400, detail="Working group must contain a number.")
    wg_number = digit.group(0)
    logging.debug("tsg=%s wg_number=%s", tsg, wg_number)

    url = "https://www.3gpp.org/ftp/tsg_" + tsg
    logging.debug(url)
    # NOTE(review): TLS certificate verification is disabled for these requests.
    resp = requests.get(url, verify=False)
    soup = BeautifulSoup(resp.text, "html.parser")
    wg_folders = [item.get_text() for item in soup.select("tr td a")]
    selected_folder = next(
        (folder for folder in wg_folders
         if "wg" + wg_number in folder.lower()),
        None,
    )
    if selected_folder is None:
        # Nothing to list; building the URL from None would raise TypeError.
        return MeetingsResponse(meetings={})

    url += "/" + selected_folder
    logging.debug(url)
    resp = requests.get(url, verify=False)
    soup = BeautifulSoup(resp.text, "html.parser")
    link_texts = [item.get_text() for item in soup.select("tr td a")]
    meeting_folders = [
        text for text in link_texts
        if text.startswith("TSG") or (text.startswith("CT") and "-" in text)
    ]
    return MeetingsResponse(
        meetings={meeting_label(folder): folder for folder in meeting_folders})
def get_change_request_dataframe(req: DataRequest):
    """Return the change-request TDocs of a meeting as a list of records.

    The meeting's ``docs`` folder on the 3GPP FTP site is scanned for the
    first ``.xlsx`` file, which is loaded with pandas. Rows are kept when they
    have a non-empty ``Uploaded`` cell and are either of type ``pCR`` or of
    type ``CR`` with category ``B`` or ``C``.

    Args:
        req: Request carrying ``working_group`` and ``meeting``.

    Returns:
        A ``DataResponse`` whose ``data`` holds one dict per kept row with the
        keys ``TDoc``, ``Title``, ``Type``, ``TDoc Status``,
        ``Agenda item description`` and ``URL``.

    Raises:
        HTTPException: 400 when ``working_group`` contains no digit; 404 when
            the working-group folder or the TDoc workbook cannot be found.
    """
    working_group = req.working_group
    tsg = re.sub(r"\d+", "", working_group)
    digit = re.search(r"\d", working_group)
    if digit is None:
        raise HTTPException(
            status_code=400, detail="Working group must contain a number.")
    wg_number = digit.group(0)

    url = "https://www.3gpp.org/ftp/tsg_" + tsg
    logging.info("Fetching TDocs dataframe")
    # NOTE(review): TLS certificate verification is disabled for these requests.
    resp = requests.get(url, verify=False)
    soup = BeautifulSoup(resp.text, "html.parser")
    wg_folders = [item.get_text() for item in soup.select("tr td a")]

    # Prefer the same "wg<number>" match get_meetings uses; fall back to the
    # looser "digit anywhere in the name" match so earlier results still hold.
    strict_matches = [f for f in wg_folders if "wg" + wg_number in f.lower()]
    loose_matches = [f for f in wg_folders if wg_number in f]
    candidates = strict_matches or loose_matches
    if not candidates:
        raise HTTPException(
            status_code=404, detail="Working group folder not found.")
    selected_folder = candidates[0]

    url += "/" + selected_folder + "/" + req.meeting + "/docs"
    resp = requests.get(url, verify=False)
    soup = BeautifulSoup(resp.text, "html.parser")
    files = [item.get_text() for item in soup.select("tr td a")
             if item.get_text().endswith(".xlsx")]
    if not files:
        raise HTTPException(
            status_code=404, detail="No TDoc list (.xlsx) found for this meeting.")

    # '#' would start a URL fragment, so it is percent-encoded.
    df = pd.read_excel((url + "/" + files[0]).replace("#", "%23"))

    is_cr = (df["Type"] == "CR") & df["CR category"].isin(["B", "C"])
    is_pcr = df["Type"] == "pCR"
    is_uploaded = df["Uploaded"].notna()
    kept_columns = ["TDoc", "Title", "CR category", "Source", "Type",
                    "Agenda item", "Agenda item description", "TDoc Status"]
    # .copy() gives an independent frame, so adding a column does not write
    # into a slice of the original.
    filtered_df = df.loc[(is_cr | is_pcr) & is_uploaded, kept_columns].copy()
    filtered_df["URL"] = filtered_df["TDoc"].apply(
        lambda tdoc: f"{url}/{tdoc}.zip")
    filtered_df = filtered_df.fillna("")
    return DataResponse(data=filtered_df[["TDoc", "Title", "Type", "TDoc Status", "Agenda item description", "URL"]].to_dict(orient="records"))
def download_tdocs(req: DownloadRequest):
    """Return a ZIP holding the extracted text of each requested document.

    Each identifier in ``req.documents`` is sent to the document-finder
    service, and the returned URL is passed to ``docx_to_txt``. The result is
    stored as ``<doc_id>.txt``. Failures never abort the request:

    * when text extraction fails, the entry contains an explanatory message;
    * when anything else fails for a document, the entry contains ``Erreur``.

    Args:
        req: Request carrying ``documents``, the identifiers to fetch.

    Returns:
        A ``StreamingResponse`` with media type ``application/zip``.
    """
    documents = req.documents

    def process_document(doc: str):
        """Resolve one document's URL and return ``(doc_id, text bytes)``."""
        doc_id = doc
        # NOTE(review): TLS certificate verification is disabled for this call.
        finder_resp = requests.post(
            'https://organizedprogrammers-3gppdocfinder.hf.space/find',
            headers={"Content-Type": "application/json"},
            data=json.dumps({"doc_id": doc_id}),
            verify=False
        )
        logging.info("Doc finder status for %s: %s",
                     doc_id, finder_resp.status_code)
        doc_url = finder_resp.json()['url']
        logging.info("Doc finder URL for %s: %s", doc_id, doc_url)
        try:
            txt = "\n".join(docx_to_txt(doc_id, doc_url))
        except Exception as e:
            # Best effort: record the failure and report it inside the file.
            logging.exception("Text extraction failed for %s", doc_id)
            txt = f"Document {doc_id} text extraction failed: {e}"
        return doc_id, txt.encode("utf-8")

    def process_batch(batch):
        """Process documents one after another, mapping id to file bytes."""
        results = {}
        for doc in batch:
            try:
                doc_id, file_bytes = process_document(doc)
                results[doc_id] = file_bytes
            except Exception:
                logging.exception("Could not process document %s", doc)
                results[doc] = b"Erreur"
        return results

    documents_bytes = process_batch(documents)

    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, mode='w', compression=zipfile.ZIP_DEFLATED) as zip_file:
        for doc_id, txt_data in documents_bytes.items():
            zip_file.writestr(f'{doc_id}.txt', txt_data)
    # Rewind so the response streams the archive from its first byte.
    zip_buffer.seek(0)
    return StreamingResponse(
        zip_buffer,
        media_type="application/zip"
    )
async def gen_reqs(req: RequirementsRequest, background_tasks: BackgroundTasks):
    """Extract requirements from every requested document with the LLM.

    Documents are handled in batches of 30, with a 60-second pause between
    consecutive batches. For each document the text is extracted with
    ``docx_to_txt`` and sent to the ``gemini-v2`` router alias under that
    alias's rate limiter. A call that fails with a message containing
    "rate limit" is retried once. Any other failure, or a failed retry,
    yields a single placeholder entry whose context is ``Error LLM``.

    Args:
        req: Request carrying ``documents``; each item exposes ``document``
            (the identifier) and ``url``.
        background_tasks: Kept for interface compatibility; no longer used,
            because the pause between batches is now awaited directly.

    Returns:
        A ``RequirementsResponse`` concatenating the per-document results in
        input order.
    """
    documents = req.documents
    logging.info("Generating requirements for documents: %s",
                 [doc.document for doc in documents])

    batch_size = 30
    pause_between_batches = 60  # seconds
    model_used = "gemini-v2"
    # Extraction runs in a worker thread so the event loop stays free. The
    # lock keeps one extraction at a time per request, as before.
    extraction_lock = asyncio.Lock()

    def prompt(doc_id, full):
        return f"Here's the document whose ID is {doc_id} : {full}\n\nExtract all requirements and group them by context, returning a list of objects where each object includes a document ID, a concise description of the context where the requirements apply (not a chapter title or copied text), and a list of associated requirements; always return the result as a list, even if only one context is found. Remove the errors"

    def error_result(doc_id):
        """Placeholder returned when a document could not be processed."""
        return [DocRequirements(document=doc_id, context="Error LLM", requirements=[])]

    async def ask_llm(doc_id, full):
        """Run one rate-limited completion and return the parsed requirements."""
        async with limiter_mapping[model_used]:
            resp_ai = await llm_router.acompletion(
                model=model_used,
                messages=[
                    {"role": "user", "content": prompt(doc_id, full)}],
                response_format=RequirementsResponse
            )
        return RequirementsResponse.model_validate_json(resp_ai.choices[0].message.content).requirements

    async def process_document(doc):
        doc_id = doc.document
        try:
            async with extraction_lock:
                lines = await asyncio.to_thread(docx_to_txt, doc_id, doc.url)
            full = "\n".join(lines)
        except Exception:
            logging.exception("Text extraction failed for %s", doc_id)
            return error_result(doc_id)

        try:
            return await ask_llm(doc_id, full)
        except Exception as e:
            if "rate limit" not in str(e).lower():
                logging.exception("LLM call failed for %s", doc_id)
                return error_result(doc_id)

        # Rate-limited: retry exactly once with the same alias.
        try:
            return await ask_llm(doc_id, full)
        except Exception:
            logging.exception("LLM retry failed for %s", doc_id)
            return error_result(doc_id)

    async def process_batch(batch):
        results = await asyncio.gather(*(process_document(doc) for doc in batch))
        return [item for sublist in results for item in sublist]

    all_requirements = []
    batches = [documents[i:i + batch_size]
               for i in range(0, len(documents), batch_size)]
    for index, batch in enumerate(batches):
        if index:
            # Pause between batches. Scheduling the sleep as a background task
            # would only run it after the response has been sent.
            await asyncio.sleep(pause_between_batches)
        all_requirements.extend(await process_batch(batch))
    return RequirementsResponse(requirements=all_requirements)
def find_requirements_from_problem_description(req: ReqSearchRequest):
    """Ask the LLM which requirements best cover a problem description.

    Every requirement is rendered on one line with its ``req_id`` as
    "Selection ID" and sent with the query to the ``gemini-v2`` router alias.
    The returned identifiers are used as positions in ``req.requirements``.

    Args:
        req: Request carrying ``requirements`` and ``query``.

    Returns:
        A ``ReqSearchResponse`` holding the selected requirements, in the
        order the LLM returned them. It is empty when the LLM selects nothing.

    Raises:
        HTTPException: 500 when the LLM returns an identifier outside
            ``0 .. len(requirements) - 1``.
    """
    requirements = req.requirements
    query = req.query
    requirements_text = "\n".join(
        f"[Selection ID: {r.req_id} | Document: {r.document} | Context: {r.context} | Requirement: {r.requirement}]" for r in requirements)

    logging.info("Called the LLM")
    resp_ai = llm_router.completion(
        model="gemini-v2",
        messages=[{"role": "user", "content": f"Given all the requirements : \n {requirements_text} \n and the problem description \"{query}\", return a list of 'Selection ID' for the most relevant corresponding requirements that reference or best cover the problem. If none of the requirements covers the problem, simply return an empty list"}],
        response_format=ReqSearchLLMResponse
    )
    raw_answer = resp_ai.choices[0].message.content
    logging.info("Answered")
    logging.info(raw_answer)

    out_llm = ReqSearchLLMResponse.model_validate_json(raw_answer).selected
    # NOTE(review): this assumes each req_id equals the requirement's position
    # in the list - confirm against the caller.
    # An empty selection is a valid answer, so check every index rather than
    # calling max() on a possibly empty list. Negative values are rejected
    # too, because they would silently index from the end.
    if any(not 0 <= index < len(requirements) for index in out_llm):
        raise HTTPException(
            status_code=500, detail="LLM error : Generated a wrong index, please try again.")
    return ReqSearchResponse(requirements=[requirements[i] for i in out_llm])