yangdx commited on
Commit
8d92e7b
·
1 Parent(s): 23ed632

Refactor: Entity and edge merging in extract_entities

Browse files

- Improves efficiency by merging identical
entities and edges in a single operation
- Ensures proper handling of undirected graph edges
- Changes merge stage from chunk level to document level

Files changed (1) hide show
  1. lightrag/operate.py +123 -101
lightrag/operate.py CHANGED
@@ -139,7 +139,7 @@ async def _handle_entity_relation_summary(
139
  logger.debug(f"Trigger summary: {entity_or_relation_name}")
140
 
141
  # Update pipeline status when LLM summary is needed
142
- status_message = "Use LLM to re-summary description..."
143
  logger.info(status_message)
144
  if pipeline_status is not None and pipeline_status_lock is not None:
145
  async with pipeline_status_lock:
@@ -244,14 +244,6 @@ async def _merge_nodes_then_upsert(
244
 
245
  already_node = await knowledge_graph_inst.get_node(entity_name)
246
  if already_node is not None:
247
- # Update pipeline status when a node that needs merging is found
248
- status_message = f"Merging entity: {entity_name}"
249
- logger.info(status_message)
250
- if pipeline_status is not None and pipeline_status_lock is not None:
251
- async with pipeline_status_lock:
252
- pipeline_status["latest_message"] = status_message
253
- pipeline_status["history_messages"].append(status_message)
254
-
255
  already_entity_types.append(already_node["entity_type"])
256
  already_source_ids.extend(
257
  split_string_by_multi_markers(already_node["source_id"], [GRAPH_FIELD_SEP])
@@ -278,15 +270,24 @@ async def _merge_nodes_then_upsert(
278
  set([dp["file_path"] for dp in nodes_data] + already_file_paths)
279
  )
280
 
281
- logger.debug(f"file_path: {file_path}")
282
- description = await _handle_entity_relation_summary(
283
- entity_name,
284
- description,
285
- global_config,
286
- pipeline_status,
287
- pipeline_status_lock,
288
- llm_response_cache,
289
- )
 
 
 
 
 
 
 
 
 
290
  node_data = dict(
291
  entity_id=entity_name,
292
  entity_type=entity_type,
@@ -319,14 +320,6 @@ async def _merge_edges_then_upsert(
319
  already_file_paths = []
320
 
321
  if await knowledge_graph_inst.has_edge(src_id, tgt_id):
322
- # Update pipeline status when an edge that needs merging is found
323
- status_message = f"Merging edge::: {src_id} - {tgt_id}"
324
- logger.info(status_message)
325
- if pipeline_status is not None and pipeline_status_lock is not None:
326
- async with pipeline_status_lock:
327
- pipeline_status["latest_message"] = status_message
328
- pipeline_status["history_messages"].append(status_message)
329
-
330
  already_edge = await knowledge_graph_inst.get_edge(src_id, tgt_id)
331
  # Handle the case where get_edge returns None or missing fields
332
  if already_edge:
@@ -404,14 +397,25 @@ async def _merge_edges_then_upsert(
404
  "file_path": file_path,
405
  },
406
  )
407
- description = await _handle_entity_relation_summary(
408
- f"({src_id}, {tgt_id})",
409
- description,
410
- global_config,
411
- pipeline_status,
412
- pipeline_status_lock,
413
- llm_response_cache,
414
- )
 
 
 
 
 
 
 
 
 
 
 
415
  await knowledge_graph_inst.upsert_edge(
416
  src_id,
417
  tgt_id,
@@ -550,8 +554,10 @@ async def extract_entities(
550
  Args:
551
  chunk_key_dp (tuple[str, TextChunkSchema]):
552
  ("chunk-xxxxxx", {"tokens": int, "content": str, "full_doc_id": str, "chunk_order_index": int})
 
 
553
  """
554
- nonlocal processed_chunks, total_entities_count, total_relations_count
555
  chunk_key = chunk_key_dp[0]
556
  chunk_dp = chunk_key_dp[1]
557
  content = chunk_dp["content"]
@@ -623,75 +629,91 @@ async def extract_entities(
623
  pipeline_status["latest_message"] = log_message
624
  pipeline_status["history_messages"].append(log_message)
625
 
626
- # Use graph database lock to ensure atomic merges and updates
627
- chunk_entities_data = []
628
- chunk_relationships_data = []
629
-
630
- async with graph_db_lock:
631
- # Process and update entities
632
- for entity_name, entities in maybe_nodes.items():
633
- entity_data = await _merge_nodes_then_upsert(
634
- entity_name,
635
- entities,
636
- knowledge_graph_inst,
637
- global_config,
638
- pipeline_status,
639
- pipeline_status_lock,
640
- llm_response_cache,
641
- )
642
- chunk_entities_data.append(entity_data)
643
-
644
- # Process and update relationships
645
- for edge_key, edges in maybe_edges.items():
646
- # Ensure edge direction consistency
647
- sorted_edge_key = tuple(sorted(edge_key))
648
- edge_data = await _merge_edges_then_upsert(
649
- sorted_edge_key[0],
650
- sorted_edge_key[1],
651
- edges,
652
- knowledge_graph_inst,
653
- global_config,
654
- pipeline_status,
655
- pipeline_status_lock,
656
- llm_response_cache,
657
- )
658
- chunk_relationships_data.append(edge_data)
659
-
660
- # Update vector database (within the same lock to ensure atomicity)
661
- if entity_vdb is not None and chunk_entities_data:
662
- data_for_vdb = {
663
- compute_mdhash_id(dp["entity_name"], prefix="ent-"): {
664
- "entity_name": dp["entity_name"],
665
- "entity_type": dp["entity_type"],
666
- "content": f"{dp['entity_name']}\n{dp['description']}",
667
- "source_id": dp["source_id"],
668
- "file_path": dp.get("file_path", "unknown_source"),
669
- }
670
- for dp in chunk_entities_data
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
671
  }
672
- await entity_vdb.upsert(data_for_vdb)
673
-
674
- if relationships_vdb is not None and chunk_relationships_data:
675
- data_for_vdb = {
676
- compute_mdhash_id(dp["src_id"] + dp["tgt_id"], prefix="rel-"): {
677
- "src_id": dp["src_id"],
678
- "tgt_id": dp["tgt_id"],
679
- "keywords": dp["keywords"],
680
- "content": f"{dp['src_id']}\t{dp['tgt_id']}\n{dp['keywords']}\n{dp['description']}",
681
- "source_id": dp["source_id"],
682
- "file_path": dp.get("file_path", "unknown_source"),
683
- }
684
- for dp in chunk_relationships_data
685
  }
686
- await relationships_vdb.upsert(data_for_vdb)
 
 
687
 
688
- # Update counters
689
- total_entities_count += len(chunk_entities_data)
690
- total_relations_count += len(chunk_relationships_data)
691
-
692
- # Handle all chunks in parallel
693
- tasks = [_process_single_content(c) for c in ordered_chunks]
694
- await asyncio.gather(*tasks)
695
 
696
  log_message = f"Extracted {total_entities_count} entities + {total_relations_count} relationships (total)"
697
  logger.info(log_message)
 
139
  logger.debug(f"Trigger summary: {entity_or_relation_name}")
140
 
141
  # Update pipeline status when LLM summary is needed
142
+ status_message = " == Use LLM == to re-summary description..."
143
  logger.info(status_message)
144
  if pipeline_status is not None and pipeline_status_lock is not None:
145
  async with pipeline_status_lock:
 
244
 
245
  already_node = await knowledge_graph_inst.get_node(entity_name)
246
  if already_node is not None:
 
 
 
 
 
 
 
 
247
  already_entity_types.append(already_node["entity_type"])
248
  already_source_ids.extend(
249
  split_string_by_multi_markers(already_node["source_id"], [GRAPH_FIELD_SEP])
 
270
  set([dp["file_path"] for dp in nodes_data] + already_file_paths)
271
  )
272
 
273
+ if len(nodes_data) > 1 or len(already_entity_types) > 0:
274
+ # Update pipeline status when a node that needs merging
275
+ status_message = f"Merging entity: {entity_name} | {len(nodes_data)}+{len(already_entity_types)}"
276
+ logger.info(status_message)
277
+ if pipeline_status is not None and pipeline_status_lock is not None:
278
+ async with pipeline_status_lock:
279
+ pipeline_status["latest_message"] = status_message
280
+ pipeline_status["history_messages"].append(status_message)
281
+
282
+ description = await _handle_entity_relation_summary(
283
+ entity_name,
284
+ description,
285
+ global_config,
286
+ pipeline_status,
287
+ pipeline_status_lock,
288
+ llm_response_cache,
289
+ )
290
+
291
  node_data = dict(
292
  entity_id=entity_name,
293
  entity_type=entity_type,
 
320
  already_file_paths = []
321
 
322
  if await knowledge_graph_inst.has_edge(src_id, tgt_id):
 
 
 
 
 
 
 
 
323
  already_edge = await knowledge_graph_inst.get_edge(src_id, tgt_id)
324
  # Handle the case where get_edge returns None or missing fields
325
  if already_edge:
 
397
  "file_path": file_path,
398
  },
399
  )
400
+
401
+ if len(edges_data) > 1 or len(already_weights) > 0:
402
+ # Update pipeline status when a edge that needs merging
403
+ status_message = f"Merging edge::: {src_id} - {tgt_id} | {len(edges_data)}+{len(already_weights)}"
404
+ logger.info(status_message)
405
+ if pipeline_status is not None and pipeline_status_lock is not None:
406
+ async with pipeline_status_lock:
407
+ pipeline_status["latest_message"] = status_message
408
+ pipeline_status["history_messages"].append(status_message)
409
+
410
+ description = await _handle_entity_relation_summary(
411
+ f"({src_id}, {tgt_id})",
412
+ description,
413
+ global_config,
414
+ pipeline_status,
415
+ pipeline_status_lock,
416
+ llm_response_cache,
417
+ )
418
+
419
  await knowledge_graph_inst.upsert_edge(
420
  src_id,
421
  tgt_id,
 
554
  Args:
555
  chunk_key_dp (tuple[str, TextChunkSchema]):
556
  ("chunk-xxxxxx", {"tokens": int, "content": str, "full_doc_id": str, "chunk_order_index": int})
557
+ Returns:
558
+ tuple: (maybe_nodes, maybe_edges) containing extracted entities and relationships
559
  """
560
+ nonlocal processed_chunks
561
  chunk_key = chunk_key_dp[0]
562
  chunk_dp = chunk_key_dp[1]
563
  content = chunk_dp["content"]
 
629
  pipeline_status["latest_message"] = log_message
630
  pipeline_status["history_messages"].append(log_message)
631
 
632
+ # Return the extracted nodes and edges for centralized processing
633
+ return maybe_nodes, maybe_edges
634
+
635
+ # Handle all chunks in parallel and collect results
636
+ tasks = [_process_single_content(c) for c in ordered_chunks]
637
+ chunk_results = await asyncio.gather(*tasks)
638
+
639
+ # Collect all nodes and edges from all chunks
640
+ all_nodes = defaultdict(list)
641
+ all_edges = defaultdict(list)
642
+
643
+ for maybe_nodes, maybe_edges in chunk_results:
644
+ # Collect nodes
645
+ for entity_name, entities in maybe_nodes.items():
646
+ all_nodes[entity_name].extend(entities)
647
+
648
+ # Collect edges with sorted keys for undirected graph
649
+ for edge_key, edges in maybe_edges.items():
650
+ sorted_edge_key = tuple(sorted(edge_key))
651
+ all_edges[sorted_edge_key].extend(edges)
652
+
653
+ # Centralized processing of all nodes and edges
654
+ entities_data = []
655
+ relationships_data = []
656
+
657
+ # Use graph database lock to ensure atomic merges and updates
658
+ async with graph_db_lock:
659
+ # Process and update all entities at once
660
+ for entity_name, entities in all_nodes.items():
661
+ entity_data = await _merge_nodes_then_upsert(
662
+ entity_name,
663
+ entities,
664
+ knowledge_graph_inst,
665
+ global_config,
666
+ pipeline_status,
667
+ pipeline_status_lock,
668
+ llm_response_cache,
669
+ )
670
+ entities_data.append(entity_data)
671
+
672
+ # Process and update all relationships at once
673
+ for edge_key, edges in all_edges.items():
674
+ edge_data = await _merge_edges_then_upsert(
675
+ edge_key[0],
676
+ edge_key[1],
677
+ edges,
678
+ knowledge_graph_inst,
679
+ global_config,
680
+ pipeline_status,
681
+ pipeline_status_lock,
682
+ llm_response_cache,
683
+ )
684
+ relationships_data.append(edge_data)
685
+
686
+ # Update vector databases with all collected data
687
+ if entity_vdb is not None and entities_data:
688
+ data_for_vdb = {
689
+ compute_mdhash_id(dp["entity_name"], prefix="ent-"): {
690
+ "entity_name": dp["entity_name"],
691
+ "entity_type": dp["entity_type"],
692
+ "content": f"{dp['entity_name']}\n{dp['description']}",
693
+ "source_id": dp["source_id"],
694
+ "file_path": dp.get("file_path", "unknown_source"),
695
  }
696
+ for dp in entities_data
697
+ }
698
+ await entity_vdb.upsert(data_for_vdb)
699
+
700
+ if relationships_vdb is not None and relationships_data:
701
+ data_for_vdb = {
702
+ compute_mdhash_id(dp["src_id"] + dp["tgt_id"], prefix="rel-"): {
703
+ "src_id": dp["src_id"],
704
+ "tgt_id": dp["tgt_id"],
705
+ "keywords": dp["keywords"],
706
+ "content": f"{dp['src_id']}\t{dp['tgt_id']}\n{dp['keywords']}\n{dp['description']}",
707
+ "source_id": dp["source_id"],
708
+ "file_path": dp.get("file_path", "unknown_source"),
709
  }
710
+ for dp in relationships_data
711
+ }
712
+ await relationships_vdb.upsert(data_for_vdb)
713
 
714
+ # Update total counts
715
+ total_entities_count = len(entities_data)
716
+ total_relations_count = len(relationships_data)
 
 
 
 
717
 
718
  log_message = f"Extracted {total_entities_count} entities + {total_relations_count} relationships (total)"
719
  logger.info(log_message)