import shutil
import threading
import re
import time
import concurrent.futures
import requests
import os
import hashlib
import traceback
import io
import zipfile
import subprocess
import tempfile
import json

import fitz  # PyMuPDF
import pandas as pd
import numpy as np
import bm25s
from bm25s.hf import BM25HF
from bs4 import BeautifulSoup
from datasets import load_dataset, Dataset
from datasets.data_files import EmptyDatasetError
from dotenv import load_dotenv

load_dotenv()
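
# Added note (not part of the original module): every HTTP request below is made with
# verify=False, so urllib3 emits an InsecureRequestWarning for each call; silencing it
# keeps the progress logs readable. Remove these two lines to keep the warnings.
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)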


class TDocIndexer:
    def __init__(self, max_workers=33):
        self.indexer_length = 0
        self.dataset = "OrganizedProgrammers/3GPPTDocLocation"

        self.indexer = self.load_indexer()
        self.main_ftp_url = "https://3gpp.org/ftp"
        self.valid_doc_pattern = re.compile(r'^(S[1-6P]|C[1-6P]|R[1-6P])-\d+', flags=re.IGNORECASE)
        self.max_workers = max_workers

        self.print_lock = threading.Lock()
        self.indexer_lock = threading.Lock()

        self.total_indexed = 0
        self.processed_count = 0
        self.total_count = 0

    def load_indexer(self):
        """Load the doc_id -> URL index from the Hugging Face dataset."""
        self.indexer_length = 0
        all_docs = {}
        tdoc_locations = load_dataset(self.dataset)
        tdoc_locations = tdoc_locations["train"].to_list()
        for doc in tdoc_locations:
            self.indexer_length += 1
            all_docs[doc["doc_id"]] = doc["url"]

        return all_docs

    def save_indexer(self):
        """Push the updated index back to the Hub, then reload it."""
        data = []
        for doc_id, url in self.indexer.items():
            data.append({"doc_id": doc_id, "url": url})

        dataset = Dataset.from_list(data)
        dataset.push_to_hub(self.dataset, token=os.environ["HF"])
        self.indexer = self.load_indexer()

    def get_docs_from_url(self, url):
        """Return the link texts of an FTP directory listing (empty list on error)."""
        try:
            response = requests.get(url, verify=False, timeout=10)
            soup = BeautifulSoup(response.text, "html.parser")
            return [item.get_text() for item in soup.select("tr td a")]
        except Exception as e:
            with self.print_lock:
                print(f"Error while accessing {url}: {e}")
            return []

    def is_valid_document_pattern(self, filename):
        return bool(self.valid_doc_pattern.match(filename))

    def is_zip_file(self, filename):
        return filename.lower().endswith('.zip')

    def extract_doc_id(self, filename):
        if self.is_valid_document_pattern(filename):
            match = self.valid_doc_pattern.match(filename)
            if match:
                full_id = filename.split('.')[0]
                return full_id.split('_')[0]
        return None
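
    # Illustrative behaviour of extract_doc_id (examples added by the editor, not in the original):
    #   extract_doc_id("S2-2401234.zip")     -> "S2-2401234"
    #   extract_doc_id("C1-240001_rev1.zip") -> "C1-240001"
    #   extract_doc_id("agenda.zip")         -> None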

    def process_zip_files(self, files_list, base_url, workshop=False):
        """Scan a file listing and index every valid ZIP it contains."""
        indexed_count = 0

        for file in files_list:
            if file in ['./', '../', 'ZIP/', 'zip/']:
                continue

            if self.is_zip_file(file) and (self.is_valid_document_pattern(file) or workshop):
                file_url = f"{base_url}/{file}"

                doc_id = self.extract_doc_id(file)
                if doc_id is None:
                    doc_id = file.split('.')[0]
                if doc_id:
                    with self.indexer_lock:
                        if doc_id in self.indexer and self.indexer[doc_id] == file_url:
                            continue

                        self.indexer[doc_id] = file_url
                        indexed_count += 1
                        self.total_indexed += 1

        return indexed_count

    def process_meeting(self, meeting, wg_url, workshop=False):
        """Process a single meeting folder (run from a worker thread)."""
        try:
            if meeting in ['./', '../']:
                return 0

            meeting_url = f"{wg_url}/{meeting}"

            with self.print_lock:
                print(f"Checking meeting: {meeting}")

            meeting_contents = self.get_docs_from_url(meeting_url)

            key = None
            if "docs" in [x.lower() for x in meeting_contents]:
                key = "docs"
            elif "tdocs" in [x.lower() for x in meeting_contents]:
                key = "tdocs"
            elif "tdoc" in [x.lower() for x in meeting_contents]:
                key = "tdoc"

            if key is not None:
                docs_url = f"{meeting_url}/{key}"

                with self.print_lock:
                    print(f"Checking documents in {docs_url}")

                docs_files = self.get_docs_from_url(docs_url)

                docs_indexed_count = self.process_zip_files(docs_files, docs_url, workshop)

                if docs_indexed_count > 0:
                    with self.print_lock:
                        print(f"{docs_indexed_count} files found")

                if "zip" in [x.lower() for x in docs_files]:
                    zip_url = f"{docs_url}/zip"

                    with self.print_lock:
                        print(f"Checking ./zip folder: {zip_url}")

                    zip_files = self.get_docs_from_url(zip_url)

                    zip_indexed_count = self.process_zip_files(zip_files, zip_url, workshop)

                    if zip_indexed_count > 0:
                        with self.print_lock:
                            print(f"{zip_indexed_count} files found")

            with self.indexer_lock:
                self.processed_count += 1

            with self.print_lock:
                progress = (self.processed_count / self.total_count) * 100 if self.total_count > 0 else 0
                print(f"\rProgress: {self.processed_count}/{self.total_count} meetings processed ({progress:.1f}%)")

            return 1

        except Exception as e:
            with self.print_lock:
                print(f"\nError while processing meeting {meeting}: {str(e)}")
            return 0

    def process_workgroup(self, wg, main_url):
        """Process a working group, threading across its meeting folders."""
        if wg in ['./', '../']:
            return

        wg_url = f"{main_url}/{wg}"

        with self.print_lock:
            print(f"Checking working group: {wg}")

        meeting_folders = self.get_docs_from_url(wg_url)

        self.total_count += len([m for m in meeting_folders if m not in ['./', '../']])

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [executor.submit(self.process_meeting, meeting, wg_url)
                       for meeting in meeting_folders if meeting not in ['./', '../']]

            total = len(futures)
            done_count = 0
            yield f"event: get-maximum\ndata: {total}\n\n"

            for future in concurrent.futures.as_completed(futures):
                done_count += 1
                yield f"event: progress\ndata: {done_count}\n\n"

    def index_all_tdocs(self):
        """Index every ZIP document in the 3GPP FTP tree, one working group at a time."""
        print("Starting full 3GPP TDoc indexing")

        start_time = time.time()
        docs_count_before = self.indexer_length

        main_groups = ["tsg_sa", "tsg_ct", "tsg_ran"]

        for main_tsg in main_groups:
            print(f"Indexing {main_tsg.upper()}...")

            main_url = f"{self.main_ftp_url}/{main_tsg}"

            workgroups = self.get_docs_from_url(main_url)

            for wg in workgroups:
                yield f"event: info\ndata: {main_tsg}-{wg}\n\n"
                for content in self.process_workgroup(wg, main_url):
                    yield content

        docs_count_after = len(self.indexer)
        new_docs_count = abs(docs_count_after - docs_count_before)

        print(f"Indexing finished in {time.time() - start_time:.2f} seconds")
        print(f"New ZIP documents indexed: {new_docs_count}")
        print(f"Total documents in the index: {docs_count_after}")

        return self.indexer

    def index_all_workshops(self):
        """Index every ZIP document under the 3GPP 'workshop' folder."""
        print("Starting 3GPP workshop ZIP indexing...")
        start_time = time.time()
        docs_count_before = len(self.indexer)

        print("\nIndexing the 'workshop' folder")
        main_url = f"{self.main_ftp_url}/workshop"

        meeting_folders = self.get_docs_from_url(main_url)

        self.total_count += len([m for m in meeting_folders if m not in ['./', '../']])

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [executor.submit(self.process_meeting, meeting, main_url, workshop=True)
                       for meeting in meeting_folders if meeting not in ['./', '../']]
            total = len(futures)
            done_count = 0

            yield f"event: get-maximum\ndata: {total}\n\n"

            for future in concurrent.futures.as_completed(futures):
                done_count += 1
                yield f"event: progress\ndata: {done_count}\n\n"

        docs_count_after = len(self.indexer)
        new_docs_count = docs_count_after - docs_count_before

        print(f"\nIndexing finished in {time.time() - start_time:.2f} seconds")
        print(f"New ZIP documents indexed: {new_docs_count}")
        print(f"Total documents in the index: {docs_count_after}")

        return self.indexer


class Spec3GPPIndexer:
    def __init__(self, max_workers=16):
        self.spec_contents = load_dataset("OrganizedProgrammers/3GPPSpecContent")["train"].to_list()
        self.documents_by_spec_num = self._make_doc_index(self.spec_contents)
        self.indexed_specifications = {}
        self.specifications_passed = set()
        self.processed_count = 0
        self.total_count = 0

        self.DICT_LOCK = threading.Lock()
        self.DOCUMENT_LOCK = threading.Lock()
        self.STOP_EVENT = threading.Event()
        self.max_workers = max_workers
        self.LIBREOFFICE_SEMAPHORE = threading.Semaphore(self.max_workers)

    def _make_doc_index(self, specs):
        """Group flat (doc_id, section, content) rows into one entry per document."""
        doc_index = {}
        for section in specs:
            if section["doc_id"] not in doc_index:
                doc_index[section["doc_id"]] = {"content": {section["section"]: section["content"]}, "hash": section["hash"]}
            else:
                doc_index[section["doc_id"]]["content"][section["section"]] = section["content"]
        return doc_index

    @staticmethod
    def version_to_code(version_str):
        """Convert a dotted version string ("x.y.z") to the code used in 3GPP archive file names."""
        chars = "0123456789abcdefghijklmnopqrstuvwxyz"
        parts = version_str.split('.')
        if len(parts) != 3:
            return None
        try:
            x, y, z = [int(p) for p in parts]
        except ValueError:
            return None
        if x < 36 and y < 36 and z < 36:
            return f"{chars[x]}{chars[y]}{chars[z]}"
        else:
            return f"{str(x).zfill(2)}{str(y).zfill(2)}{str(z).zfill(2)}"

    @staticmethod
    def hasher(specification, version_code):
        return hashlib.md5(f"{specification}{version_code}".encode()).hexdigest()

    @staticmethod
    def get_scope(content):
        for title, text in content.items():
            if title.lower().endswith("scope"):
                return text
        return ""

    def get_text(self, specification, version_code):
        """Download the spec archive and convert its Word document(s) to plain text lines."""
        if self.STOP_EVENT.is_set():
            return []

        doc_id = specification
        series = doc_id.split(".")[0]
        url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{version_code}.zip"

        try:
            response = requests.get(url, verify=False)
            if response.status_code != 200:
                return []

            zip_bytes = io.BytesIO(response.content)
            with zipfile.ZipFile(zip_bytes) as zip_file:
                docx_files = [f for f in zip_file.namelist() if f.lower().endswith(('.doc', '.docx'))]
                if not docx_files:
                    return []

                full_text = []

                for doc_file in docx_files:
                    with tempfile.TemporaryDirectory() as tmpdir:
                        extracted_path = os.path.join(tmpdir, os.path.basename(doc_file))
                        with open(extracted_path, 'wb') as f:
                            f.write(zip_file.read(doc_file))

                        # Each conversion gets its own LibreOffice user profile so parallel
                        # soffice processes do not fight over a shared profile lock.
                        profile_dir = tempfile.mkdtemp(prefix="libreoffice_profile_")

                        try:
                            with self.LIBREOFFICE_SEMAPHORE:
                                cmd = [
                                    'soffice',
                                    '--headless',
                                    f'-env:UserInstallation=file://{profile_dir}',
                                    '--convert-to', 'txt:Text',
                                    '--outdir', tmpdir,
                                    extracted_path
                                ]
                                subprocess.run(cmd, check=True, timeout=60 * 5, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

                                txt_file = os.path.splitext(extracted_path)[0] + '.txt'
                                if os.path.exists(txt_file):
                                    with open(txt_file, 'r', encoding='utf-8', errors='ignore') as ftxt:
                                        full_text.extend(ftxt.readlines())
                        finally:
                            shutil.rmtree(profile_dir, ignore_errors=True)

            return full_text

        except Exception as e:
            print(f"Error getting text for {specification} v{version_code}: {e}")
            return []

    def get_spec_content(self, specification, version_code):
        """Split the converted text into a {section title: section text} mapping."""
        if self.STOP_EVENT.is_set():
            return {}

        text = self.get_text(specification, version_code)
        if not text:
            return {}

        chapters = []
        chapter_regex = re.compile(r"^(\d+[a-z]?(?:\.\d+)*)\t[A-Z0-9][\ \S]+[^\.]$")
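        # The pattern above expects the text export to keep section headings as
        # "<number><TAB><Title>" lines, e.g. "5.2.1\tRegistration procedures" (illustrative example).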
        for i, line in enumerate(text):
            if chapter_regex.fullmatch(line):
                chapters.append((i, line))

        document = {}
        for i in range(len(chapters)):
            start_index, chapter_title = chapters[i]
            end_index = chapters[i + 1][0] if i + 1 < len(chapters) else len(text)
            content_lines = text[start_index + 1:end_index]
            document[chapter_title.replace("\t", " ")] = "\n".join(content_lines)

        return document

    def fetch_spec_table(self):
        """Scrape the 3GPP status report page into a list of spec metadata dicts."""
        response = requests.get(
            'https://www.3gpp.org/dynareport?code=status-report.htm',
            headers={"User-Agent": 'Mozilla/5.0'},
            verify=False
        )
        dfs = pd.read_html(io.StringIO(response.text))
        for x in range(len(dfs)):
            dfs[x] = dfs[x].replace({np.nan: None})
        columns_needed = [0, 1, 2, 3, 4]
        extracted_dfs = [df.iloc[:, columns_needed] for df in dfs]
        columns = [x.replace("\xa0", "_") for x in extracted_dfs[0].columns]
        specifications = []
        for df in extracted_dfs:
            for index, row in df.iterrows():
                doc = row.to_list()
                doc_dict = dict(zip(columns, doc))
                specifications.append(doc_dict)
        return specifications

    def process_specification(self, spec):
        if self.STOP_EVENT.is_set():
            return
        try:
            doc_id = str(spec['spec_num'])
            version_code = self.version_to_code(str(spec['vers']))
            if not version_code:
                with self.DICT_LOCK:
                    self.processed_count += 1
                return

            document = None
            already_indexed = False
            with self.DOCUMENT_LOCK:
                doc_in_cache = (
                    doc_id in self.documents_by_spec_num
                    and self.documents_by_spec_num[doc_id]["hash"] == self.hasher(doc_id, version_code)
                )

            if doc_in_cache and doc_id not in self.specifications_passed:
                document = self.documents_by_spec_num[doc_id]
                self.specifications_passed.add(doc_id)
                already_indexed = True
            elif doc_id not in self.specifications_passed:
                doc_content = self.get_spec_content(doc_id, version_code)
                if doc_content:
                    document = {"content": doc_content, "hash": self.hasher(doc_id, version_code)}
                    with self.DOCUMENT_LOCK:
                        self.documents_by_spec_num[doc_id] = document
                    self.specifications_passed.add(doc_id)
                    already_indexed = False

            if document:
                url = f"https://www.3gpp.org/ftp/Specs/archive/{doc_id.split('.')[0]}_series/{doc_id}/{doc_id.replace('.', '')}-{version_code}.zip"
                metadata = {
                    "id": doc_id,
                    "title": spec.get("title", ""),
                    "type": spec.get("type", ""),
                    "version": str(spec.get("vers", "")),
                    "working_group": spec.get("WG", ""),
                    "url": url,
                    "scope": self.get_scope(document["content"])
                }
                key = f"{doc_id}+-+{spec.get('title', '')}+-+{spec.get('type', '')}+-+{spec.get('vers', '')}+-+{spec.get('WG', '')}"
                with self.DICT_LOCK:
                    self.indexed_specifications[key] = metadata

            with self.DICT_LOCK:
                self.processed_count += 1
                status = "already indexed" if already_indexed else "indexed now"
                print(f"Spec {doc_id} ({spec.get('title', '')}): {status} - Progress {self.processed_count}/{self.total_count}")

        except Exception as e:
            traceback.print_exc()
            print(f"Error processing spec {spec.get('spec_num')} v{spec.get('vers')}: {e}")
            with self.DICT_LOCK:
                self.processed_count += 1
                print(f"Progress: {self.processed_count}/{self.total_count} specs processed")

    def get_document(self, spec_id: str, spec_title: str):
        text = [f"{spec_id} - {spec_title}\n"]
        for section in self.spec_contents:
            if spec_id == section["doc_id"]:
                text.extend([f"{section['section']}\n\n{section['content']}"])
        return text

    def create_bm25_index(self):
        """Build and publish two BM25 indexes: one per section, one per whole document."""
        dataset_metadata = self.indexed_specifications.values()
        unique_specs = set()
        corpus_json = []

        for specification in dataset_metadata:
            if specification['id'] in unique_specs:
                continue
            for section in self.spec_contents:
                if specification['id'] == section['doc_id']:
                    corpus_json.append({"text": f"{section['section']}\n{section['content']}", "metadata": {
                        "id": specification['id'],
                        "title": specification['title'],
                        "section_title": section['section'],
                        "version": specification['version'],
                        "type": specification['type'],
                        "working_group": specification['working_group'],
                        "url": specification['url'],
                        "scope": specification['scope']
                    }})
            unique_specs.add(specification['id'])

        corpus_text = [doc["text"] for doc in corpus_json]
        corpus_tokens = bm25s.tokenize(corpus_text, stopwords="en")

        print("Indexing BM25")
        retriever = BM25HF(corpus=corpus_json)
        retriever.index(corpus_tokens)

        retriever.save_to_hub("OrganizedProgrammers/3GPPBM25IndexSections", token=os.environ.get("HF"))

        unique_specs = set()
        corpus_json = []

        for specification in dataset_metadata:
            if specification['id'] in unique_specs:
                continue
            text_list = self.get_document(specification['id'], specification['title'])
            text = "\n".join(text_list)
            if len(text_list) == 1:
                continue
            corpus_json.append({"text": text, "metadata": specification})
            unique_specs.add(specification['id'])

        corpus_text = [doc["text"] for doc in corpus_json]
        corpus_tokens = bm25s.tokenize(corpus_text, stopwords="en")

        print("Indexing BM25")
        retriever = BM25HF(corpus=corpus_json)
        retriever.index(corpus_tokens)

        retriever.save_to_hub("OrganizedProgrammers/3GPPBM25IndexSingle", token=os.environ.get("HF"))

    def run(self):
        print("Fetching specification tables from 3GPP...")
        yield "event: info\ndata: Indexing 3GPP specs ...\n\n"
        specifications = self.fetch_spec_table()
        self.total_count = len(specifications)
        print(f"Processing {self.total_count} specs with {self.max_workers} threads...")
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [executor.submit(self.process_specification, spec) for spec in specifications]
            total = len(futures)
            done_count = 0
            yield f"event: get-maximum\ndata: {total}\n\n"

            for future in concurrent.futures.as_completed(futures):
                done_count += 1
                yield f"event: progress\ndata: {done_count}\n\n"
                if self.STOP_EVENT.is_set():
                    break

        print("All specs processed.")

    def save(self):
        """Push spec contents and metadata to the Hub, then reload the local caches."""
        print("Saving indexed data...")
        flat_metadata = [metadata for metadata in self.indexed_specifications.values()]
        flat_docs = []
        print("Flattening doc contents")
        for doc_id, data in self.documents_by_spec_num.items():
            for title, content in data["content"].items():
                flat_docs.append({"hash": data["hash"], "doc_id": doc_id, "section": title, "content": content})
        print("Creating datasets ...")
        push_spec_content = Dataset.from_list(flat_docs)
        push_spec_metadata = Dataset.from_list(flat_metadata)

        print("Pushing ...")
        push_spec_content.push_to_hub("OrganizedProgrammers/3GPPSpecContent", token=os.environ["HF"])
        push_spec_metadata.push_to_hub("OrganizedProgrammers/3GPPSpecMetadata", token=os.environ["HF"])

        self.spec_contents = load_dataset("OrganizedProgrammers/3GPPSpecContent")["train"].to_list()
        self.documents_by_spec_num = self._make_doc_index(self.spec_contents)
        print("Save finished.")


class SpecETSIIndexer:
    def __init__(self, max_workers=16):
        self.session = requests.Session()
        self.session.verify = False

        self.spec_contents = load_dataset("OrganizedProgrammers/ETSISpecContent")["train"].to_list()
        self.documents_by_spec_num = self._make_doc_index(self.spec_contents)
        self.indexed_specifications = {}
        self.specifications_passed = set()
        self.processed_count = 0
        self.total_count = 0

        self.DICT_LOCK = threading.Lock()
        self.DOCUMENT_LOCK = threading.Lock()
        self.STOP_EVENT = threading.Event()
        self.max_workers = max_workers

        self.df = self._fetch_spec_table()

    def _make_doc_index(self, specs):
        """Group flat (doc_id, section, content) rows into one entry per document."""
        doc_index = {}
        for section in specs:
            if section["doc_id"] not in doc_index:
                doc_index[section["doc_id"]] = {"content": {section["section"]: section["content"]}, "hash": section["hash"]}
            else:
                doc_index[section["doc_id"]]["content"][section["section"]] = section["content"]
        return doc_index

    def _fetch_spec_table(self):
        print("Logging in to the ETSI portal...")
        self.session.post(
            "https://portal.etsi.org/ETSIPages/LoginEOL.ashx",
            verify=False,
            headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) ..."},
            data=json.dumps({"username": os.environ.get("EOL_USER"), "password": os.environ.get("EOL_PASSWORD")}),
        )

        print("Fetching TS/TR metadata...")
        url_ts = "https://www.etsi.org/?option=com_standardssearch&view=data&format=csv&includeScope=1&page=1&search=&title=1&etsiNumber=1&content=0&version=0&onApproval=0&published=1&withdrawn=0&historical=0&isCurrent=1&superseded=0&harmonized=0&keyword=&TB=&stdType=TS&frequency=&mandate=&collection=&sort=1"
        url_tr = url_ts.replace("stdType=TS", "stdType=TR")
        data_ts = self.session.get(url_ts, verify=False).content
        data_tr = self.session.get(url_tr, verify=False).content
        df_ts = pd.read_csv(io.StringIO(data_ts.decode('utf-8')), sep=";", skiprows=1, index_col=False)
        df_tr = pd.read_csv(io.StringIO(data_tr.decode('utf-8')), sep=";", skiprows=1, index_col=False)

        backup_ts = df_ts["ETSI deliverable"]
        backup_tr = df_tr["ETSI deliverable"]
        df_ts["ETSI deliverable"] = df_ts["ETSI deliverable"].str.extract(r"\s*ETSI TS (\d+ \d+(?:-\d+(?:-\d+)?)?)")
        df_tr["ETSI deliverable"] = df_tr["ETSI deliverable"].str.extract(r"\s*ETSI TR (\d+ \d+(?:-\d+(?:-\d+)?)?)")
        version1 = backup_ts.str.extract(r"\s*ETSI TS \d+ \d+(?:-\d+(?:-\d+)?)? V(\d+\.\d+\.\d+)")
        version2 = backup_tr.str.extract(r"\s*ETSI TR \d+ \d+(?:-\d+(?:-\d+)?)? V(\d+\.\d+\.\d+)")
        df_ts["Version"] = version1[0]
        df_tr["Version"] = version2[0]

        def ver_tuple(v):
            return tuple(map(int, v.split(".")))

        df_ts["temp"] = df_ts["Version"].apply(ver_tuple)
        df_tr["temp"] = df_tr["Version"].apply(ver_tuple)
        df_ts["Type"] = "TS"
        df_tr["Type"] = "TR"
        df = pd.concat([df_ts, df_tr])
        unique_df = df.loc[df.groupby("ETSI deliverable")["temp"].idxmax()]
        unique_df = unique_df.drop(columns="temp")
        unique_df = unique_df[(~unique_df["title"].str.contains("3GPP", case=True, na=False))]
        df = df.drop(columns="temp")
        df = df[(~df["title"].str.contains("3GPP", case=True, na=False))]
        return df

    @staticmethod
    def hasher(specification: str, version: str):
        return hashlib.md5(f"{specification}{version}".encode()).hexdigest()

    @staticmethod
    def get_scope(content):
        for title, text in content.items():
            if title.lower().endswith("scope"):
                return text
        return ""

    def get_document(self, spec_id: str, spec_title: str):
        text = [f"{spec_id} - {spec_title}\n"]
        for section in self.spec_contents:
            if spec_id == section["doc_id"]:
                text.extend([f"{section['section']}\n\n{section['content']}"])
        return text

    def get_text(self, specification: str):
        """Download the spec PDF and return (fitz document, table of contents)."""
        if self.STOP_EVENT.is_set():
            return None, []
        print(f"\n[INFO] Trying to retrieve specification {specification}", flush=True)
        try:
            row = self.df[self.df["ETSI deliverable"] == specification]
            if row.empty:
                print(f"[WARN] Specification {specification} is missing from the table")
                return None, []

            pdf_link = row.iloc[0]["PDF link"]
            response = self.session.get(
                pdf_link,
                headers={"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) ...'}
            )
            if response.status_code != 200:
                print(f"[ERROR] Failed to download the PDF for {specification}.")
                return None, []
            pdf = fitz.open(stream=response.content, filetype="pdf")
            return pdf, pdf.get_toc()
        except Exception as e:
            print(f"[ERROR] get_text failed for {specification}: {e}", flush=True)
            return None, []

    def get_spec_content(self, specification: str):
        """Split the PDF text into a {section title: section text} mapping using its TOC."""
        def extract_sections(text, titles):
            sections = {}
            sorted_titles = sorted(titles, key=lambda t: text.find(t))
            for i, title in enumerate(sorted_titles):
                start = text.find(title)
                if i + 1 < len(sorted_titles):
                    end = text.find(sorted_titles[i + 1])
                    sections[re.sub(r"\s+", " ", title)] = re.sub(r"\s+", " ", text[start:end].replace(title, "").strip().rstrip())
                else:
                    sections[re.sub(r"\s+", " ", title)] = re.sub(r"\s+", " ", text[start:].replace(title, "").strip().rstrip())
            return sections

        if self.STOP_EVENT.is_set():
            return {}
        print(f"[INFO] Extracting content of {specification}", flush=True)
        pdf, doc_toc = self.get_text(specification)
        text = []
        if not pdf or not doc_toc:
            print("[ERROR] No text or table of contents found!")
            return {}

        # Start reading at the first page referenced by the table of contents.
        first_page = 0
        for level, title, page in doc_toc:
            first_page = page - 1
            break
        for page in pdf[first_page:]:
            text.append("\n".join([line.strip() for line in page.get_text().splitlines()]))
        text = "\n".join(text)
        if not text or not doc_toc or self.STOP_EVENT.is_set():
            print("[ERROR] No text/table of contents retrieved!")
            return {}
        titles = []
        for level, title, page in doc_toc:
            if self.STOP_EVENT.is_set():
                return {}
            if title and title[0].isnumeric() and '\n'.join(title.strip().split(" ", 1)) in text:
                titles.append('\n'.join(title.strip().split(" ", 1)))
        return extract_sections(text, titles)

    def process_specification(self, spec):
        if self.STOP_EVENT.is_set():
            return
        doc_id = str(spec.get("ETSI deliverable"))
        try:
            version = spec.get('Version')
            if not version:
                with self.DICT_LOCK:
                    self.processed_count += 1
                return
            document = None
            already_indexed = False

            with self.DOCUMENT_LOCK:
                if (doc_id in self.documents_by_spec_num
                        and self.documents_by_spec_num[doc_id]["hash"] == self.hasher(doc_id, version)
                        and doc_id not in self.specifications_passed):
                    document = self.documents_by_spec_num[doc_id]
                    self.specifications_passed.add(doc_id)
                    already_indexed = True
                elif doc_id in self.specifications_passed:
                    document = self.documents_by_spec_num[doc_id]
                    already_indexed = True
                else:
                    document_content = self.get_spec_content(doc_id)
                    if document_content:
                        self.documents_by_spec_num[doc_id] = {"content": document_content, "hash": self.hasher(doc_id, version)}
                        document = {"content": document_content, "hash": self.hasher(doc_id, version)}
                        self.specifications_passed.add(doc_id)
                        already_indexed = False

            if document:
                string_key = f"{doc_id}+-+{spec['title']}+-+{spec['Type']}+-+{spec['Version']}"
                metadata = {
                    "id": str(doc_id),
                    "title": spec["title"],
                    "type": spec["Type"],
                    "version": version,
                    "url": spec["PDF link"],
                    "scope": "" if not document else self.get_scope(document["content"])
                }
                with self.DICT_LOCK:
                    self.indexed_specifications[string_key] = metadata

            with self.DICT_LOCK:
                self.processed_count += 1
                status = "already indexed" if already_indexed else "indexed now"
                print(f"Spec {doc_id} ({spec.get('title', '')}): {status} - Progress {self.processed_count}/{self.total_count}")

        except Exception as e:
            traceback.print_exc()
            print(f"\n[ERROR] Failed to process {doc_id} {spec.get('Version')}: {e}", flush=True)
            with self.DICT_LOCK:
                self.processed_count += 1
                print(f"Progress: {self.processed_count}/{self.total_count} specs processed")

    def run(self):
        print("Starting ETSI indexing...")
        yield "event: info\ndata: Indexing ETSI specs ...\n\n"
        specifications = self.df.to_dict(orient="records")
        self.total_count = len(specifications)
        print(f"Processing {self.total_count} specs with {self.max_workers} threads...\n")

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [executor.submit(self.process_specification, spec) for spec in specifications]
            total = len(futures)
            done_count = 0
            yield f"event: get-maximum\ndata: {total}\n\n"

            for future in concurrent.futures.as_completed(futures):
                done_count += 1
                yield f"event: progress\ndata: {done_count}\n\n"
                if self.STOP_EVENT.is_set():
                    break

        print(f"\nAll {self.processed_count}/{self.total_count} specs processed.")

    def save(self):
        """Push ETSI spec contents and metadata to the Hub, then reload the local caches."""
        print("\nSaving...", flush=True)
        flat_metadata = [metadata for metadata in self.indexed_specifications.values()]
        flat_docs = []
        for doc_id, data in self.documents_by_spec_num.items():
            for title, content in data["content"].items():
                flat_docs.append({"hash": data["hash"], "doc_id": doc_id, "section": title, "content": content})
        push_spec_content = Dataset.from_list(flat_docs)
        push_spec_metadata = Dataset.from_list(flat_metadata)
        push_spec_content.push_to_hub("OrganizedProgrammers/ETSISpecContent", token=os.environ["HF"])
        push_spec_metadata.push_to_hub("OrganizedProgrammers/ETSISpecMetadata", token=os.environ["HF"])

        self.spec_contents = load_dataset("OrganizedProgrammers/ETSISpecContent")["train"].to_list()
        self.documents_by_spec_num = self._make_doc_index(self.spec_contents)
        print("Save finished.")

    def create_bm25_index(self):
        """Build and publish two BM25 indexes: one per section, one per whole document."""
        dataset_metadata = self.indexed_specifications.values()
        unique_specs = set()
        corpus_json = []

        for specification in dataset_metadata:
            if specification['id'] in unique_specs:
                continue
            for section in self.spec_contents:
                if specification['id'] == section['doc_id']:
                    corpus_json.append({"text": f"{section['section']}\n{section['content']}", "metadata": {
                        "id": specification['id'],
                        "title": specification['title'],
                        "section_title": section['section'],
                        "version": specification['version'],
                        "type": specification['type'],
                        "url": specification['url'],
                        "scope": specification['scope']
                    }})
            unique_specs.add(specification['id'])

        corpus_text = [doc["text"] for doc in corpus_json]
        corpus_tokens = bm25s.tokenize(corpus_text, stopwords="en")

        print("Indexing BM25")
        retriever = BM25HF(corpus=corpus_json)
        retriever.index(corpus_tokens)

        retriever.save_to_hub("OrganizedProgrammers/ETSIBM25IndexSections", token=os.environ.get("HF"))

        unique_specs = set()
        corpus_json = []

        for specification in dataset_metadata:
            if specification['id'] in unique_specs:
                continue
            text_list = self.get_document(specification['id'], specification['title'])
            text = "\n".join(text_list)
            if len(text_list) == 1:
                continue
            corpus_json.append({"text": text, "metadata": specification})
            unique_specs.add(specification['id'])

        corpus_text = [doc["text"] for doc in corpus_json]
        corpus_tokens = bm25s.tokenize(corpus_text, stopwords="en")

        print("Indexing BM25")
        retriever = BM25HF(corpus=corpus_json)
        retriever.index(corpus_tokens)

        retriever.save_to_hub("OrganizedProgrammers/ETSIBM25IndexSingle", token=os.environ.get("HF"))
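

# Minimal usage sketch (added by the editor; an assumption about how these indexers are driven,
# since the generators yield Server-Sent-Event strings meant for a streaming endpoint):
if __name__ == "__main__":
    spec_indexer = Spec3GPPIndexer(max_workers=8)
    for event in spec_indexer.run():   # consume the SSE-style progress events
        print(event.strip())
    spec_indexer.save()
    spec_indexer.create_bm25_index()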