|
from fastapi.staticfiles import StaticFiles |
|
import requests, re, warnings |
|
from dotenv import load_dotenv |
|
from fastapi import FastAPI, Request, HTTPException |
|
from fastapi.middleware.cors import CORSMiddleware |
|
from fastapi.responses import FileResponse, StreamingResponse |
|
from bs4 import BeautifulSoup |
|
|
|
from huggingface_hub import configure_http_backend |
|
|
|
from schemas import * |
|
from classes import * |
|
|
|
def backend_factory() -> requests.Session:
    """Create the HTTP session handed to ``configure_http_backend``.

    Returns:
        A fresh ``requests.Session`` whose ``verify`` attribute is ``False``,
        i.e. TLS certificate verification is disabled for requests made
        through it.
    """
    http_session = requests.Session()
    # Certificate checks are turned off for everything sent via this session.
    http_session.verify = False
    return http_session
|
|
|
# Have huggingface_hub build its HTTP sessions through backend_factory.
configure_http_backend(backend_factory=backend_factory)
# Silence every warning for the whole process.
# NOTE(review): presumably meant to hide the insecure-request warnings caused
# by verify=False; it also hides all other warnings - confirm this is intended.
warnings.filterwarnings(action="ignore")
# Load environment variables from a local .env file, when one is present.
load_dotenv()
|
|
|
# Folder names used to build https://www.3gpp.org/ftp/ URLs, keyed by TSG.
# Position 0 of each list is the TSG-level folder; position N is the folder
# of working group N (get_folder_name("SA2") -> ("SA", 2) -> "WG2_Arch").
meetings_mapping = {
    "SA": [
        "TSG_SA",
        "WG1_Serv", "WG2_Arch", "WG3_Security",
        "WG4_CODEC", "WG5_TM", "WG6_MissionCritical",
    ],
    "CT": [
        "TSG_CT",
        "WG1_mm-cc-sm_ex-CN1", "WG2_capability_ex-T2", "WG3_interworking_ex-CN3",
        "WG4_protocollars_ex-CN4", "WG5_osa_ex-CN5", "WG6_Smartcard_Ex-T3",
    ],
    "RAN": [
        "TSG_RAN",
        "WG1_RL1", "WG2_RL2", "WG3_Iu",
        "WG4_Radio", "WG5_Test_ex-T1", "WG6_legacyRAN",
    ],
}
|
|
|
# Module-level indexer instances shared by the indexing endpoints below.
tdoc_indexer = TDocIndexer()
spec_3gpp_indexer = Spec3GPPIndexer()
spec_etsi_indexer = SpecETSIIndexer()

# Web application with CORS enabled and the ./static directory served at /static.
app = FastAPI()
# NOTE(review): wildcard origins together with allow_credentials=True is a very
# permissive CORS policy, and allow_methods is left at the library default -
# confirm both are intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)
static_files = StaticFiles(directory="static")
app.mount("/static", static_files, name="static")
|
|
|
@app.get('/')
def main():
    """Serve ``index.html`` as the response for the site root."""
    index_page = FileResponse("index.html")
    return index_page
|
def get_folder_name(working_group: str):
    """Split a working-group code into a TSG key and a group number.

    Codes ending in "P" that start with "S", "C" or "R" resolve to
    ``("SA", 0)``, ``("CT", 0)`` and ``("RAN", 0)`` respectively. Every other
    code must begin with upper-case letters followed by digits, for example
    "SA2" -> ``("SA", 2)``.

    Args:
        working_group: Code such as "SP", "CT1" or "RAN4".

    Returns:
        A ``(letters, number)`` tuple.

    Raises:
        ValueError: If the code matches neither of the two forms.
    """
    if working_group.endswith("P"):
        plenary_by_initial = {"S": "SA", "C": "CT", "R": "RAN"}
        tsg = plenary_by_initial.get(working_group[:1])
        if tsg is not None:
            return (tsg, 0)

    # Only the start of the string is matched; trailing characters are ignored.
    parsed = re.match(r"([A-Z]+)(\d+)", working_group)
    if parsed is None:
        raise ValueError("Unattended format")
    letters, digits = parsed.groups()
    return (letters, int(digits))
|
|
|
@app.get("/get_meetings/{working_group}")
def get_meetings(working_group: str):
    """List the meeting folders found in a working group's 3GPP FTP directory.

    The working-group code is resolved to a folder through ``get_folder_name``
    and ``meetings_mapping``, the directory listing page is downloaded, and
    the link texts starting with "TSG" or "CT" are returned.

    Args:
        working_group: Working-group code taken from the URL path
            (for example "SA2" or "RP").

    Returns:
        ``{"url": <listing URL>, "meetings": [<link text>, ...]}``.

    Raises:
        HTTPException: 400 when the working group cannot be resolved to a
            folder; 502 when the listing cannot be fetched from 3gpp.org.
    """
    try:
        category, wg_number = get_folder_name(working_group)
        tsg_folders = meetings_mapping[category]
        folder = tsg_folders[wg_number]
    except (ValueError, KeyError, IndexError) as exc:
        # Bad client input must not surface as an internal server error.
        raise HTTPException(
            status_code=400,
            detail=f"Unknown working group: {working_group}",
        ) from exc

    url = f"https://www.3gpp.org/ftp/{tsg_folders[0]}/{folder}"
    try:
        # verify=False is kept from the original code (certificate checks are
        # disabled); the timeout stops a stalled server from blocking the
        # worker forever.
        response = requests.get(url, verify=False, timeout=30)
        response.raise_for_status()
    except requests.RequestException as exc:
        raise HTTPException(
            status_code=502,
            detail=f"Unable to fetch the meeting list from {url}",
        ) from exc

    soup = BeautifulSoup(response.text, "html.parser")
    link_texts = (item.get_text() for item in soup.select("tr td a"))
    meetings = [text for text in link_texts if text.startswith(("TSG", "CT"))]
    return {"url": url, "meetings": meetings}
|
|
|
@app.post("/index_tdocs/working_group")
def index_tdocs_wg_progress(req: IndexTDoc):
    """Index every TDoc of one working group, streaming progress events.

    Args:
        req: Request body; only ``req.wg`` (the working-group code) is read.

    Returns:
        A ``text/event-stream`` response that emits an ``info`` event with the
        working group, then whatever ``tdoc_indexer.process_workgroup``
        yields, and finally an ``end`` event once the index has been saved.

    Raises:
        HTTPException: 400 when ``req.wg`` is empty or cannot be resolved to
            a folder.
    """
    if not req.wg:
        raise HTTPException(status_code=400, detail="Working Group not defined !")

    try:
        category, wg_number = get_folder_name(req.wg)
        tsg_folders = meetings_mapping[category]
        folder = tsg_folders[wg_number]
    except (ValueError, KeyError, IndexError) as exc:
        # Bad client input must not surface as an internal server error.
        raise HTTPException(
            status_code=400,
            detail=f"Unknown working group: {req.wg}",
        ) from exc

    # The URL stops at the TSG folder; the working-group folder is passed
    # to the indexer separately.
    url = f"https://www.3gpp.org/ftp/{tsg_folders[0]}"

    def generate_events():
        yield f"event: info\ndata: {req.wg}\n\n"
        yield from tdoc_indexer.process_workgroup(folder, url)
        tdoc_indexer.save_indexer()
        yield "event: end\ndata: Indexation ended successfully !\n\n"

    return StreamingResponse(generate_events(), media_type="text/event-stream")
|
|
|
@app.post("/index_tdocs/meeting")
def index_tdocs_meeting_progress(req: IndexTDoc):
    """Index the TDocs of selected meetings, streaming progress events.

    Args:
        req: Request body; ``req.wg`` is the working-group code and
            ``req.meetings`` the meetings to process.

    Returns:
        A ``text/event-stream`` response that emits ``get-maximum`` with the
        number of meetings, then an ``info`` and a ``progress`` event around
        each meeting, and an ``end`` event once the index has been saved.

    Raises:
        HTTPException: 400 when ``req.wg`` or ``req.meetings`` is empty, or
            when the working group cannot be resolved to a folder.
    """
    if not req.wg:
        raise HTTPException(status_code=400, detail="Working Group not defined !")
    if not req.meetings:
        raise HTTPException(status_code=400, detail="Meetings not defined !")

    try:
        category, wg_number = get_folder_name(req.wg)
        tsg_folders = meetings_mapping[category]
        folder = tsg_folders[wg_number]
    except (ValueError, KeyError, IndexError) as exc:
        # Bad client input must not surface as an internal server error.
        raise HTTPException(
            status_code=400,
            detail=f"Unknown working group: {req.wg}",
        ) from exc

    url = f"https://www.3gpp.org/ftp/{tsg_folders[0]}/{folder}"

    def generate_events():
        yield f"event: get-maximum\ndata: {len(req.meetings)}\n\n"
        for done, meet in enumerate(req.meetings, start=1):
            yield f"event: info\ndata: {req.wg}-{meet}\n\n"
            tdoc_indexer.process_meeting(meet, url)
            yield f"event: progress\ndata: {done}\n\n"
        tdoc_indexer.save_indexer()
        yield "event: end\ndata: Indexation ended successfully !\n\n"

    return StreamingResponse(generate_events(), media_type="text/event-stream")
|
|
|
|
|
@app.post("/index_tdocs/all")
def index_all_tdocs_progress():
    """Index all TDocs, streaming progress as a ``text/event-stream`` response.

    The stream relays whatever ``tdoc_indexer.index_all_tdocs`` yields, saves
    the index, and closes with an ``end`` event.
    """

    def generate_events():
        yield from tdoc_indexer.index_all_tdocs()
        tdoc_indexer.save_indexer()
        yield "event: end\ndata: Indexation ended successfully !\n\n"

    event_stream = generate_events()
    return StreamingResponse(event_stream, media_type="text/event-stream")
|
|
|
|
|
@app.post("/index_specs/3gpp")
def index_3gpp_specs_progress():
    """Index the 3GPP specifications, streaming progress events.

    The ``text/event-stream`` response relays whatever
    ``spec_3gpp_indexer.run`` yields, then announces and performs the save
    step and the BM25 step, and closes with an ``end`` event.
    """
    # Events sent just before each of the two final steps.
    # NOTE(review): "progress: 1" is emitted before the step it reports on has
    # run - confirm this ordering is intended.
    saving_events = (
        "event: info\ndata: Saving index ...\n\n",
        "event: get-maximum\ndata: 1\n\n",
        "event: progress\ndata: 1\n\n",
    )
    bm25_events = (
        "event: info\ndata: Creating BM25 models ...\n\n",
        "event: get-maximum\ndata: 1\n\n",
        "event: progress\ndata: 1\n\n",
    )

    def generate_events():
        yield from spec_3gpp_indexer.run()
        for event in saving_events:
            yield event
        spec_3gpp_indexer.save()
        for event in bm25_events:
            yield event
        spec_3gpp_indexer.create_bm25_index()
        yield "event: end\ndata: Indexation ended successfully !\n\n"

    return StreamingResponse(generate_events(), media_type="text/event-stream")
|
|
|
@app.post("/index_specs/etsi")
def index_etsi_specs_progress():
    """Index the ETSI specifications, streaming progress events.

    The ``text/event-stream`` response relays whatever
    ``spec_etsi_indexer.run`` yields, then announces and performs the save
    step and the BM25 step, and closes with an ``end`` event.
    """
    # Events sent just before each of the two final steps.
    # NOTE(review): "progress: 1" is emitted before the step it reports on has
    # run - confirm this ordering is intended.
    saving_events = (
        "event: info\ndata: Saving index ...\n\n",
        "event: get-maximum\ndata: 1\n\n",
        "event: progress\ndata: 1\n\n",
    )
    bm25_events = (
        "event: info\ndata: Creating BM25 models ...\n\n",
        "event: get-maximum\ndata: 1\n\n",
        "event: progress\ndata: 1\n\n",
    )

    def generate_events():
        yield from spec_etsi_indexer.run()
        for event in saving_events:
            yield event
        spec_etsi_indexer.save()
        for event in bm25_events:
            yield event
        spec_etsi_indexer.create_bm25_index()
        yield "event: end\ndata: Indexation ended successfully !\n\n"

    return StreamingResponse(generate_events(), media_type="text/event-stream")