Commit bd11593 (unverified) by Milin2025 · 2 parent(s): 294a44d b7d0d48

Merge branch 'HKUDS:main' into feat_login-jwt

Files changed (2):
  1. README.md +70 -0
  2. lightrag/lightrag.py +389 -0
README.md CHANGED
@@ -849,6 +849,76 @@ All operations are available in both synchronous and asynchronous versions. The
 
 These operations maintain data consistency across both the graph database and vector database components, ensuring your knowledge graph remains coherent.
 
+## Entity Merging
+
+<details>
+  <summary> <b>Merge Entities and Their Relationships</b> </summary>
+
+LightRAG now supports merging multiple entities into a single entity, automatically handling all relationships:
+
+```python
+# Basic entity merging
+rag.merge_entities(
+    source_entities=["Artificial Intelligence", "AI", "Machine Intelligence"],
+    target_entity="AI Technology"
+)
+```
+
+With a custom merge strategy:
+
+```python
+# Define a custom merge strategy for different fields
+rag.merge_entities(
+    source_entities=["John Smith", "Dr. Smith", "J. Smith"],
+    target_entity="John Smith",
+    merge_strategy={
+        "description": "concatenate",  # Combine all descriptions
+        "entity_type": "keep_first",   # Keep the entity type from the first entity
+        "source_id": "join_unique"     # Combine all unique source IDs
+    }
+)
+```
+
+With custom target entity data:
+
+```python
+# Specify exact values for the merged entity
+rag.merge_entities(
+    source_entities=["New York", "NYC", "Big Apple"],
+    target_entity="New York City",
+    target_entity_data={
+        "entity_type": "LOCATION",
+        "description": "New York City is the most populous city in the United States.",
+    }
+)
+```
+
+Advanced usage combining both approaches:
+
+```python
+# Merge company entities with both a strategy and custom data
+rag.merge_entities(
+    source_entities=["Microsoft Corp", "Microsoft Corporation", "MSFT"],
+    target_entity="Microsoft",
+    merge_strategy={
+        "description": "concatenate",  # Combine all descriptions
+        "source_id": "join_unique"     # Combine source IDs
+    },
+    target_entity_data={
+        "entity_type": "ORGANIZATION",
+    }
+)
+```
+
+When merging entities:
+
+* All relationships from the source entities are redirected to the target entity
+* Duplicate relationships are intelligently merged
+* Self-relationships (loops) are prevented
+* Source entities are removed after merging
+* Relationship weights and attributes are preserved
+
+</details>
+
 ## Cache
 
 <details>
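As a side note on the strategies used in the README examples: `join_unique` operates on delimiter-separated fields. The snippet below is a standalone sketch of the four field-merge strategies, not the library API; `GRAPH_FIELD_SEP` is assumed to be `"<SEP>"` here for illustration, and this version keeps insertion order (a set-based merge, as in the implementation, would not guarantee order).

```python
# Illustrative sketch of LightRAG's field-merge strategies (hypothetical
# helper, not part of the library). GRAPH_FIELD_SEP value is assumed.
GRAPH_FIELD_SEP = "<SEP>"

def merge_field(values: list[str], strategy: str) -> str:
    """Merge a list of field values according to a named strategy."""
    if strategy == "concatenate":
        # Combine all values into one text block
        return "\n\n".join(values)
    if strategy == "keep_last":
        return values[-1]
    if strategy == "join_unique":
        # Split on the delimiter, drop duplicates, keep first-seen order
        unique: list[str] = []
        for value in values:
            for item in value.split(GRAPH_FIELD_SEP):
                if item not in unique:
                    unique.append(item)
        return GRAPH_FIELD_SEP.join(unique)
    # "keep_first" and the default fall through to the first value
    return values[0]

print(merge_field(["a<SEP>b", "b<SEP>c"], "join_unique"))  # → a<SEP>b<SEP>c
```

This mirrors how overlapping `source_id` lists from merged entities collapse to one de-duplicated list.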
lightrag/lightrag.py CHANGED
@@ -2420,3 +2420,392 @@ class LightRAG:
     return loop.run_until_complete(
         self.acreate_relation(source_entity, target_entity, relation_data)
     )
+
+    async def amerge_entities(
+        self,
+        source_entities: list[str],
+        target_entity: str,
+        merge_strategy: dict[str, str] | None = None,
+        target_entity_data: dict[str, Any] | None = None,
+    ) -> dict[str, Any]:
+        """Asynchronously merge multiple entities into one entity.
+
+        Merges multiple source entities into a target entity, handling all relationships,
+        and updating both the knowledge graph and the vector database.
+
+        Args:
+            source_entities: List of source entity names to merge
+            target_entity: Name of the target entity after merging
+            merge_strategy: Merge strategy configuration, e.g. {"description": "concatenate", "entity_type": "keep_first"}
+                Supported strategies:
+                - "concatenate": Concatenate all values (for text fields)
+                - "keep_first": Keep the first non-empty value
+                - "keep_last": Keep the last non-empty value
+                - "join_unique": Join all unique values (for delimiter-separated fields)
+            target_entity_data: Dictionary of specific values to set for the target entity,
+                overriding any merged values, e.g. {"description": "custom description", "entity_type": "PERSON"}
+
+        Returns:
+            Dictionary containing the merged entity information
+        """
+        try:
+            # Default merge strategy
+            default_strategy = {
+                "description": "concatenate",
+                "entity_type": "keep_first",
+                "source_id": "join_unique",
+            }
+
+            merge_strategy = (
+                default_strategy
+                if merge_strategy is None
+                else {**default_strategy, **merge_strategy}
+            )
+            target_entity_data = (
+                {} if target_entity_data is None else target_entity_data
+            )
+
+            # 1. Check that all source entities exist
+            source_entities_data = {}
+            for entity_name in source_entities:
+                node_data = await self.chunk_entity_relation_graph.get_node(entity_name)
+                if not node_data:
+                    raise ValueError(f"Source entity '{entity_name}' does not exist")
+                source_entities_data[entity_name] = node_data
+
+            # 2. Check whether the target entity exists and get its data if it does.
+            # Stored in its own variable so the caller-provided target_entity_data
+            # overrides are not clobbered before step 3 applies them.
+            target_exists = await self.chunk_entity_relation_graph.has_node(
+                target_entity
+            )
+            existing_target_data = {}
+            if target_exists:
+                existing_target_data = await self.chunk_entity_relation_graph.get_node(
+                    target_entity
+                )
+                logger.info(
+                    f"Target entity '{target_entity}' already exists, will merge data"
+                )
+
+            # 3. Merge entity data
+            merged_entity_data = self._merge_entity_attributes(
+                list(source_entities_data.values())
+                + ([existing_target_data] if target_exists else []),
+                merge_strategy,
+            )
+
+            # Apply any explicitly provided target entity data (overrides merged data)
+            merged_entity_data.update(target_entity_data)
+
+            # 4. Get all relationships of the source entities
+            all_relations = []
+            for entity_name in source_entities:
+                # Get all relationships where this entity is the source
+                outgoing_edges = await self.chunk_entity_relation_graph.get_node_edges(
+                    entity_name
+                )
+                if outgoing_edges:
+                    for src, tgt in outgoing_edges:
+                        # Ensure src is the current entity
+                        if src == entity_name:
+                            edge_data = await self.chunk_entity_relation_graph.get_edge(
+                                src, tgt
+                            )
+                            all_relations.append(("outgoing", src, tgt, edge_data))
+
+                # Get all relationships where this entity is the target
+                incoming_edges = []
+                all_labels = await self.chunk_entity_relation_graph.get_all_labels()
+                for label in all_labels:
+                    if label == entity_name:
+                        continue
+                    node_edges = await self.chunk_entity_relation_graph.get_node_edges(
+                        label
+                    )
+                    for src, tgt in node_edges or []:
+                        if tgt == entity_name:
+                            incoming_edges.append((src, tgt))
+
+                for src, tgt in incoming_edges:
+                    edge_data = await self.chunk_entity_relation_graph.get_edge(
+                        src, tgt
+                    )
+                    all_relations.append(("incoming", src, tgt, edge_data))
+
+            # 5. Create or update the target entity
+            await self.chunk_entity_relation_graph.upsert_node(
+                target_entity, merged_entity_data
+            )
+            if target_exists:
+                logger.info(f"Updated existing target entity '{target_entity}'")
+            else:
+                logger.info(f"Created new target entity '{target_entity}'")
+
+            # 6. Recreate all relationships, pointing at the target entity
+            relation_updates = {}  # Track relationships that need to be merged
+
+            for rel_type, src, tgt, edge_data in all_relations:
+                new_src = target_entity if src in source_entities else src
+                new_tgt = target_entity if tgt in source_entities else tgt
+
+                # Skip relationships between source entities to avoid self-loops
+                if new_src == new_tgt:
+                    logger.info(
+                        f"Skipping relationship between source entities: {src} -> {tgt} to avoid self-loop"
+                    )
+                    continue
+
+                # Check whether the same relationship already exists
+                relation_key = f"{new_src}|{new_tgt}"
+                if relation_key in relation_updates:
+                    # Merge relationship data
+                    existing_data = relation_updates[relation_key]["data"]
+                    merged_relation = self._merge_relation_attributes(
+                        [existing_data, edge_data],
+                        {
+                            "description": "concatenate",
+                            "keywords": "join_unique",
+                            "source_id": "join_unique",
+                            "weight": "max",
+                        },
+                    )
+                    relation_updates[relation_key]["data"] = merged_relation
+                    logger.info(
+                        f"Merged duplicate relationship: {new_src} -> {new_tgt}"
+                    )
+                else:
+                    relation_updates[relation_key] = {
+                        "src": new_src,
+                        "tgt": new_tgt,
+                        "data": edge_data.copy(),
+                    }
+
+            # Apply relationship updates
+            for rel_data in relation_updates.values():
+                await self.chunk_entity_relation_graph.upsert_edge(
+                    rel_data["src"], rel_data["tgt"], rel_data["data"]
+                )
+                logger.info(
+                    f"Created or updated relationship: {rel_data['src']} -> {rel_data['tgt']}"
+                )
+
+            # 7. Update the entity's vector representation
+            description = merged_entity_data.get("description", "")
+            source_id = merged_entity_data.get("source_id", "")
+            entity_type = merged_entity_data.get("entity_type", "")
+            content = target_entity + "\n" + description
+
+            entity_id = compute_mdhash_id(target_entity, prefix="ent-")
+            entity_data_for_vdb = {
+                entity_id: {
+                    "content": content,
+                    "entity_name": target_entity,
+                    "source_id": source_id,
+                    "description": description,
+                    "entity_type": entity_type,
+                }
+            }
+
+            await self.entities_vdb.upsert(entity_data_for_vdb)
+
+            # 8. Update the relationships' vector representations
+            for rel_data in relation_updates.values():
+                src = rel_data["src"]
+                tgt = rel_data["tgt"]
+                edge_data = rel_data["data"]
+
+                description = edge_data.get("description", "")
+                keywords = edge_data.get("keywords", "")
+                source_id = edge_data.get("source_id", "")
+                weight = float(edge_data.get("weight", 1.0))
+
+                content = f"{keywords}\t{src}\n{tgt}\n{description}"
+                relation_id = compute_mdhash_id(src + tgt, prefix="rel-")
+
+                relation_data_for_vdb = {
+                    relation_id: {
+                        "content": content,
+                        "src_id": src,
+                        "tgt_id": tgt,
+                        "source_id": source_id,
+                        "description": description,
+                        "keywords": keywords,
+                        "weight": weight,
+                    }
+                }
+
+                await self.relationships_vdb.upsert(relation_data_for_vdb)
+
+            # 9. Delete the source entities
+            for entity_name in source_entities:
+                # Delete the entity node
+                await self.chunk_entity_relation_graph.delete_node(entity_name)
+                # Delete the record from the vector database
+                entity_id = compute_mdhash_id(entity_name, prefix="ent-")
+                await self.entities_vdb.delete([entity_id])
+                logger.info(f"Deleted source entity '{entity_name}'")
+
+            # 10. Save changes
+            await self._merge_entities_done()
+
+            logger.info(
+                f"Successfully merged {len(source_entities)} entities into '{target_entity}'"
+            )
+            return await self.get_entity_info(target_entity, include_vector_data=True)
+
+        except Exception as e:
+            logger.error(f"Error merging entities: {e}")
+            raise
+
+    def merge_entities(
+        self,
+        source_entities: list[str],
+        target_entity: str,
+        merge_strategy: dict[str, str] | None = None,
+        target_entity_data: dict[str, Any] | None = None,
+    ) -> dict[str, Any]:
+        """Synchronously merge multiple entities into one entity.
+
+        Merges multiple source entities into a target entity, handling all relationships,
+        and updating both the knowledge graph and the vector database.
+
+        Args:
+            source_entities: List of source entity names to merge
+            target_entity: Name of the target entity after merging
+            merge_strategy: Merge strategy configuration, e.g. {"description": "concatenate", "entity_type": "keep_first"}
+            target_entity_data: Dictionary of specific values to set for the target entity,
+                overriding any merged values, e.g. {"description": "custom description", "entity_type": "PERSON"}
+
+        Returns:
+            Dictionary containing the merged entity information
+        """
+        loop = always_get_an_event_loop()
+        return loop.run_until_complete(
+            self.amerge_entities(
+                source_entities, target_entity, merge_strategy, target_entity_data
+            )
+        )
+
+    def _merge_entity_attributes(
+        self, entity_data_list: list[dict[str, Any]], merge_strategy: dict[str, str]
+    ) -> dict[str, Any]:
+        """Merge attributes from multiple entities.
+
+        Args:
+            entity_data_list: List of dictionaries containing entity data
+            merge_strategy: Merge strategy for each field
+
+        Returns:
+            Dictionary containing the merged entity data
+        """
+        merged_data = {}
+
+        # Collect all possible keys
+        all_keys = set()
+        for data in entity_data_list:
+            all_keys.update(data.keys())
+
+        # Merge the values for each key
+        for key in all_keys:
+            # Get all values for this key
+            values = [data.get(key) for data in entity_data_list if data.get(key)]
+
+            if not values:
+                continue
+
+            # Merge the values according to the strategy
+            strategy = merge_strategy.get(key, "keep_first")
+
+            if strategy == "concatenate":
+                merged_data[key] = "\n\n".join(values)
+            elif strategy == "keep_first":
+                merged_data[key] = values[0]
+            elif strategy == "keep_last":
+                merged_data[key] = values[-1]
+            elif strategy == "join_unique":
+                # Handle fields separated by GRAPH_FIELD_SEP
+                unique_items = set()
+                for value in values:
+                    items = value.split(GRAPH_FIELD_SEP)
+                    unique_items.update(items)
+                merged_data[key] = GRAPH_FIELD_SEP.join(unique_items)
+            else:
+                # Default strategy
+                merged_data[key] = values[0]
+
+        return merged_data
+
+    def _merge_relation_attributes(
+        self, relation_data_list: list[dict[str, Any]], merge_strategy: dict[str, str]
+    ) -> dict[str, Any]:
+        """Merge attributes from multiple relationships.
+
+        Args:
+            relation_data_list: List of dictionaries containing relationship data
+            merge_strategy: Merge strategy for each field
+
+        Returns:
+            Dictionary containing the merged relationship data
+        """
+        merged_data = {}
+
+        # Collect all possible keys
+        all_keys = set()
+        for data in relation_data_list:
+            all_keys.update(data.keys())
+
+        # Merge the values for each key
+        for key in all_keys:
+            # Get all values for this key
+            values = [
+                data.get(key)
+                for data in relation_data_list
+                if data.get(key) is not None
+            ]
+
+            if not values:
+                continue
+
+            # Merge the values according to the strategy
+            strategy = merge_strategy.get(key, "keep_first")
+
+            if strategy == "concatenate":
+                merged_data[key] = "\n\n".join(str(v) for v in values)
+            elif strategy == "keep_first":
+                merged_data[key] = values[0]
+            elif strategy == "keep_last":
+                merged_data[key] = values[-1]
+            elif strategy == "join_unique":
+                # Handle fields separated by GRAPH_FIELD_SEP
+                unique_items = set()
+                for value in values:
+                    items = str(value).split(GRAPH_FIELD_SEP)
+                    unique_items.update(items)
+                merged_data[key] = GRAPH_FIELD_SEP.join(unique_items)
+            elif strategy == "max":
+                # For numeric fields such as weight
+                try:
+                    merged_data[key] = max(float(v) for v in values)
+                except (ValueError, TypeError):
+                    merged_data[key] = values[0]
+            else:
+                # Default strategy
+                merged_data[key] = values[0]
+
+        return merged_data
+
+    async def _merge_entities_done(self) -> None:
+        """Callback after entity merging is complete; ensures updates are persisted"""
+        await asyncio.gather(
+            *[
+                cast(StorageNameSpace, storage_inst).index_done_callback()
+                for storage_inst in [  # type: ignore
+                    self.entities_vdb,
+                    self.relationships_vdb,
+                    self.chunk_entity_relation_graph,
+                ]
+            ]
+        )