import chromadb
#from chromadb.utils import embedding_functions
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, T5ForConditionalGeneration, T5Tokenizer, pipeline#, AutoModelForCausalLM
#from sentence_splitter import SentenceSplitter, split_text_into_sentences
import PyPDF2
import os
#import textwrap
import re
import sys
#import warnings
import torch
#import nlpaug.augmenter.char as nac

pdf_folder = "pdf"
device = "cuda" if torch.cuda.is_available() else "cpu"

qa_pipeline = pipeline("question-answering", model="deepset/roberta-base-squad2")

corrector_model = T5ForConditionalGeneration.from_pretrained("yelpfeast/byt5-base-english-ocr-correction")
tokenizer = AutoTokenizer.from_pretrained("yelpfeast/byt5-base-english-ocr-correction")
corrector_model.to(device)

#tokenizer = T5Tokenizer.from_pretrained("mrm8488/t5-base-finetuned-summarize-news")  # A model fine-tuned for Italian could also be used
#summarizer_model = T5ForConditionalGeneration.from_pretrained("mrm8488/t5-base-finetuned-summarize-news")

embed_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

# Reset the collection on every run so the index is rebuilt from scratch
chroma_client = chromadb.PersistentClient(path="/tmp")
collection = chroma_client.get_or_create_collection(name="documenti")
chroma_client.delete_collection(name="documenti")
collection = chroma_client.get_or_create_collection(name="documenti")


def extract_text_from_pdf(pdf_path):
    # Extracts the raw text of every page of a PDF file; returns "" if the path is not a file.
    if os.path.isfile(pdf_path):
        with open(pdf_path, "rb") as file:
            reader = PyPDF2.PdfReader(file)
            text = "\n".join([page.extract_text() for page in reader.pages if page.extract_text()])
        return text
    else:
        return ""


def correct(text):
    # Runs the ByT5 OCR-correction model on a text fragment; falls back to the input on failure.
    inputs = tokenizer(text, return_tensors="pt", padding=True).to(device)
    # ByT5 works at byte level, so allow roughly one output token per input character
    outputs = corrector_model.generate(input_ids=inputs['input_ids'], attention_mask=inputs['attention_mask'], do_sample=False, max_new_tokens=len(text) + 32)
    try:
        clean_text = tokenizer.batch_decode(outputs, skip_special_tokens=True)[0]
    except Exception:
        clean_text = text
    return clean_text


def chunk_text(text):
    # Splits the text into smaller blocks for better indexing.
    # Normalise whitespace and comma/period spacing before splitting on sentence boundaries
    text = text.replace("  ", " ")
    text = text.replace("  ", " ")
    text = text.replace(" .", ".")
    text = text.replace(" ,", ",")
    text = text.replace(", ", ",")
    text = text.replace(",", ", ")
    text = re.sub(r"\.\s*\r?\n", ".\r\n", text)
    return text.split(".\r\n")


def split_chunk(text, max_length=512):
    # Splits an over-long chunk into pieces of at most max_length characters,
    # preferring to break at punctuation or whitespace.
    chunks = []
    while len(text) > max_length:
        # Find the last punctuation mark or space before max_length
        match = re.search(r'[\.\?\!\,\;\:\s](?=[^\.\?\!\,\;\:\s]*$)', text[:max_length])
        if match:
            split_index = match.start() + 1  # Include the separator
        else:
            split_index = max_length  # If nothing is found, truncate at the maximum length
        chunks.append(text[:split_index].strip())  # Strip surrounding whitespace
        text = text[split_index:].lstrip()  # Drop the separator and leading spaces
    if text:
        chunks.append(text.strip())  # Append the remaining part
    return chunks


def add_document_to_chromadb(doc_id, text):
    # Adds an indexed document to ChromaDB with chunking
    chunks = chunk_text(text)
    for i, chunk in enumerate(chunks):
        chunk = re.sub(r"\s*\r?\n\s*", " ", chunk)
        # Filter out empty or symbol-only chunks
        if not re.search(r"[a-zA-Z0-9]", chunk):
            continue  # Skip chunks with no meaningful text
        if len(chunk.split()) <= 2:
            continue  # Skip chunks without a meaningful sentence
        print(i, "of", len(chunks))
        #print(chunk)
        #print("\r\n")
        if len(chunk) >= 512:
            # Correct each mini-chunk and re-join them (the corrected pieces must be collected,
            # otherwise the corrections would be discarded)
            minichunks = [correct(minichunk) for minichunk in split_chunk(chunk)]
            chunk = " ".join(minichunks)
        else:
            chunk = correct(chunk)
        #print(chunk)
        #print("\r\n")
        embedding = embed_model.encode(chunk).tolist()
        collection.add(ids=[f"{doc_id}_{i}"], embeddings=[embedding], metadatas=[{"text": chunk}])


def anonymize_text(text):
    import spacy  # local import: this helper is optional and disabled by default (see create_db)
    # Detect proper names with spaCy
    nlp = spacy.load("it_core_news_sm")
    doc = nlp(text)
    anonymized_text = text
    for ent in doc.ents:
        if ent.label_ == "PER":  # If it is a person's name
            anonymized_text = anonymized_text.replace(ent.text, "Nome Fittizio ")
    # Recognise patterns with initials, e.g. "J. Rossi"
    anonymized_text = re.sub(r'\b[A-Z]\.\s[A-Z][a-z]+\b', 'Nome Fittizio ', anonymized_text)
    return anonymized_text


def create_db():
    from huggingface_hub import HfApi
    # Index every PDF in pdf_folder, then upload the resulting database to the Hugging Face Space
    for file_name in os.listdir(pdf_folder):
        pdf_path = os.path.join(pdf_folder, file_name)
        text = extract_text_from_pdf(pdf_path)
        #text = anonymize_text(text)
        add_document_to_chromadb(file_name, text)
    HF_TOKEN = os.getenv("HF_TOKEN")
    api = HfApi(token=HF_TOKEN)
    api.upload_file(
        path_or_fileobj="/tmp/chroma.sqlite3",
        path_in_repo="./chroma.sqlite3",
        repo_id="lefreakcestchic/QA4Leo",
        repo_type="space"
    )


create_db()
sys.exit()
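
# A minimal retrieval sketch (left commented out because sys.exit() above ends the script).
# It assumes the "documenti" collection built by create_db() and reuses the qa_pipeline loaded
# at the top of the file; the answer_question name, the question string and n_results are
# illustrative placeholders, not part of the original script.
#
# def answer_question(question, n_results=3):
#     query_embedding = embed_model.encode(question).tolist()
#     results = collection.query(query_embeddings=[query_embedding], n_results=n_results)
#     context = " ".join(meta["text"] for meta in results["metadatas"][0])
#     return qa_pipeline(question=question, context=context)
#
# print(answer_question("What does the document describe?"))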