gzdaniel commited on
Commit
f87247c
·
1 Parent(s): 0e8a0f8

Improve task execution with early failure detection

Browse files

- Add early failure detection for async tasks
- Cancel pending tasks on first exception

Files changed (2) hide show
  1. lightrag/lightrag.py +1 -1
  2. lightrag/operate.py +38 -4
lightrag/lightrag.py CHANGED
@@ -1127,7 +1127,7 @@ class LightRAG:
1127
  }
1128
  )
1129
 
1130
- # Concurrency is controlled by graph db lock for individual entities and relationships
1131
  if file_extraction_stage_ok:
1132
  try:
1133
  # Get chunk_results from entity_relation_task
 
1127
  }
1128
  )
1129
 
1130
+ # Concurrency is controlled by keyed lock for individual entities and relationships
1131
  if file_extraction_stage_ok:
1132
  try:
1133
  # Get chunk_results from entity_relation_task
lightrag/operate.py CHANGED
@@ -480,8 +480,25 @@ async def _rebuild_knowledge_from_chunks(
480
  pipeline_status["latest_message"] = status_message
481
  pipeline_status["history_messages"].append(status_message)
482
 
483
- # Execute all tasks in parallel with semaphore control
484
- await asyncio.gather(*tasks)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
485
 
486
  # Final status report
487
  status_message = f"KG rebuild completed: {rebuilt_entities_count} entities and {rebuilt_relationships_count} relationships rebuilt successfully."
@@ -1313,8 +1330,25 @@ async def merge_nodes_and_edges(
1313
  for edge_key, edges in all_edges.items():
1314
  tasks.append(asyncio.create_task(_locked_process_edges(edge_key, edges)))
1315
 
1316
- # Execute all tasks in parallel with semaphore control
1317
- await asyncio.gather(*tasks)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1318
 
1319
 
1320
  async def extract_entities(
 
480
  pipeline_status["latest_message"] = status_message
481
  pipeline_status["history_messages"].append(status_message)
482
 
483
+ # Execute all tasks in parallel with semaphore control and early failure detection
484
+ done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
485
+
486
+ # Check if any task raised an exception
487
+ for task in done:
488
+ if task.exception():
489
+ # If a task failed, cancel all pending tasks
490
+ for pending_task in pending:
491
+ pending_task.cancel()
492
+
493
+ # Wait for cancellation to complete
494
+ if pending:
495
+ await asyncio.wait(pending)
496
+
497
+ # Re-raise the exception to notify the caller
498
+ raise task.exception()
499
+
500
+ # If all tasks completed successfully, collect results
501
+ # (No need to collect results since these tasks don't return values)
502
 
503
  # Final status report
504
  status_message = f"KG rebuild completed: {rebuilt_entities_count} entities and {rebuilt_relationships_count} relationships rebuilt successfully."
 
1330
  for edge_key, edges in all_edges.items():
1331
  tasks.append(asyncio.create_task(_locked_process_edges(edge_key, edges)))
1332
 
1333
+ # Execute all tasks in parallel with semaphore control and early failure detection
1334
+ done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
1335
+
1336
+ # Check if any task raised an exception
1337
+ for task in done:
1338
+ if task.exception():
1339
+ # If a task failed, cancel all pending tasks
1340
+ for pending_task in pending:
1341
+ pending_task.cancel()
1342
+
1343
+ # Wait for cancellation to complete
1344
+ if pending:
1345
+ await asyncio.wait(pending)
1346
+
1347
+ # Re-raise the exception to notify the caller
1348
+ raise task.exception()
1349
+
1350
+ # If all tasks completed successfully, collect results
1351
+ # (No need to collect results since these tasks don't return values)
1352
 
1353
 
1354
  async def extract_entities(