Merge pull request #1337 from danielaskdd/main
Browse files
Only merge new entities/edges during gleaning stage
- lightrag/lightrag.py +7 -0
- lightrag/operate.py +10 -4
lightrag/lightrag.py
CHANGED
@@ -902,6 +902,13 @@ class LightRAG:
|
|
902 |
# Get file path from status document
|
903 |
file_path = getattr(status_doc, "file_path", "unknown_source")
|
904 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
905 |
# Generate chunks from document
|
906 |
chunks: dict[str, Any] = {
|
907 |
compute_mdhash_id(dp["content"], prefix="chunk-"): {
|
|
|
902 |
# Get file path from status document
|
903 |
file_path = getattr(status_doc, "file_path", "unknown_source")
|
904 |
|
905 |
+
async with pipeline_status_lock:
|
906 |
+
log_message = f"Processing file: {file_path}"
|
907 |
+
pipeline_status["history_messages"].append(log_message)
|
908 |
+
log_message = f"Processing d-id: {doc_id}"
|
909 |
+
pipeline_status["latest_message"] = log_message
|
910 |
+
pipeline_status["history_messages"].append(log_message)
|
911 |
+
|
912 |
# Generate chunks from document
|
913 |
chunks: dict[str, Any] = {
|
914 |
compute_mdhash_id(dp["content"], prefix="chunk-"): {
|
lightrag/operate.py
CHANGED
@@ -613,11 +613,17 @@ async def extract_entities(
|
|
613 |
glean_result, chunk_key, file_path
|
614 |
)
|
615 |
|
616 |
-
# Merge results
|
617 |
for entity_name, entities in glean_nodes.items():
|
618 |
-
|
|
|
|
|
|
|
619 |
for edge_key, edges in glean_edges.items():
|
620 |
-
|
|
|
|
|
|
|
621 |
|
622 |
if now_glean_index == entity_extract_max_gleaning - 1:
|
623 |
break
|
@@ -636,7 +642,7 @@ async def extract_entities(
|
|
636 |
processed_chunks += 1
|
637 |
entities_count = len(maybe_nodes)
|
638 |
relations_count = len(maybe_edges)
|
639 |
-
log_message = f"
|
640 |
logger.info(log_message)
|
641 |
if pipeline_status is not None:
|
642 |
async with pipeline_status_lock:
|
|
|
613 |
glean_result, chunk_key, file_path
|
614 |
)
|
615 |
|
616 |
+
# Merge results - only add entities and edges with new names
|
617 |
for entity_name, entities in glean_nodes.items():
|
618 |
+
if (
|
619 |
+
entity_name not in maybe_nodes
|
620 |
+
): # Only accept entities with new name in gleaning stage
|
621 |
+
maybe_nodes[entity_name].extend(entities)
|
622 |
for edge_key, edges in glean_edges.items():
|
623 |
+
if (
|
624 |
+
edge_key not in maybe_edges
|
625 |
+
): # Only accept edges with new name in gleaning stage
|
626 |
+
maybe_edges[edge_key].extend(edges)
|
627 |
|
628 |
if now_glean_index == entity_extract_max_gleaning - 1:
|
629 |
break
|
|
|
642 |
processed_chunks += 1
|
643 |
entities_count = len(maybe_nodes)
|
644 |
relations_count = len(maybe_edges)
|
645 |
+
log_message = f"Chk {processed_chunks}/{total_chunks}: extracted {entities_count} Ent + {relations_count} Rel (deduplicated)"
|
646 |
logger.info(log_message)
|
647 |
if pipeline_status is not None:
|
648 |
async with pipeline_status_lock:
|