gzdaniel commited on
Commit
6bcca36
·
1 Parent(s): 604c72c

Optimize knowledge graph rebuild with parallel processing

Browse files

- Add parallel processing for KG rebuild
- Implement keyed locks for data consistency

Files changed (1) hide show
  1. lightrag/operate.py +124 -64
lightrag/operate.py CHANGED
@@ -275,20 +275,26 @@ async def _rebuild_knowledge_from_chunks(
275
  pipeline_status: dict | None = None,
276
  pipeline_status_lock=None,
277
  ) -> None:
278
- """Rebuild entity and relationship descriptions from cached extraction results
279
 
280
  This method uses cached LLM extraction results instead of calling LLM again,
281
- following the same approach as the insert process.
 
282
 
283
  Args:
284
  entities_to_rebuild: Dict mapping entity_name -> set of remaining chunk_ids
285
  relationships_to_rebuild: Dict mapping (src, tgt) -> set of remaining chunk_ids
286
- text_chunks_data: Pre-loaded chunk data dict {chunk_id: chunk_data}
 
 
 
 
 
 
 
287
  """
288
  if not entities_to_rebuild and not relationships_to_rebuild:
289
  return
290
- rebuilt_entities_count = 0
291
- rebuilt_relationships_count = 0
292
 
293
  # Get all referenced chunk IDs
294
  all_referenced_chunk_ids = set()
@@ -297,7 +303,7 @@ async def _rebuild_knowledge_from_chunks(
297
  for chunk_ids in relationships_to_rebuild.values():
298
  all_referenced_chunk_ids.update(chunk_ids)
299
 
300
- status_message = f"Rebuilding knowledge from {len(all_referenced_chunk_ids)} cached chunk extractions"
301
  logger.info(status_message)
302
  if pipeline_status is not None and pipeline_status_lock is not None:
303
  async with pipeline_status_lock:
@@ -367,66 +373,116 @@ async def _rebuild_knowledge_from_chunks(
367
  pipeline_status["history_messages"].append(status_message)
368
  continue
369
 
370
- # Rebuild entities
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
371
  for entity_name, chunk_ids in entities_to_rebuild.items():
372
- try:
373
- await _rebuild_single_entity(
374
- knowledge_graph_inst=knowledge_graph_inst,
375
- entities_vdb=entities_vdb,
376
- entity_name=entity_name,
377
- chunk_ids=chunk_ids,
378
- chunk_entities=chunk_entities,
379
- llm_response_cache=llm_response_cache,
380
- global_config=global_config,
381
- )
382
- rebuilt_entities_count += 1
383
- status_message = (
384
- f"Rebuilt entity: {entity_name} from {len(chunk_ids)} chunks"
385
- )
386
- logger.info(status_message)
387
- if pipeline_status is not None and pipeline_status_lock is not None:
388
- async with pipeline_status_lock:
389
- pipeline_status["latest_message"] = status_message
390
- pipeline_status["history_messages"].append(status_message)
391
- except Exception as e:
392
- status_message = f"Failed to rebuild entity {entity_name}: {e}"
393
- logger.info(status_message) # Per requirement, change to info
394
- if pipeline_status is not None and pipeline_status_lock is not None:
395
- async with pipeline_status_lock:
396
- pipeline_status["latest_message"] = status_message
397
- pipeline_status["history_messages"].append(status_message)
398
 
399
- # Rebuild relationships
400
  for (src, tgt), chunk_ids in relationships_to_rebuild.items():
401
- try:
402
- await _rebuild_single_relationship(
403
- knowledge_graph_inst=knowledge_graph_inst,
404
- relationships_vdb=relationships_vdb,
405
- src=src,
406
- tgt=tgt,
407
- chunk_ids=chunk_ids,
408
- chunk_relationships=chunk_relationships,
409
- llm_response_cache=llm_response_cache,
410
- global_config=global_config,
411
- )
412
- rebuilt_relationships_count += 1
413
- status_message = (
414
- f"Rebuilt relationship: {src}->{tgt} from {len(chunk_ids)} chunks"
415
- )
416
- logger.info(status_message)
417
- if pipeline_status is not None and pipeline_status_lock is not None:
418
- async with pipeline_status_lock:
419
- pipeline_status["latest_message"] = status_message
420
- pipeline_status["history_messages"].append(status_message)
421
- except Exception as e:
422
- status_message = f"Failed to rebuild relationship {src}->{tgt}: {e}"
423
- logger.info(status_message)
424
- if pipeline_status is not None and pipeline_status_lock is not None:
425
- async with pipeline_status_lock:
426
- pipeline_status["latest_message"] = status_message
427
- pipeline_status["history_messages"].append(status_message)
428
 
429
- status_message = f"KG rebuild completed: {rebuilt_entities_count} entities and {rebuilt_relationships_count} relationships."
430
  logger.info(status_message)
431
  if pipeline_status is not None and pipeline_status_lock is not None:
432
  async with pipeline_status_lock:
@@ -726,7 +782,11 @@ async def _rebuild_single_relationship(
726
  llm_response_cache: BaseKVStorage,
727
  global_config: dict[str, str],
728
  ) -> None:
729
- """Rebuild a single relationship from cached extraction results"""
 
 
 
 
730
 
731
  # Get current relationship data
732
  current_relationship = await knowledge_graph_inst.get_edge(src, tgt)
@@ -1148,7 +1208,7 @@ async def merge_nodes_and_edges(
1148
  pipeline_status["history_messages"].append(log_message)
1149
 
1150
  # Get max async tasks limit from global_config for semaphore control
1151
- llm_model_max_async = global_config.get("llm_model_max_async", 4)
1152
  semaphore = asyncio.Semaphore(llm_model_max_async)
1153
 
1154
  async def _locked_process_entity_name(entity_name, entities):
 
275
  pipeline_status: dict | None = None,
276
  pipeline_status_lock=None,
277
  ) -> None:
278
+ """Rebuild entity and relationship descriptions from cached extraction results with parallel processing
279
 
280
  This method uses cached LLM extraction results instead of calling LLM again,
281
+ following the same approach as the insert process. Now with parallel processing
282
+ controlled by llm_model_max_async and using get_storage_keyed_lock for data consistency.
283
 
284
  Args:
285
  entities_to_rebuild: Dict mapping entity_name -> set of remaining chunk_ids
286
  relationships_to_rebuild: Dict mapping (src, tgt) -> set of remaining chunk_ids
287
+ knowledge_graph_inst: Knowledge graph storage
288
+ entities_vdb: Entity vector database
289
+ relationships_vdb: Relationship vector database
290
+ text_chunks_storage: Text chunks storage
291
+ llm_response_cache: LLM response cache
292
+ global_config: Global configuration containing llm_model_max_async
293
+ pipeline_status: Pipeline status dictionary
294
+ pipeline_status_lock: Lock for pipeline status
295
  """
296
  if not entities_to_rebuild and not relationships_to_rebuild:
297
  return
 
 
298
 
299
  # Get all referenced chunk IDs
300
  all_referenced_chunk_ids = set()
 
303
  for chunk_ids in relationships_to_rebuild.values():
304
  all_referenced_chunk_ids.update(chunk_ids)
305
 
306
+ status_message = f"Rebuilding knowledge from {len(all_referenced_chunk_ids)} cached chunk extractions (parallel processing)"
307
  logger.info(status_message)
308
  if pipeline_status is not None and pipeline_status_lock is not None:
309
  async with pipeline_status_lock:
 
373
  pipeline_status["history_messages"].append(status_message)
374
  continue
375
 
376
+ # Get max async tasks limit from global_config for semaphore control
377
+ llm_model_max_async = global_config.get("llm_model_max_async", 4) + 1
378
+ semaphore = asyncio.Semaphore(llm_model_max_async)
379
+
380
+ # Counters for tracking progress
381
+ rebuilt_entities_count = 0
382
+ rebuilt_relationships_count = 0
383
+ failed_entities_count = 0
384
+ failed_relationships_count = 0
385
+
386
+ async def _locked_rebuild_entity(entity_name, chunk_ids):
387
+ nonlocal rebuilt_entities_count, failed_entities_count
388
+ async with semaphore:
389
+ workspace = global_config.get("workspace", "")
390
+ namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
391
+ async with get_storage_keyed_lock(
392
+ [entity_name], namespace=namespace, enable_logging=False
393
+ ):
394
+ try:
395
+ await _rebuild_single_entity(
396
+ knowledge_graph_inst=knowledge_graph_inst,
397
+ entities_vdb=entities_vdb,
398
+ entity_name=entity_name,
399
+ chunk_ids=chunk_ids,
400
+ chunk_entities=chunk_entities,
401
+ llm_response_cache=llm_response_cache,
402
+ global_config=global_config,
403
+ )
404
+ rebuilt_entities_count += 1
405
+ status_message = (
406
+ f"Rebuilt entity: {entity_name} from {len(chunk_ids)} chunks"
407
+ )
408
+ logger.info(status_message)
409
+ if pipeline_status is not None and pipeline_status_lock is not None:
410
+ async with pipeline_status_lock:
411
+ pipeline_status["latest_message"] = status_message
412
+ pipeline_status["history_messages"].append(status_message)
413
+ except Exception as e:
414
+ failed_entities_count += 1
415
+ status_message = f"Failed to rebuild entity {entity_name}: {e}"
416
+ logger.info(status_message) # Per requirement, change to info
417
+ if pipeline_status is not None and pipeline_status_lock is not None:
418
+ async with pipeline_status_lock:
419
+ pipeline_status["latest_message"] = status_message
420
+ pipeline_status["history_messages"].append(status_message)
421
+
422
+ async def _locked_rebuild_relationship(src, tgt, chunk_ids):
423
+ nonlocal rebuilt_relationships_count, failed_relationships_count
424
+ async with semaphore:
425
+ workspace = global_config.get("workspace", "")
426
+ namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
427
+ async with get_storage_keyed_lock(
428
+ f"{src}-{tgt}", namespace=namespace, enable_logging=False
429
+ ):
430
+ try:
431
+ await _rebuild_single_relationship(
432
+ knowledge_graph_inst=knowledge_graph_inst,
433
+ relationships_vdb=relationships_vdb,
434
+ src=src,
435
+ tgt=tgt,
436
+ chunk_ids=chunk_ids,
437
+ chunk_relationships=chunk_relationships,
438
+ llm_response_cache=llm_response_cache,
439
+ global_config=global_config,
440
+ )
441
+ rebuilt_relationships_count += 1
442
+ status_message = f"Rebuilt relationship: {src}->{tgt} from {len(chunk_ids)} chunks"
443
+ logger.info(status_message)
444
+ if pipeline_status is not None and pipeline_status_lock is not None:
445
+ async with pipeline_status_lock:
446
+ pipeline_status["latest_message"] = status_message
447
+ pipeline_status["history_messages"].append(status_message)
448
+ except Exception as e:
449
+ failed_relationships_count += 1
450
+ status_message = f"Failed to rebuild relationship {src}->{tgt}: {e}"
451
+ logger.info(status_message) # Per requirement, change to info
452
+ if pipeline_status is not None and pipeline_status_lock is not None:
453
+ async with pipeline_status_lock:
454
+ pipeline_status["latest_message"] = status_message
455
+ pipeline_status["history_messages"].append(status_message)
456
+
457
+ # Create tasks for parallel processing
458
+ tasks = []
459
+
460
+ # Add entity rebuilding tasks
461
  for entity_name, chunk_ids in entities_to_rebuild.items():
462
+ task = asyncio.create_task(_locked_rebuild_entity(entity_name, chunk_ids))
463
+ tasks.append(task)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
464
 
465
+ # Add relationship rebuilding tasks
466
  for (src, tgt), chunk_ids in relationships_to_rebuild.items():
467
+ task = asyncio.create_task(_locked_rebuild_relationship(src, tgt, chunk_ids))
468
+ tasks.append(task)
469
+
470
+ # Log parallel processing start
471
+ status_message = f"Starting parallel rebuild of {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships (max concurrent: {llm_model_max_async})"
472
+ logger.info(status_message)
473
+ if pipeline_status is not None and pipeline_status_lock is not None:
474
+ async with pipeline_status_lock:
475
+ pipeline_status["latest_message"] = status_message
476
+ pipeline_status["history_messages"].append(status_message)
477
+
478
+ # Execute all tasks in parallel with semaphore control
479
+ await asyncio.gather(*tasks)
480
+
481
+ # Final status report
482
+ status_message = f"KG rebuild completed: {rebuilt_entities_count} entities and {rebuilt_relationships_count} relationships rebuilt successfully."
483
+ if failed_entities_count > 0 or failed_relationships_count > 0:
484
+ status_message += f" Failed: {failed_entities_count} entities, {failed_relationships_count} relationships."
 
 
 
 
 
 
 
 
 
485
 
 
486
  logger.info(status_message)
487
  if pipeline_status is not None and pipeline_status_lock is not None:
488
  async with pipeline_status_lock:
 
782
  llm_response_cache: BaseKVStorage,
783
  global_config: dict[str, str],
784
  ) -> None:
785
+ """Rebuild a single relationship from cached extraction results
786
+
787
+ Note: This function assumes the caller has already acquired the appropriate
788
+ keyed lock for the relationship pair to ensure thread safety.
789
+ """
790
 
791
  # Get current relationship data
792
  current_relationship = await knowledge_graph_inst.get_edge(src, tgt)
 
1208
  pipeline_status["history_messages"].append(log_message)
1209
 
1210
  # Get max async tasks limit from global_config for semaphore control
1211
+ llm_model_max_async = global_config.get("llm_model_max_async", 4) + 1
1212
  semaphore = asyncio.Semaphore(llm_model_max_async)
1213
 
1214
  async def _locked_process_entity_name(entity_name, entities):