yangdx committed on
Commit
8467a67
·
1 Parent(s): 82df198

fix: cancel pending tasks when any chunk processing fails

Browse files

Modify the extract_entities function to cancel all pending text-chunk processing tasks as soon as the processing of any single chunk fails.

Files changed (1) hide show
  1. lightrag/operate.py +27 -2
lightrag/operate.py CHANGED
@@ -666,8 +666,33 @@ async def extract_entities(
666
  return maybe_nodes, maybe_edges
667
 
668
  # Handle all chunks in parallel and collect results
669
- tasks = [_process_single_content(c) for c in ordered_chunks]
670
- chunk_results = await asyncio.gather(*tasks)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
671
 
672
  # Collect all nodes and edges from all chunks
673
  all_nodes = defaultdict(list)
 
666
  return maybe_nodes, maybe_edges
667
 
668
  # Handle all chunks in parallel and collect results
669
+ # Create tasks for all chunks
670
+ tasks = []
671
+ for c in ordered_chunks:
672
+ task = asyncio.create_task(_process_single_content(c))
673
+ tasks.append(task)
674
+
675
+ # Wait for tasks to complete or for the first exception to occur
676
+ # This allows us to cancel remaining tasks if any task fails
677
+ done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
678
+
679
+ # Check if any task raised an exception
680
+ for task in done:
681
+ if task.exception():
682
+ # If a task failed, cancel all pending tasks
683
+ # This prevents unnecessary processing since the parent function will abort anyway
684
+ for pending_task in pending:
685
+ pending_task.cancel()
686
+
687
+ # Wait for cancellation to complete
688
+ if pending:
689
+ await asyncio.wait(pending)
690
+
691
+ # Re-raise the exception to notify the caller
692
+ raise task.exception()
693
+
694
+ # If all tasks completed successfully, collect results
695
+ chunk_results = [task.result() for task in tasks]
696
 
697
  # Collect all nodes and edges from all chunks
698
  all_nodes = defaultdict(list)