yangdx committed on
Commit
e9cfd68
·
1 Parent(s): 86f2cf7

feat: add history_messages to track pipeline processing progress

Browse files

• Add shared history_messages list
• Track pipeline progress with messages

lightrag/api/routers/document_routes.py CHANGED
@@ -672,6 +672,10 @@ def create_document_routes(
672
  # Convert to regular dict if it's a Manager.dict
673
  status_dict = dict(pipeline_status)
674
 
 
 
 
 
675
  # Format the job_start time if it exists
676
  if status_dict.get("job_start"):
677
  status_dict["job_start"] = str(status_dict["job_start"])
 
672
  # Convert to regular dict if it's a Manager.dict
673
  status_dict = dict(pipeline_status)
674
 
675
+ # Convert history_messages to a regular list if it's a Manager.list
676
+ if "history_messages" in status_dict:
677
+ status_dict["history_messages"] = list(status_dict["history_messages"])
678
+
679
  # Format the job_start time if it exists
680
  if status_dict.get("job_start"):
681
  status_dict["job_start"] = str(status_dict["job_start"])
lightrag/kg/shared_storage.py CHANGED
@@ -83,6 +83,10 @@ def initialize_share_data(workers: int = 1):
83
 
84
  # Initialize pipeline status for document indexing control
85
  pipeline_namespace = get_namespace_data("pipeline_status")
 
 
 
 
86
  pipeline_namespace.update({
87
  "busy": False, # Control concurrent processes
88
  "job_name": "Default Job", # Current job name (indexing files/indexing texts)
@@ -91,7 +95,8 @@ def initialize_share_data(workers: int = 1):
91
  "batchs": 0, # Number of batches for processing documents
92
  "cur_batch": 0, # Current processing batch
93
  "request_pending": False, # Flag for pending request for processing
94
- "latest_message": "" # Latest message from pipeline processing
 
95
  })
96
 
97
 
 
83
 
84
  # Initialize pipeline status for document indexing control
85
  pipeline_namespace = get_namespace_data("pipeline_status")
86
+
87
+ # Create a shared list object for history_messages
88
+ history_messages = _manager.list() if is_multiprocess else []
89
+
90
  pipeline_namespace.update({
91
  "busy": False, # Control concurrent processes
92
  "job_name": "Default Job", # Current job name (indexing files/indexing texts)
 
95
  "batchs": 0, # Number of batches for processing documents
96
  "cur_batch": 0, # Current processing batch
97
  "request_pending": False, # Flag for pending request for processing
98
+ "latest_message": "", # Latest message from pipeline processing
99
+ "history_messages": history_messages, # Use the shared list object
100
  })
101
 
102
 
lightrag/lightrag.py CHANGED
@@ -681,6 +681,13 @@ class LightRAG:
681
  with storage_lock:
682
  if not pipeline_status.get("busy", False):
683
  # No other process is busy, we can process documents
 
 
 
 
 
 
 
684
  pipeline_status.update({
685
  "busy": True,
686
  "job_name": "indexing files",
@@ -688,7 +695,10 @@ class LightRAG:
688
  "docs": 0,
689
  "batchs": 0,
690
  "cur_batch": 0,
691
- "request_pending": False # Clear any previous request
 
 
 
692
  })
693
  process_documents = True
694
  else:
@@ -715,7 +725,10 @@ class LightRAG:
715
  to_process_docs.update(pending_docs)
716
 
717
  if not to_process_docs:
718
- logger.info("All documents have been processed or are duplicates")
 
 
 
719
  break
720
 
721
  # Update pipeline status with document count (with lock)
@@ -734,7 +747,10 @@ class LightRAG:
734
  "cur_batch": 0
735
  })
736
 
737
- logger.info(f"Number of batches to process: {len(docs_batches)}.")
 
 
 
738
 
739
  batches: list[Any] = []
740
  # 3. iterate over batches
@@ -747,7 +763,10 @@ class LightRAG:
747
  docs_batch: list[tuple[str, DocProcessingStatus]],
748
  size_batch: int,
749
  ) -> None:
750
- logger.info(f"Start processing batch {batch_idx + 1} of {size_batch}.")
 
 
 
751
  # 4. iterate over batch
752
  for doc_id_processing_status in docs_batch:
753
  doc_id, status_doc = doc_id_processing_status
@@ -818,7 +837,10 @@ class LightRAG:
818
  }
819
  )
820
  continue
821
- logger.info(f"Completed batch {batch_idx + 1} of {len(docs_batches)}.")
 
 
 
822
 
823
  batches.append(batch(batch_idx, docs_batch, len(docs_batches)))
824
 
@@ -836,13 +858,19 @@ class LightRAG:
836
  if not has_pending_request:
837
  break
838
 
839
- logger.info("Processing additional documents due to pending request")
 
 
 
840
 
841
  finally:
842
  # Always reset busy status when done or if an exception occurs (with lock)
843
  with storage_lock:
844
  pipeline_status["busy"] = False
845
- logger.info("Document processing pipeline completed")
 
 
 
846
 
847
  async def _process_entity_relation_graph(self, chunk: dict[str, Any]) -> None:
848
  try:
@@ -873,7 +901,15 @@ class LightRAG:
873
  if storage_inst is not None
874
  ]
875
  await asyncio.gather(*tasks)
876
- logger.info("All Insert done")
 
 
 
 
 
 
 
 
877
 
878
  def insert_custom_kg(self, custom_kg: dict[str, Any]) -> None:
879
  loop = always_get_an_event_loop()
 
681
  with storage_lock:
682
  if not pipeline_status.get("busy", False):
683
  # No other process is busy, we can process documents
684
+ # Get the current history_messages list
685
+ current_history = pipeline_status.get("history_messages", [])
686
+
687
+ # Clear the current list contents while keeping the same list object
688
+ if hasattr(current_history, "clear"):
689
+ current_history.clear()
690
+
691
  pipeline_status.update({
692
  "busy": True,
693
  "job_name": "indexing files",
 
695
  "docs": 0,
696
  "batchs": 0,
697
  "cur_batch": 0,
698
+ "request_pending": False, # Clear any previous request
699
+ "latest_message": "",
700
+ # Keep using the same list object
701
+ "history_messages": current_history,
702
  })
703
  process_documents = True
704
  else:
 
725
  to_process_docs.update(pending_docs)
726
 
727
  if not to_process_docs:
728
+ log_message = "All documents have been processed or are duplicates"
729
+ logger.info(log_message)
730
+ pipeline_status["latest_message"] = log_message
731
+ pipeline_status["history_messages"].append(log_message)
732
  break
733
 
734
  # Update pipeline status with document count (with lock)
 
747
  "cur_batch": 0
748
  })
749
 
750
+ log_message = f"Number of batches to process: {len(docs_batches)}."
751
+ logger.info(log_message)
752
+ pipeline_status["latest_message"] = log_message
753
+ pipeline_status["history_messages"].append(log_message)
754
 
755
  batches: list[Any] = []
756
  # 3. iterate over batches
 
763
  docs_batch: list[tuple[str, DocProcessingStatus]],
764
  size_batch: int,
765
  ) -> None:
766
+ log_message = f"Start processing batch {batch_idx + 1} of {size_batch}."
767
+ logger.info(log_message)
768
+ pipeline_status["latest_message"] = log_message
769
+ pipeline_status["history_messages"].append(log_message)
770
  # 4. iterate over batch
771
  for doc_id_processing_status in docs_batch:
772
  doc_id, status_doc = doc_id_processing_status
 
837
  }
838
  )
839
  continue
840
+ log_message = f"Completed batch {batch_idx + 1} of {len(docs_batches)}."
841
+ logger.info(log_message)
842
+ pipeline_status["latest_message"] = log_message
843
+ pipeline_status["history_messages"].append(log_message)
844
 
845
  batches.append(batch(batch_idx, docs_batch, len(docs_batches)))
846
 
 
858
  if not has_pending_request:
859
  break
860
 
861
+ log_message = "Processing additional documents due to pending request"
862
+ logger.info(log_message)
863
+ pipeline_status["latest_message"] = log_message
864
+ pipeline_status["history_messages"].append(log_message)
865
 
866
  finally:
867
  # Always reset busy status when done or if an exception occurs (with lock)
868
  with storage_lock:
869
  pipeline_status["busy"] = False
870
+ log_message = "Document processing pipeline completed"
871
+ logger.info(log_message)
872
+ pipeline_status["latest_message"] = log_message
873
+ pipeline_status["history_messages"].append(log_message)
874
 
875
  async def _process_entity_relation_graph(self, chunk: dict[str, Any]) -> None:
876
  try:
 
901
  if storage_inst is not None
902
  ]
903
  await asyncio.gather(*tasks)
904
+
905
+ log_message = "All Insert done"
906
+ logger.info(log_message)
907
+
908
+ # Get pipeline_status and update latest_message and history_messages
909
+ from lightrag.kg.shared_storage import get_namespace_data
910
+ pipeline_status = get_namespace_data("pipeline_status")
911
+ pipeline_status["latest_message"] = log_message
912
+ pipeline_status["history_messages"].append(log_message)
913
 
914
  def insert_custom_kg(self, custom_kg: dict[str, Any]) -> None:
915
  loop = always_get_an_event_loop()
lightrag/operate.py CHANGED
@@ -336,6 +336,9 @@ async def extract_entities(
336
  global_config: dict[str, str],
337
  llm_response_cache: BaseKVStorage | None = None,
338
  ) -> None:
 
 
 
339
  use_llm_func: callable = global_config["llm_model_func"]
340
  entity_extract_max_gleaning = global_config["entity_extract_max_gleaning"]
341
  enable_llm_cache_for_entity_extract: bool = global_config[
@@ -496,9 +499,10 @@ async def extract_entities(
496
  processed_chunks += 1
497
  entities_count = len(maybe_nodes)
498
  relations_count = len(maybe_edges)
499
- logger.info(
500
- f" Chunk {processed_chunks}/{total_chunks}: extracted {entities_count} entities and {relations_count} relationships (deduplicated)"
501
- )
 
502
  return dict(maybe_nodes), dict(maybe_edges)
503
 
504
  tasks = [_process_single_content(c) for c in ordered_chunks]
@@ -527,17 +531,27 @@ async def extract_entities(
527
  )
528
 
529
  if not (all_entities_data or all_relationships_data):
530
- logger.info("Didn't extract any entities and relationships.")
 
 
 
531
  return
532
 
533
  if not all_entities_data:
534
- logger.info("Didn't extract any entities")
 
 
 
535
  if not all_relationships_data:
536
- logger.info("Didn't extract any relationships")
537
-
538
- logger.info(
539
- f"Extracted {len(all_entities_data)} entities and {len(all_relationships_data)} relationships (deduplicated)"
540
- )
 
 
 
 
541
  verbose_debug(
542
  f"New entities:{all_entities_data}, relationships:{all_relationships_data}"
543
  )
 
336
  global_config: dict[str, str],
337
  llm_response_cache: BaseKVStorage | None = None,
338
  ) -> None:
339
+ # Fetch pipeline_status at the start of the function
340
+ from lightrag.kg.shared_storage import get_namespace_data
341
+ pipeline_status = get_namespace_data("pipeline_status")
342
  use_llm_func: callable = global_config["llm_model_func"]
343
  entity_extract_max_gleaning = global_config["entity_extract_max_gleaning"]
344
  enable_llm_cache_for_entity_extract: bool = global_config[
 
499
  processed_chunks += 1
500
  entities_count = len(maybe_nodes)
501
  relations_count = len(maybe_edges)
502
+ log_message = f" Chunk {processed_chunks}/{total_chunks}: extracted {entities_count} entities and {relations_count} relationships (deduplicated)"
503
+ logger.info(log_message)
504
+ pipeline_status["latest_message"] = log_message
505
+ pipeline_status["history_messages"].append(log_message)
506
  return dict(maybe_nodes), dict(maybe_edges)
507
 
508
  tasks = [_process_single_content(c) for c in ordered_chunks]
 
531
  )
532
 
533
  if not (all_entities_data or all_relationships_data):
534
+ log_message = "Didn't extract any entities and relationships."
535
+ logger.info(log_message)
536
+ pipeline_status["latest_message"] = log_message
537
+ pipeline_status["history_messages"].append(log_message)
538
  return
539
 
540
  if not all_entities_data:
541
+ log_message = "Didn't extract any entities"
542
+ logger.info(log_message)
543
+ pipeline_status["latest_message"] = log_message
544
+ pipeline_status["history_messages"].append(log_message)
545
  if not all_relationships_data:
546
+ log_message = "Didn't extract any relationships"
547
+ logger.info(log_message)
548
+ pipeline_status["latest_message"] = log_message
549
+ pipeline_status["history_messages"].append(log_message)
550
+
551
+ log_message = f"Extracted {len(all_entities_data)} entities and {len(all_relationships_data)} relationships (deduplicated)"
552
+ logger.info(log_message)
553
+ pipeline_status["latest_message"] = log_message
554
+ pipeline_status["history_messages"].append(log_message)
555
  verbose_debug(
556
  f"New entities:{all_entities_data}, relationships:{all_relationships_data}"
557
  )