YanSte committed
Commit 40b657d · Parent: 3f9866f

simplified process

Files changed (1)
  1. lightrag/lightrag.py +36 -142
lightrag/lightrag.py CHANGED
@@ -563,29 +563,29 @@ class LightRAG:
            pending_doc_ids[i : i + batch_size] for i in range(0, len(pending_doc_ids), batch_size)
        ]
        batch_len = len(batch_docs_list) + 1
+
        # 3. iterate over batches
        tasks: dict[str, list[Coroutine[Any, Any, None]]] = {}
        for batch_idx, doc_ids in enumerate(batch_docs_list):
-
-            doc_status: dict[str, Any] = {
-                "status": DocStatus.PROCESSING,
-                "updated_at": datetime.now().isoformat(),
-            }

+            # 4. iterate over batch
            for doc_id in tqdm_async(
                doc_ids,
                desc=f"Level 1 - Batch {batch_idx} / {batch_len}",
            ):
-                doc = await self.doc_status.get_by_id(doc_id)
-                doc_status.update(
+                # Update status in processing
+                status_doc = await self.doc_status.get_by_id(doc_id)
+                await self.doc_status.upsert(
                    {
-                        "content_summary": doc["content_summary"],
-                        "content_length": doc["content_length"],
-                        "created_at": doc["created_at"],
+                        doc_id: {
+                            "status": DocStatus.PROCESSING,
+                            "updated_at": datetime.now().isoformat(),
+                            "content_summary": status_doc["content_summary"],
+                            "content_length": status_doc["content_length"],
+                            "created_at": status_doc["created_at"],
+                        }
                    }
                )
-                await self.doc_status.upsert({doc_id: doc_status})
-
                # Generate chunks from document
                chunks: dict[str, Any] = {
                    compute_mdhash_id(dp["content"], prefix="chunk-"): {
@@ -593,7 +593,7 @@ class LightRAG:
                        "full_doc_id": doc_id,
                    }
                    for dp in self.chunking_func(
-                        doc["content"],
+                        status_doc["content"],
                        split_by_character,
                        split_by_character_only,
                        self.chunk_overlap_token_size,
@@ -601,57 +601,47 @@ class LightRAG:
                        self.tiktoken_model_name,
                    )
                }
-                try:
-                    # If fails it's failed on full doc and text chunks upset
-                    if doc["status"] != DocStatus.FAILED:
-                        # Ensure chunk insertion and graph processing happen sequentially
-                        await self._process_entity_relation_graph(chunks)
-                        await self.chunks_vdb.upsert(chunks)
-                except Exception as e:
-                    doc_status.update(
-                        {
-                            "status": DocStatus.PENDING,
-                            "error": str(e),
-                            "updated_at": datetime.now().isoformat(),
-                        }
-                    )
-                    await self.doc_status.upsert({doc_id: doc_status})
+
+                # Ensure chunk insertion and graph processing happen sequentially, not in parallel
+                await self._process_entity_relation_graph(chunks)
+                await self.chunks_vdb.upsert(chunks)

+                # Check if document already processed the doc
                if doc_id not in full_docs_processed_doc_ids:
                    tasks[doc_id].append(
-                        self.full_docs.upsert({doc_id: {"content": doc["content"]}})
+                        self.full_docs.upsert({doc_id: {"content": status_doc["content"]}})
                    )
-
+
+                # check if chunks already processed the doc
                if doc_id not in text_chunks_processed_doc_ids:
                    tasks[doc_id].append(self.text_chunks.upsert(chunks))

        for doc_id, task in tasks.items():
            try:
                await asyncio.gather(*task)
-
-                # Update document status
-                doc_status.update(
+                await self.doc_status.upsert(
                    {
-                        "status": DocStatus.PROCESSED,
-                        "chunks_count": len(chunks),
-                        "updated_at": datetime.now().isoformat(),
+                        doc_id: {
+                            "status": DocStatus.PROCESSED,
+                            "chunks_count": len(chunks),
+                            "updated_at": datetime.now().isoformat(),
+                        }
                    }
                )
-                await self.doc_status.upsert({doc_id: doc_status})
                await self._insert_done()

            except Exception as e:
-                # Update status with failed information
-                doc_status.update(
-                    {
-                        "status": DocStatus.FAILED,
-                        "error": str(e),
-                        "updated_at": datetime.now().isoformat(),
-                    }
-                )
-                await self.doc_status.upsert({doc_id: doc_status})
                logger.error(
                    f"Failed to process document {doc_id}: {str(e)}\n{traceback.format_exc()}"
+                )
+                await self.doc_status.upsert(
+                    {
+                        doc_id: {
+                            "status": DocStatus.FAILED,
+                            "error": str(e),
+                            "updated_at": datetime.now().isoformat(),
+                        }
+                    }
                )
                continue

@@ -674,102 +664,6 @@ class LightRAG:
            logger.error("Failed to extract entities and relationships")
            raise e

-    # async def apipeline_process_extract_graph(self):
-    #     """
-    #     Process pending or failed chunks to extract entities and relationships.
-
-    #     This method retrieves all chunks that are currently marked as pending or have previously failed.
-    #     It then extracts entities and relationships from each chunk and updates the status accordingly.
-
-    #     Steps:
-    #     1. Retrieve all pending and failed chunks.
-    #     2. For each chunk, attempt to extract entities and relationships.
-    #     3. Update the chunk's status to processed if successful, or failed if an error occurs.
-
-    #     Raises:
-    #         Exception: If there is an error during the extraction process.
-
-    #     Returns:
-    #         None
-    #     """
-    #     # 1. get all pending and failed chunks
-    #     to_process_doc_keys: list[str] = []
-
-    #     # Process failes
-    #     to_process_docs = await self.doc_status.get_by_status(status=DocStatus.FAILED)
-    #     if to_process_docs:
-    #         to_process_doc_keys.extend([doc["id"] for doc in to_process_docs])
-
-    #     # Process Pending
-    #     to_process_docs = await self.doc_status.get_by_status(status=DocStatus.PENDING)
-    #     if to_process_docs:
-    #         to_process_doc_keys.extend([doc["id"] for doc in to_process_docs])
-
-    #     if not to_process_doc_keys:
-    #         logger.info("All documents have been processed or are duplicates")
-    #         return
-
-    #     # Process documents in batches
-    #     batch_size = self.addon_params.get("insert_batch_size", 10)
-
-    #     semaphore = asyncio.Semaphore(
-    #         batch_size
-    #     )  # Control the number of tasks that are processed simultaneously
-
-    #     async def process_chunk(chunk_id: str):
-    #         async with semaphore:
-    #             chunks: dict[str, Any] = {
-    #                 i["id"]: i for i in await self.text_chunks.get_by_ids([chunk_id])
-    #             }
-    #             async def _process_chunk(chunk_id: str):
-    #                 chunks: dict[str, Any] = {
-    #                     i["id"]: i for i in await self.text_chunks.get_by_ids([chunk_id])
-    #                 }
-
-    #             # Extract and store entities and relationships
-    #             try:
-    #                 maybe_new_kg = await extract_entities(
-    #                     chunks,
-    #                     knowledge_graph_inst=self.chunk_entity_relation_graph,
-    #                     entity_vdb=self.entities_vdb,
-    #                     relationships_vdb=self.relationships_vdb,
-    #                     llm_response_cache=self.llm_response_cache,
-    #                     global_config=asdict(self),
-    #                 )
-    #                 if maybe_new_kg is None:
-    #                     logger.warning("No entities or relationships extracted!")
-    #                 # Update status to processed
-    #                 await self.text_chunks.upsert(chunks)
-    #                 await self.doc_status.upsert({chunk_id: {"status": DocStatus.PROCESSED}})
-    #             except Exception as e:
-    #                 logger.error("Failed to extract entities and relationships")
-    #                 # Mark as failed if any step fails
-    #                 await self.doc_status.upsert({chunk_id: {"status": DocStatus.FAILED}})
-    #                 raise e
-
-    #     with tqdm_async(
-    #         total=len(to_process_doc_keys),
-    #         desc="\nLevel 1 - Processing chunks",
-    #         unit="chunk",
-    #         position=0,
-    #     ) as progress:
-    #         tasks: list[asyncio.Task[None]] = []
-    #         for chunk_id in to_process_doc_keys:
-    #             task = asyncio.create_task(process_chunk(chunk_id))
-    #             tasks.append(task)
-
-    #         for future in asyncio.as_completed(tasks):
-    #             await future
-    #             progress.update(1)
-    #             progress.set_postfix(
-    #                 {
-    #                     "LLM call": statistic_data["llm_call"],
-    #                     "LLM cache": statistic_data["llm_cache"],
-    #                 }
-    #             )
-
-    #     # Ensure all indexes are updated after each document
-
    async def _insert_done(self):
        tasks = []
        for storage_inst in [
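
For reference, the status-update pattern the commit switches to can be exercised in isolation: read the stored record with get_by_id, then write one complete payload per document, keyed by doc_id, with a single upsert. The sketch below is a minimal stand-in, not LightRAG's actual storage class; InMemoryDocStatus, the DocStatus string values, and the sample document are assumptions for illustration, while the get_by_id/upsert call shapes come from the diff above.

import asyncio
from datetime import datetime
from enum import Enum
from typing import Any


class DocStatus(str, Enum):
    # Names mirror the diff; the string values here are assumptions.
    PENDING = "pending"
    PROCESSING = "processing"
    PROCESSED = "processed"
    FAILED = "failed"


class InMemoryDocStatus:
    """Hypothetical stand-in for self.doc_status with only the calls used in the new loop."""

    def __init__(self) -> None:
        self._data: dict[str, dict[str, Any]] = {}

    async def upsert(self, records: dict[str, dict[str, Any]]) -> None:
        # Merge each record into the stored entry for its doc_id.
        for doc_id, fields in records.items():
            self._data.setdefault(doc_id, {}).update(fields)

    async def get_by_id(self, doc_id: str) -> dict[str, Any]:
        return self._data[doc_id]


async def main() -> None:
    store = InMemoryDocStatus()
    # Seed a pending document (example values).
    await store.upsert(
        {
            "doc-1": {
                "status": DocStatus.PENDING,
                "content_summary": "example summary",
                "content_length": 120,
                "created_at": datetime.now().isoformat(),
            }
        }
    )

    # The pattern from the new loop: read the record, then write one complete
    # per-document payload keyed by doc_id.
    status_doc = await store.get_by_id("doc-1")
    await store.upsert(
        {
            "doc-1": {
                "status": DocStatus.PROCESSING,
                "updated_at": datetime.now().isoformat(),
                "content_summary": status_doc["content_summary"],
                "content_length": status_doc["content_length"],
                "created_at": status_doc["created_at"],
            }
        }
    )
    print(await store.get_by_id("doc-1"))


asyncio.run(main())

Compared with the removed code, which mutated a single doc_status dict shared across every document in the batch, writing a fresh payload per doc_id keeps each document's status record self-contained.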