yangdx committed on
Commit
c83e870
·
1 Parent(s): 353b669

Set max parallel chunk processing according to MAX_ASYNC of LLM

Browse files
Files changed (1) hide show
  1. lightrag/operate.py +13 -5
lightrag/operate.py CHANGED
@@ -674,11 +674,17 @@ async def extract_entities(
674
  # Return the extracted nodes and edges for centralized processing
675
  return maybe_nodes, maybe_edges
676
 
677
- # Handle all chunks in parallel and collect results
678
- # Create tasks for all chunks
 
 
 
 
 
 
679
  tasks = []
680
  for c in ordered_chunks:
681
- task = asyncio.create_task(_process_single_content(c))
682
  tasks.append(task)
683
 
684
  # Wait for tasks to complete or for the first exception to occur
@@ -755,7 +761,7 @@ async def extract_entities(
755
  total_entities_count = len(entities_data)
756
  total_relations_count = len(relationships_data)
757
 
758
- log_message = f"Updating vector storage: {total_entities_count} entities"
759
  logger.info(log_message)
760
  if pipeline_status is not None:
761
  async with pipeline_status_lock:
@@ -776,7 +782,9 @@ async def extract_entities(
776
  }
777
  await entity_vdb.upsert(data_for_vdb)
778
 
779
- log_message = f"Updating vector storage: {total_relations_count} relationships"
 
 
780
  logger.info(log_message)
781
  if pipeline_status is not None:
782
  async with pipeline_status_lock:
 
674
  # Return the extracted nodes and edges for centralized processing
675
  return maybe_nodes, maybe_edges
676
 
677
+ # Get max async tasks limit from global_config
678
+ llm_model_max_async = global_config.get("llm_model_max_async", 4)
679
+ semaphore = asyncio.Semaphore(llm_model_max_async)
680
+
681
+ async def _process_with_semaphore(chunk):
682
+ async with semaphore:
683
+ return await _process_single_content(chunk)
684
+
685
  tasks = []
686
  for c in ordered_chunks:
687
+ task = asyncio.create_task(_process_with_semaphore(c))
688
  tasks.append(task)
689
 
690
  # Wait for tasks to complete or for the first exception to occur
 
761
  total_entities_count = len(entities_data)
762
  total_relations_count = len(relationships_data)
763
 
764
+ log_message = f"Updating vector storage: {total_entities_count} entities..."
765
  logger.info(log_message)
766
  if pipeline_status is not None:
767
  async with pipeline_status_lock:
 
782
  }
783
  await entity_vdb.upsert(data_for_vdb)
784
 
785
+ log_message = (
786
+ f"Updating vector storage: {total_relations_count} relationships..."
787
+ )
788
  logger.info(log_message)
789
  if pipeline_status is not None:
790
  async with pipeline_status_lock: