Daniel.y committed
Commit 4aaf020 · unverified · 2 parents: b379f14, 5ca3dfe

Merge pull request #1334 from danielaskdd/main


Refactor entity and edge merging, and add env var FORCE_LLM_SUMMARY_ON_MERGE
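
In short: entity and edge merging no longer call the LLM every time a description grows past a token limit. Merged descriptions accumulate as GRAPH_FIELD_SEP-joined fragments, and the LLM is only asked to re-summarize once the fragment count reaches FORCE_LLM_SUMMARY_ON_MERGE (default 6). A minimal sketch of that decision, assuming "<SEP>" as the separator value and a stand-in summarize() function (not the actual LightRAG API):

    GRAPH_FIELD_SEP = "<SEP>"       # assumed separator between merged description fragments
    FORCE_LLM_SUMMARY_ON_MERGE = 6  # new env-configurable threshold (default 6)

    async def merge_description(merged: str, summarize) -> str:
        # Count the fragments accumulated in the merged description.
        num_fragment = merged.count(GRAPH_FIELD_SEP) + 1
        # Below the threshold, keep the plain concatenation; at or above it,
        # let the LLM rewrite the description into a single summary.
        if num_fragment >= FORCE_LLM_SUMMARY_ON_MERGE:
            return await summarize(merged)
        return merged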

env.example CHANGED
@@ -43,11 +43,15 @@ WEBUI_DESCRIPTION="Simple and Fast Graph Based RAG System"
 SUMMARY_LANGUAGE=English
 # CHUNK_SIZE=1200
 # CHUNK_OVERLAP_SIZE=100
-### Max tokens for entity or relations summary
-# MAX_TOKEN_SUMMARY=500
+
 ### Number of parallel processing documents in one patch
 # MAX_PARALLEL_INSERT=2
 
+### Max tokens for entity/relations description after merge
+# MAX_TOKEN_SUMMARY=500
+### Number of entities/edges to trigger LLM re-summary on merge (at least 3 is recommended)
+# FORCE_LLM_SUMMARY_ON_MERGE=6
+
 ### Num of chunks send to Embedding in single request
 # EMBEDDING_BATCH_NUM=32
 ### Max concurrency requests for Embedding
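
Taken together: FORCE_LLM_SUMMARY_ON_MERGE decides when the LLM rewrites a merged description, while MAX_TOKEN_SUMMARY caps how long that rewrite may be. A quick sketch of the decision across fragment counts, using the defaults above (how the token cap is enforced inside the summary prompt is assumed, not shown in this diff):

    import os

    threshold = int(os.getenv("FORCE_LLM_SUMMARY_ON_MERGE", 6))
    max_tokens = int(os.getenv("MAX_TOKEN_SUMMARY", 500))

    for num_fragment in range(1, 8):
        action = "LLM re-summary" if num_fragment >= threshold else "simple concatenation"
        print(f"{num_fragment} fragment(s) -> {action} (summary capped at {max_tokens} tokens)")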
lightrag/api/__init__.py CHANGED
@@ -1 +1 @@
-__api_version__ = "0143"
+__api_version__ = "0145"
lightrag/api/utils_api.py CHANGED
@@ -261,8 +261,12 @@ def display_splash_screen(args: argparse.Namespace) -> None:
     ASCIIColors.yellow(f"{args.chunk_overlap_size}")
     ASCIIColors.white(" ├─ Cosine Threshold: ", end="")
     ASCIIColors.yellow(f"{args.cosine_threshold}")
-    ASCIIColors.white(" └─ Top-K: ", end="")
+    ASCIIColors.white(" ├─ Top-K: ", end="")
     ASCIIColors.yellow(f"{args.top_k}")
+    ASCIIColors.white(" ├─ Max Token Summary: ", end="")
+    ASCIIColors.yellow(f"{int(os.getenv('MAX_TOKEN_SUMMARY', 500))}")
+    ASCIIColors.white(" └─ Force LLM Summary on Merge: ", end="")
+    ASCIIColors.yellow(f"{int(os.getenv('FORCE_LLM_SUMMARY_ON_MERGE', 6))}")
 
     # System Configuration
     ASCIIColors.magenta("\n💾 Storage Configuration:")
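
The two new splash-screen rows reuse the pattern already in display_splash_screen; a standalone sketch of just those rows (assuming the ascii_colors package these calls come from):

    import os
    from ascii_colors import ASCIIColors  # assumed import, matching the calls above

    ASCIIColors.white(" ├─ Max Token Summary: ", end="")
    ASCIIColors.yellow(f"{int(os.getenv('MAX_TOKEN_SUMMARY', 500))}")
    ASCIIColors.white(" └─ Force LLM Summary on Merge: ", end="")
    ASCIIColors.yellow(f"{int(os.getenv('FORCE_LLM_SUMMARY_ON_MERGE', 6))}")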
lightrag/api/webui/assets/{index-Cicy56pP.js → index-BPm_J2w3.js} RENAMED
Binary files a/lightrag/api/webui/assets/index-Cicy56pP.js and b/lightrag/api/webui/assets/index-BPm_J2w3.js differ
 
lightrag/api/webui/index.html CHANGED
Binary files a/lightrag/api/webui/index.html and b/lightrag/api/webui/index.html differ
 
lightrag/lightrag.py CHANGED
@@ -103,8 +103,10 @@ class LightRAG:
     entity_extract_max_gleaning: int = field(default=1)
     """Maximum number of entity extraction attempts for ambiguous content."""
 
-    entity_summary_to_max_tokens: int = field(
-        default=int(os.getenv("MAX_TOKEN_SUMMARY", 500))
+    summary_to_max_tokens: int = field(default=int(os.getenv("MAX_TOKEN_SUMMARY", 500)))
+
+    force_llm_summary_on_merge: int = field(
+        default=int(os.getenv("FORCE_LLM_SUMMARY_ON_MERGE", 6))
     )
 
     # Text chunking
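
Both are plain dataclass fields, so they can also be overridden at construction time instead of via the environment; a sketch, assuming the remaining LightRAG configuration (storage backends, LLM and embedding functions) is supplied as usual:

    from lightrag import LightRAG

    rag = LightRAG(
        working_dir="./rag_storage",
        summary_to_max_tokens=500,       # renamed from entity_summary_to_max_tokens
        force_llm_summary_on_merge=6,    # new: fragment-count threshold for LLM re-summary
        # ... other required configuration omitted ...
    )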
lightrag/operate.py CHANGED
@@ -117,15 +117,13 @@ async def _handle_entity_relation_summary(
     use_llm_func: callable = global_config["llm_model_func"]
     llm_max_tokens = global_config["llm_model_max_token_size"]
     tiktoken_model_name = global_config["tiktoken_model_name"]
-    summary_max_tokens = global_config["entity_summary_to_max_tokens"]
+    summary_max_tokens = global_config["summary_to_max_tokens"]
+
     language = global_config["addon_params"].get(
         "language", PROMPTS["DEFAULT_LANGUAGE"]
     )
 
     tokens = encode_string_by_tiktoken(description, model_name=tiktoken_model_name)
-    if len(tokens) < summary_max_tokens:  # No need for summary
-        return description
-
     prompt_template = PROMPTS["summarize_entity_descriptions"]
     use_description = decode_tokens_by_tiktoken(
         tokens[:llm_max_tokens], model_name=tiktoken_model_name
@@ -138,14 +136,6 @@
     use_prompt = prompt_template.format(**context_base)
     logger.debug(f"Trigger summary: {entity_or_relation_name}")
 
-    # Update pipeline status when LLM summary is needed
-    status_message = "Use LLM to re-summary description..."
-    logger.info(status_message)
-    if pipeline_status is not None and pipeline_status_lock is not None:
-        async with pipeline_status_lock:
-            pipeline_status["latest_message"] = status_message
-            pipeline_status["history_messages"].append(status_message)
-
     # Use LLM function with cache
     summary = await use_llm_func_with_cache(
         use_prompt,
@@ -244,14 +234,6 @@ async def _merge_nodes_then_upsert(
 
     already_node = await knowledge_graph_inst.get_node(entity_name)
     if already_node is not None:
-        # Update pipeline status when a node that needs merging is found
-        status_message = f"Merging entity: {entity_name}"
-        logger.info(status_message)
-        if pipeline_status is not None and pipeline_status_lock is not None:
-            async with pipeline_status_lock:
-                pipeline_status["latest_message"] = status_message
-                pipeline_status["history_messages"].append(status_message)
-
         already_entity_types.append(already_node["entity_type"])
         already_source_ids.extend(
             split_string_by_multi_markers(already_node["source_id"], [GRAPH_FIELD_SEP])
@@ -278,15 +260,35 @@
         set([dp["file_path"] for dp in nodes_data] + already_file_paths)
     )
 
-    logger.debug(f"file_path: {file_path}")
-    description = await _handle_entity_relation_summary(
-        entity_name,
-        description,
-        global_config,
-        pipeline_status,
-        pipeline_status_lock,
-        llm_response_cache,
-    )
+    force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
+
+    num_fragment = description.count(GRAPH_FIELD_SEP) + 1
+    num_new_fragment = len(set([dp["description"] for dp in nodes_data]))
+
+    if num_fragment > 1:
+        if num_fragment >= force_llm_summary_on_merge:
+            status_message = f"LLM merge N: {entity_name} | {num_new_fragment}+{num_fragment-num_new_fragment}"
+            logger.info(status_message)
+            if pipeline_status is not None and pipeline_status_lock is not None:
+                async with pipeline_status_lock:
+                    pipeline_status["latest_message"] = status_message
+                    pipeline_status["history_messages"].append(status_message)
+            description = await _handle_entity_relation_summary(
+                entity_name,
+                description,
+                global_config,
+                pipeline_status,
+                pipeline_status_lock,
+                llm_response_cache,
+            )
+        else:
+            status_message = f"Merge N: {entity_name} | {num_new_fragment}+{num_fragment-num_new_fragment}"
+            logger.info(status_message)
+            if pipeline_status is not None and pipeline_status_lock is not None:
+                async with pipeline_status_lock:
+                    pipeline_status["latest_message"] = status_message
+                    pipeline_status["history_messages"].append(status_message)
+
     node_data = dict(
         entity_id=entity_name,
         entity_type=entity_type,
@@ -319,14 +321,6 @@ async def _merge_edges_then_upsert(
     already_file_paths = []
 
     if await knowledge_graph_inst.has_edge(src_id, tgt_id):
-        # Update pipeline status when an edge that needs merging is found
-        status_message = f"Merging edge::: {src_id} - {tgt_id}"
-        logger.info(status_message)
-        if pipeline_status is not None and pipeline_status_lock is not None:
-            async with pipeline_status_lock:
-                pipeline_status["latest_message"] = status_message
-                pipeline_status["history_messages"].append(status_message)
-
         already_edge = await knowledge_graph_inst.get_edge(src_id, tgt_id)
         # Handle the case where get_edge returns None or missing fields
         if already_edge:
@@ -404,14 +398,38 @@
             "file_path": file_path,
         },
     )
-    description = await _handle_entity_relation_summary(
-        f"({src_id}, {tgt_id})",
-        description,
-        global_config,
-        pipeline_status,
-        pipeline_status_lock,
-        llm_response_cache,
-    )
+
+    force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
+
+    num_fragment = description.count(GRAPH_FIELD_SEP) + 1
+    num_new_fragment = len(
+        set([dp["description"] for dp in edges_data if dp.get("description")])
+    )
+
+    if num_fragment > 1:
+        if num_fragment >= force_llm_summary_on_merge:
+            status_message = f"LLM merge E: {src_id} - {tgt_id} | {num_new_fragment}+{num_fragment-num_new_fragment}"
+            logger.info(status_message)
+            if pipeline_status is not None and pipeline_status_lock is not None:
+                async with pipeline_status_lock:
+                    pipeline_status["latest_message"] = status_message
+                    pipeline_status["history_messages"].append(status_message)
+            description = await _handle_entity_relation_summary(
+                f"({src_id}, {tgt_id})",
+                description,
+                global_config,
+                pipeline_status,
+                pipeline_status_lock,
+                llm_response_cache,
+            )
+        else:
+            status_message = f"Merge E: {src_id} - {tgt_id} | {num_new_fragment}+{num_fragment-num_new_fragment}"
+            logger.info(status_message)
+            if pipeline_status is not None and pipeline_status_lock is not None:
+                async with pipeline_status_lock:
+                    pipeline_status["latest_message"] = status_message
+                    pipeline_status["history_messages"].append(status_message)
+
     await knowledge_graph_inst.upsert_edge(
         src_id,
         tgt_id,
@@ -550,8 +568,10 @@ async def extract_entities(
         Args:
             chunk_key_dp (tuple[str, TextChunkSchema]):
                 ("chunk-xxxxxx", {"tokens": int, "content": str, "full_doc_id": str, "chunk_order_index": int})
+        Returns:
+            tuple: (maybe_nodes, maybe_edges) containing extracted entities and relationships
         """
-        nonlocal processed_chunks, total_entities_count, total_relations_count
+        nonlocal processed_chunks
         chunk_key = chunk_key_dp[0]
        chunk_dp = chunk_key_dp[1]
        content = chunk_dp["content"]
@@ -623,75 +643,91 @@
                 pipeline_status["latest_message"] = log_message
                 pipeline_status["history_messages"].append(log_message)
 
-        # Use graph database lock to ensure atomic merges and updates
-        chunk_entities_data = []
-        chunk_relationships_data = []
-
-        async with graph_db_lock:
-            # Process and update entities
-            for entity_name, entities in maybe_nodes.items():
-                entity_data = await _merge_nodes_then_upsert(
-                    entity_name,
-                    entities,
-                    knowledge_graph_inst,
-                    global_config,
-                    pipeline_status,
-                    pipeline_status_lock,
-                    llm_response_cache,
-                )
-                chunk_entities_data.append(entity_data)
-
-            # Process and update relationships
-            for edge_key, edges in maybe_edges.items():
-                # Ensure edge direction consistency
-                sorted_edge_key = tuple(sorted(edge_key))
-                edge_data = await _merge_edges_then_upsert(
-                    sorted_edge_key[0],
-                    sorted_edge_key[1],
-                    edges,
-                    knowledge_graph_inst,
-                    global_config,
-                    pipeline_status,
-                    pipeline_status_lock,
-                    llm_response_cache,
-                )
-                chunk_relationships_data.append(edge_data)
-
-            # Update vector database (within the same lock to ensure atomicity)
-            if entity_vdb is not None and chunk_entities_data:
-                data_for_vdb = {
-                    compute_mdhash_id(dp["entity_name"], prefix="ent-"): {
-                        "entity_name": dp["entity_name"],
-                        "entity_type": dp["entity_type"],
-                        "content": f"{dp['entity_name']}\n{dp['description']}",
-                        "source_id": dp["source_id"],
-                        "file_path": dp.get("file_path", "unknown_source"),
-                    }
-                    for dp in chunk_entities_data
+        # Return the extracted nodes and edges for centralized processing
+        return maybe_nodes, maybe_edges
+
+    # Handle all chunks in parallel and collect results
+    tasks = [_process_single_content(c) for c in ordered_chunks]
+    chunk_results = await asyncio.gather(*tasks)
+
+    # Collect all nodes and edges from all chunks
+    all_nodes = defaultdict(list)
+    all_edges = defaultdict(list)
+
+    for maybe_nodes, maybe_edges in chunk_results:
+        # Collect nodes
+        for entity_name, entities in maybe_nodes.items():
+            all_nodes[entity_name].extend(entities)
+
+        # Collect edges with sorted keys for undirected graph
+        for edge_key, edges in maybe_edges.items():
+            sorted_edge_key = tuple(sorted(edge_key))
+            all_edges[sorted_edge_key].extend(edges)
+
+    # Centralized processing of all nodes and edges
+    entities_data = []
+    relationships_data = []
+
+    # Use graph database lock to ensure atomic merges and updates
+    async with graph_db_lock:
+        # Process and update all entities at once
+        for entity_name, entities in all_nodes.items():
+            entity_data = await _merge_nodes_then_upsert(
+                entity_name,
+                entities,
+                knowledge_graph_inst,
+                global_config,
+                pipeline_status,
+                pipeline_status_lock,
+                llm_response_cache,
+            )
+            entities_data.append(entity_data)
+
+        # Process and update all relationships at once
+        for edge_key, edges in all_edges.items():
+            edge_data = await _merge_edges_then_upsert(
+                edge_key[0],
+                edge_key[1],
+                edges,
+                knowledge_graph_inst,
+                global_config,
+                pipeline_status,
+                pipeline_status_lock,
+                llm_response_cache,
+            )
+            relationships_data.append(edge_data)
+
+        # Update vector databases with all collected data
+        if entity_vdb is not None and entities_data:
+            data_for_vdb = {
+                compute_mdhash_id(dp["entity_name"], prefix="ent-"): {
+                    "entity_name": dp["entity_name"],
+                    "entity_type": dp["entity_type"],
+                    "content": f"{dp['entity_name']}\n{dp['description']}",
+                    "source_id": dp["source_id"],
+                    "file_path": dp.get("file_path", "unknown_source"),
                 }
-                await entity_vdb.upsert(data_for_vdb)
-
-            if relationships_vdb is not None and chunk_relationships_data:
-                data_for_vdb = {
-                    compute_mdhash_id(dp["src_id"] + dp["tgt_id"], prefix="rel-"): {
-                        "src_id": dp["src_id"],
-                        "tgt_id": dp["tgt_id"],
-                        "keywords": dp["keywords"],
-                        "content": f"{dp['src_id']}\t{dp['tgt_id']}\n{dp['keywords']}\n{dp['description']}",
-                        "source_id": dp["source_id"],
-                        "file_path": dp.get("file_path", "unknown_source"),
-                    }
-                    for dp in chunk_relationships_data
+                for dp in entities_data
+            }
+            await entity_vdb.upsert(data_for_vdb)
+
+        if relationships_vdb is not None and relationships_data:
+            data_for_vdb = {
+                compute_mdhash_id(dp["src_id"] + dp["tgt_id"], prefix="rel-"): {
+                    "src_id": dp["src_id"],
+                    "tgt_id": dp["tgt_id"],
+                    "keywords": dp["keywords"],
+                    "content": f"{dp['src_id']}\t{dp['tgt_id']}\n{dp['keywords']}\n{dp['description']}",
                     "source_id": dp["source_id"],
                    "file_path": dp.get("file_path", "unknown_source"),
                 }
-                await relationships_vdb.upsert(data_for_vdb)
-
-            # Update counters
-            total_entities_count += len(chunk_entities_data)
-            total_relations_count += len(chunk_relationships_data)
+                for dp in relationships_data
+            }
+            await relationships_vdb.upsert(data_for_vdb)
 
-    # Handle all chunks in parallel
-    tasks = [_process_single_content(c) for c in ordered_chunks]
-    await asyncio.gather(*tasks)
+    # Update total counts
+    total_entities_count = len(entities_data)
+    total_relations_count = len(relationships_data)
 
     log_message = f"Extracted {total_entities_count} entities + {total_relations_count} relationships (total)"
     logger.info(log_message)
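
The new status messages log fragment counts as new+old. A worked example of the arithmetic above (GRAPH_FIELD_SEP assumed to be LightRAG's "<SEP>" separator):

    GRAPH_FIELD_SEP = "<SEP>"

    already = "desc A" + GRAPH_FIELD_SEP + "desc B"      # 2 fragments already stored
    new_descs = ["desc C", "desc C", "desc D"]           # 3 new fragments, 2 unique
    description = GRAPH_FIELD_SEP.join([already] + sorted(set(new_descs)))

    num_fragment = description.count(GRAPH_FIELD_SEP) + 1   # -> 4
    num_new_fragment = len(set(new_descs))                  # -> 2
    # Logged as "Merge N: <entity> | 2+2"; with the default threshold of 6,
    # 4 < 6, so the merged description is kept without an LLM call.
    print(f"Merge N: entity | {num_new_fragment}+{num_fragment-num_new_fragment}")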
lightrag/utils.py CHANGED
@@ -967,7 +967,7 @@ async def use_llm_func_with_cache(
         res: str = await use_llm_func(input_text, **kwargs)
 
         # Save to cache
-        logger.info(f"Saving LLM cache for {arg_hash}")
+        logger.info(f" == LLM cache == saving {arg_hash}")
         await save_to_cache(
             llm_response_cache,
             CacheData(
lightrag_webui/src/components/documents/PipelineStatusDialog.tsx CHANGED
@@ -166,7 +166,7 @@ export default function PipelineStatusDialog({
           {/* Latest Message */}
           <div className="space-y-2">
             <div className="text-sm font-medium">{t('documentPanel.pipelineStatus.latestMessage')}:</div>
-            <div className="font-mono text-sm rounded-md bg-zinc-800 text-zinc-100 p-3">
+            <div className="font-mono text-xs rounded-md bg-zinc-800 text-zinc-100 p-3">
               {status?.latest_message || '-'}
             </div>
           </div>
@@ -177,7 +177,7 @@
           <div
             ref={historyRef}
             onScroll={handleScroll}
-            className="font-mono text-sm rounded-md bg-zinc-800 text-zinc-100 p-3 overflow-y-auto min-h-[7.5em] max-h-[40vh]"
+            className="font-mono text-xs rounded-md bg-zinc-800 text-zinc-100 p-3 overflow-y-auto min-h-[7.5em] max-h-[40vh]"
           >
             {status?.history_messages?.length ? (
               status.history_messages.map((msg, idx) => (