yangdx commited on
Commit
fe1b11b
·
2 Parent(s): 2400576 d2789ce

Merge branch 'Fix-pipeline-batch' into feat-node-expand

Browse files
Files changed (1) hide show
  1. lightrag/lightrag.py +136 -127
lightrag/lightrag.py CHANGED
@@ -769,7 +769,7 @@ class LightRAG:
769
  async with pipeline_status_lock:
770
  # Ensure only one worker is processing documents
771
  if not pipeline_status.get("busy", False):
772
- # 先检查是否有需要处理的文档
773
  processing_docs, failed_docs, pending_docs = await asyncio.gather(
774
  self.doc_status.get_docs_by_status(DocStatus.PROCESSING),
775
  self.doc_status.get_docs_by_status(DocStatus.FAILED),
@@ -781,12 +781,10 @@ class LightRAG:
781
  to_process_docs.update(failed_docs)
782
  to_process_docs.update(pending_docs)
783
 
784
- # 如果没有需要处理的文档,直接返回,保留 pipeline_status 中的内容不变
785
  if not to_process_docs:
786
  logger.info("No documents to process")
787
  return
788
 
789
- # 有文档需要处理,更新 pipeline_status
790
  pipeline_status.update(
791
  {
792
  "busy": True,
@@ -825,7 +823,7 @@ class LightRAG:
825
  for i in range(0, len(to_process_docs), self.max_parallel_insert)
826
  ]
827
 
828
- log_message = f"Number of batches to process: {len(docs_batches)}."
829
  logger.info(log_message)
830
 
831
  # Update pipeline status with current batch information
@@ -834,140 +832,151 @@ class LightRAG:
834
  pipeline_status["latest_message"] = log_message
835
  pipeline_status["history_messages"].append(log_message)
836
 
837
- batches: list[Any] = []
838
- # 3. iterate over batches
839
- for batch_idx, docs_batch in enumerate(docs_batches):
840
- # Update current batch in pipeline status (directly, as it's atomic)
841
- pipeline_status["cur_batch"] += 1
842
-
843
- async def batch(
844
- batch_idx: int,
845
- docs_batch: list[tuple[str, DocProcessingStatus]],
846
- size_batch: int,
847
- ) -> None:
848
- log_message = (
849
- f"Start processing batch {batch_idx + 1} of {size_batch}."
850
- )
851
- logger.info(log_message)
852
- pipeline_status["latest_message"] = log_message
853
- pipeline_status["history_messages"].append(log_message)
854
- # 4. iterate over batch
855
- for doc_id_processing_status in docs_batch:
856
- doc_id, status_doc = doc_id_processing_status
857
- # Generate chunks from document
858
- chunks: dict[str, Any] = {
859
- compute_mdhash_id(dp["content"], prefix="chunk-"): {
860
- **dp,
861
- "full_doc_id": doc_id,
862
- }
863
- for dp in self.chunking_func(
864
- status_doc.content,
865
- split_by_character,
866
- split_by_character_only,
867
- self.chunk_overlap_token_size,
868
- self.chunk_token_size,
869
- self.tiktoken_model_name,
870
- )
871
  }
872
- # Process document (text chunks and full docs) in parallel
873
- # Create tasks with references for potential cancellation
874
- doc_status_task = asyncio.create_task(
875
- self.doc_status.upsert(
876
- {
877
- doc_id: {
878
- "status": DocStatus.PROCESSING,
879
- "updated_at": datetime.now().isoformat(),
880
- "content": status_doc.content,
881
- "content_summary": status_doc.content_summary,
882
- "content_length": status_doc.content_length,
883
- "created_at": status_doc.created_at,
884
- }
885
- }
886
- )
887
- )
888
- chunks_vdb_task = asyncio.create_task(
889
- self.chunks_vdb.upsert(chunks)
890
  )
891
- entity_relation_task = asyncio.create_task(
892
- self._process_entity_relation_graph(
893
- chunks, pipeline_status, pipeline_status_lock
894
- )
 
 
 
 
 
 
 
 
 
 
 
895
  )
896
- full_docs_task = asyncio.create_task(
897
- self.full_docs.upsert(
898
- {doc_id: {"content": status_doc.content}}
899
- )
 
 
 
900
  )
901
- text_chunks_task = asyncio.create_task(
902
- self.text_chunks.upsert(chunks)
 
 
903
  )
904
- tasks = [
905
- doc_status_task,
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
906
  chunks_vdb_task,
907
  entity_relation_task,
908
  full_docs_task,
909
  text_chunks_task,
910
- ]
911
- try:
912
- await asyncio.gather(*tasks)
913
- await self.doc_status.upsert(
914
- {
915
- doc_id: {
916
- "status": DocStatus.PROCESSED,
917
- "chunks_count": len(chunks),
918
- "content": status_doc.content,
919
- "content_summary": status_doc.content_summary,
920
- "content_length": status_doc.content_length,
921
- "created_at": status_doc.created_at,
922
- "updated_at": datetime.now().isoformat(),
923
- }
924
- }
925
- )
926
- except Exception as e:
927
- # Log error and update pipeline status
928
- error_msg = (
929
- f"Failed to process document {doc_id}: {str(e)}"
930
- )
931
- logger.error(error_msg)
932
- pipeline_status["latest_message"] = error_msg
933
- pipeline_status["history_messages"].append(error_msg)
934
-
935
- # Cancel other tasks as they are no longer meaningful
936
- for task in [
937
- chunks_vdb_task,
938
- entity_relation_task,
939
- full_docs_task,
940
- text_chunks_task,
941
- ]:
942
- if not task.done():
943
- task.cancel()
944
-
945
- # Update document status to failed
946
- await self.doc_status.upsert(
947
- {
948
- doc_id: {
949
- "status": DocStatus.FAILED,
950
- "error": str(e),
951
- "content": status_doc.content,
952
- "content_summary": status_doc.content_summary,
953
- "content_length": status_doc.content_length,
954
- "created_at": status_doc.created_at,
955
- "updated_at": datetime.now().isoformat(),
956
- }
957
- }
958
- )
959
- continue
960
- log_message = (
961
- f"Completed batch {batch_idx + 1} of {len(docs_batches)}."
962
  )
963
- logger.info(log_message)
964
- pipeline_status["latest_message"] = log_message
965
- pipeline_status["history_messages"].append(log_message)
966
 
967
- batches.append(batch(batch_idx, docs_batch, len(docs_batches)))
 
 
968
 
969
- await asyncio.gather(*batches)
970
- await self._insert_done()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
971
 
972
  # Check if there's a pending request to process more documents (with lock)
973
  has_pending_request = False
@@ -1042,7 +1051,7 @@ class LightRAG:
1042
  ]
1043
  await asyncio.gather(*tasks)
1044
 
1045
- log_message = "All Insert done"
1046
  logger.info(log_message)
1047
 
1048
  if pipeline_status is not None and pipeline_status_lock is not None:
 
769
  async with pipeline_status_lock:
770
  # Ensure only one worker is processing documents
771
  if not pipeline_status.get("busy", False):
772
+
773
  processing_docs, failed_docs, pending_docs = await asyncio.gather(
774
  self.doc_status.get_docs_by_status(DocStatus.PROCESSING),
775
  self.doc_status.get_docs_by_status(DocStatus.FAILED),
 
781
  to_process_docs.update(failed_docs)
782
  to_process_docs.update(pending_docs)
783
 
 
784
  if not to_process_docs:
785
  logger.info("No documents to process")
786
  return
787
 
 
788
  pipeline_status.update(
789
  {
790
  "busy": True,
 
823
  for i in range(0, len(to_process_docs), self.max_parallel_insert)
824
  ]
825
 
826
+ log_message = f"Processing {len(to_process_docs)} document(s) in {len(docs_batches)} batches"
827
  logger.info(log_message)
828
 
829
  # Update pipeline status with current batch information
 
832
  pipeline_status["latest_message"] = log_message
833
  pipeline_status["history_messages"].append(log_message)
834
 
835
+ async def process_document(
836
+ doc_id: str,
837
+ status_doc: DocProcessingStatus,
838
+ split_by_character: str | None,
839
+ split_by_character_only: bool,
840
+ pipeline_status: dict,
841
+ pipeline_status_lock: asyncio.Lock
842
+ ) -> None:
843
+ """Process single document"""
844
+ try:
845
+ # Generate chunks from document
846
+ chunks: dict[str, Any] = {
847
+ compute_mdhash_id(dp["content"], prefix="chunk-"): {
848
+ **dp,
849
+ "full_doc_id": doc_id,
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
850
  }
851
+ for dp in self.chunking_func(
852
+ status_doc.content,
853
+ split_by_character,
854
+ split_by_character_only,
855
+ self.chunk_overlap_token_size,
856
+ self.chunk_token_size,
857
+ self.tiktoken_model_name,
 
 
 
 
 
 
 
 
 
 
 
858
  )
859
+ }
860
+ # Process document (text chunks and full docs) in parallel
861
+ # Create tasks with references for potential cancellation
862
+ doc_status_task = asyncio.create_task(
863
+ self.doc_status.upsert(
864
+ {
865
+ doc_id: {
866
+ "status": DocStatus.PROCESSING,
867
+ "updated_at": datetime.now().isoformat(),
868
+ "content": status_doc.content,
869
+ "content_summary": status_doc.content_summary,
870
+ "content_length": status_doc.content_length,
871
+ "created_at": status_doc.created_at,
872
+ }
873
+ }
874
  )
875
+ )
876
+ chunks_vdb_task = asyncio.create_task(
877
+ self.chunks_vdb.upsert(chunks)
878
+ )
879
+ entity_relation_task = asyncio.create_task(
880
+ self._process_entity_relation_graph(
881
+ chunks, pipeline_status, pipeline_status_lock
882
  )
883
+ )
884
+ full_docs_task = asyncio.create_task(
885
+ self.full_docs.upsert(
886
+ {doc_id: {"content": status_doc.content}}
887
  )
888
+ )
889
+ text_chunks_task = asyncio.create_task(
890
+ self.text_chunks.upsert(chunks)
891
+ )
892
+ tasks = [
893
+ doc_status_task,
894
+ chunks_vdb_task,
895
+ entity_relation_task,
896
+ full_docs_task,
897
+ text_chunks_task,
898
+ ]
899
+ await asyncio.gather(*tasks)
900
+ await self.doc_status.upsert(
901
+ {
902
+ doc_id: {
903
+ "status": DocStatus.PROCESSED,
904
+ "chunks_count": len(chunks),
905
+ "content": status_doc.content,
906
+ "content_summary": status_doc.content_summary,
907
+ "content_length": status_doc.content_length,
908
+ "created_at": status_doc.created_at,
909
+ "updated_at": datetime.now().isoformat(),
910
+ }
911
+ }
912
+ )
913
+ except Exception as e:
914
+ # Log error and update pipeline status
915
+ error_msg = (
916
+ f"Failed to process document {doc_id}: {str(e)}"
917
+ )
918
+ logger.error(error_msg)
919
+ async with pipeline_status_lock:
920
+ pipeline_status["latest_message"] = error_msg
921
+ pipeline_status["history_messages"].append(error_msg)
922
+
923
+ # Cancel other tasks as they are no longer meaningful
924
+ for task in [
925
  chunks_vdb_task,
926
  entity_relation_task,
927
  full_docs_task,
928
  text_chunks_task,
929
+ ]:
930
+ if not task.done():
931
+ task.cancel()
932
+ # Update document status to failed
933
+ await self.doc_status.upsert(
934
+ {
935
+ doc_id: {
936
+ "status": DocStatus.FAILED,
937
+ "error": str(e),
938
+ "content": status_doc.content,
939
+ "content_summary": status_doc.content_summary,
940
+ "content_length": status_doc.content_length,
941
+ "created_at": status_doc.created_at,
942
+ "updated_at": datetime.now().isoformat(),
943
+ }
944
+ }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
945
  )
 
 
 
946
 
947
+ # 3. iterate over batches
948
+ total_batches = len(docs_batches)
949
+ for batch_idx, docs_batch in enumerate(docs_batches):
950
 
951
+ current_batch = batch_idx + 1
952
+ log_message = f"Start processing batch {current_batch} of {total_batches}."
953
+ logger.info(log_message)
954
+ pipeline_status["cur_batch"] = current_batch
955
+ pipeline_status["latest_message"] = log_message
956
+ pipeline_status["history_messages"].append(log_message)
957
+
958
+ doc_tasks = []
959
+ for doc_id, status_doc in docs_batch:
960
+ doc_tasks.append(
961
+ process_document(
962
+ doc_id,
963
+ status_doc,
964
+ split_by_character,
965
+ split_by_character_only,
966
+ pipeline_status,
967
+ pipeline_status_lock
968
+ )
969
+ )
970
+
971
+ # Process documents in one batch parallelly
972
+ await asyncio.gather(*doc_tasks)
973
+ await self._insert_done()
974
+
975
+ log_message = f"Completed batch {current_batch} of {total_batches}."
976
+ logger.info(log_message)
977
+ pipeline_status["latest_message"] = log_message
978
+ pipeline_status["history_messages"].append(log_message)
979
+
980
 
981
  # Check if there's a pending request to process more documents (with lock)
982
  has_pending_request = False
 
1051
  ]
1052
  await asyncio.gather(*tasks)
1053
 
1054
+ log_message = "All data persist to disk"
1055
  logger.info(log_message)
1056
 
1057
  if pipeline_status is not None and pipeline_status_lock is not None: