Merge pull request #553 from GurjotSinghShorthillsAI/custom-chunking-feature
lightrag/lightrag.py (+67 −0) CHANGED
@@ -458,6 +458,73 @@ class LightRAG:
             # Ensure all indexes are updated after each document
             await self._insert_done()
 
+    def insert_custom_chunks(self, full_text: str, text_chunks: list[str]):
+        loop = always_get_an_event_loop()
+        return loop.run_until_complete(
+            self.ainsert_custom_chunks(full_text, text_chunks)
+        )
+
+    async def ainsert_custom_chunks(self, full_text: str, text_chunks: list[str]):
+        update_storage = False
+        try:
+            doc_key = compute_mdhash_id(full_text.strip(), prefix="doc-")
+            new_docs = {doc_key: {"content": full_text.strip()}}
+
+            _add_doc_keys = await self.full_docs.filter_keys([doc_key])
+            new_docs = {k: v for k, v in new_docs.items() if k in _add_doc_keys}
+            if not len(new_docs):
+                logger.warning("This document is already in the storage.")
+                return
+
+            update_storage = True
+            logger.info(f"[New Docs] inserting {len(new_docs)} docs")
+
+            inserting_chunks = {}
+            for chunk_text in text_chunks:
+                chunk_text_stripped = chunk_text.strip()
+                chunk_key = compute_mdhash_id(chunk_text_stripped, prefix="chunk-")
+
+                inserting_chunks[chunk_key] = {
+                    "content": chunk_text_stripped,
+                    "full_doc_id": doc_key,
+                }
+
+            _add_chunk_keys = await self.text_chunks.filter_keys(
+                list(inserting_chunks.keys())
+            )
+            inserting_chunks = {
+                k: v for k, v in inserting_chunks.items() if k in _add_chunk_keys
+            }
+            if not len(inserting_chunks):
+                logger.warning("All chunks are already in the storage.")
+                return
+
+            logger.info(f"[New Chunks] inserting {len(inserting_chunks)} chunks")
+
+            await self.chunks_vdb.upsert(inserting_chunks)
+
+            logger.info("[Entity Extraction]...")
+            maybe_new_kg = await extract_entities(
+                inserting_chunks,
+                knowledge_graph_inst=self.chunk_entity_relation_graph,
+                entity_vdb=self.entities_vdb,
+                relationships_vdb=self.relationships_vdb,
+                global_config=asdict(self),
+            )
+
+            if maybe_new_kg is None:
+                logger.warning("No new entities and relationships found")
+                return
+            else:
+                self.chunk_entity_relation_graph = maybe_new_kg
+
+            await self.full_docs.upsert(new_docs)
+            await self.text_chunks.upsert(inserting_chunks)
+
+        finally:
+            if update_storage:
+                await self._insert_done()
+
     async def _insert_done(self):
         tasks = []
         for storage_inst in [
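For reference, a minimal usage sketch of the new entry point. The working_dir value, the sample text, and the assumption that LightRAG's default LLM/embedding configuration is available for entity extraction are illustrative, not part of this diff:

# Hypothetical example; working_dir and the document text are placeholders,
# and a working LLM/embedding setup is assumed for the extraction step.
from lightrag import LightRAG

rag = LightRAG(working_dir="./rag_storage")

full_text = (
    "LightRAG builds a knowledge graph from inserted documents. "
    "It also maintains vector indexes over chunks."
)

# Chunk boundaries are entirely caller-defined with this API,
# e.g. one chunk per sentence instead of the built-in token-based splitting.
custom_chunks = [
    "LightRAG builds a knowledge graph from inserted documents.",
    "It also maintains vector indexes over chunks.",
]

# Synchronous wrapper; internally runs ainsert_custom_chunks on an event loop.
rag.insert_custom_chunks(full_text, custom_chunks)

Note that both the document and each chunk are deduplicated by a content hash of the stripped text (doc-/chunk- key prefixes), so re-inserting identical text is a no-op, and the finally block calls _insert_done() only when update_storage indicates the stores were actually modified.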