zrguo committed
Commit a346fa4 · Parent: d77ad04

update delete_by_doc_id
lightrag/base.py CHANGED
@@ -278,6 +278,21 @@ class BaseKVStorage(StorageNameSpace, ABC):
             False: if the cache drop failed, or the cache mode is not supported
         """

+    async def drop_cache_by_chunk_ids(self, chunk_ids: list[str] | None = None) -> bool:
+        """Delete specific cache records from storage by chunk IDs
+
+        Important notes for in-memory storage:
+        1. Changes will be persisted to disk during the next index_done_callback
+        2. Update flags to notify other processes that data persistence is needed
+
+        Args:
+            chunk_ids (list[str]): List of chunk IDs to be dropped from storage
+
+        Returns:
+            True: if the cache drop succeeded
+            False: if the cache drop failed, or the operation is not supported
+        """
+

 @dataclass
 class BaseGraphStorage(StorageNameSpace, ABC):
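Note: BaseKVStorage only declares the hook above; a backend that does not track which chunk produced each cache entry can simply report the operation as unsupported, which is the "False" return path the docstring describes. A minimal sketch of such an override (the SimpleKVStorage name is illustrative, and the class's other abstract methods are omitted):

    class SimpleKVStorage(BaseKVStorage):
        async def drop_cache_by_chunk_ids(self, chunk_ids: list[str] | None = None) -> bool:
            # This backend keeps no per-chunk bookkeeping, so it cannot drop
            # individual cache records; signal "not supported" to the caller.
            return False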
lightrag/kg/json_kv_impl.py CHANGED
@@ -172,6 +172,53 @@ class JsonKVStorage(BaseKVStorage):
         except Exception:
             return False

+    async def drop_cache_by_chunk_ids(self, chunk_ids: list[str] | None = None) -> bool:
+        """Delete specific cache records from storage by chunk IDs
+
+        Important notes for in-memory storage:
+        1. Changes will be persisted to disk during the next index_done_callback
+        2. Update flags to notify other processes that data persistence is needed
+
+        Args:
+            chunk_ids (list[str]): List of chunk IDs to be dropped from storage
+
+        Returns:
+            True: if the cache drop succeeded
+            False: if the cache drop failed
+        """
+        if not chunk_ids:
+            return False
+
+        try:
+            async with self._storage_lock:
+                # Iterate through all cache modes to find entries with matching chunk_ids
+                for mode_key, mode_data in list(self._data.items()):
+                    if isinstance(mode_data, dict):
+                        # Check each cached entry in this mode
+                        for cache_key, cache_entry in list(mode_data.items()):
+                            if (
+                                isinstance(cache_entry, dict)
+                                and cache_entry.get("chunk_id") in chunk_ids
+                            ):
+                                # Remove this cache entry
+                                del mode_data[cache_key]
+                                logger.debug(
+                                    f"Removed cache entry {cache_key} for chunk {cache_entry.get('chunk_id')}"
+                                )
+
+                        # If the mode is now empty, remove it entirely
+                        if not mode_data:
+                            del self._data[mode_key]
+
+                # Set update flags to notify persistence is needed
+                await set_all_update_flags(self.namespace)
+
+            logger.info(f"Cleared cache for {len(chunk_ids)} chunk IDs")
+            return True
+        except Exception as e:
+            logger.error(f"Error clearing cache by chunk IDs: {e}")
+            return False
+
     async def drop(self) -> dict[str, str]:
         """Drop all data from storage and clean up resources
         This action will persistent the data to disk immediately.
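Note: the cache layout this method walks is a two-level dict, mode -> cache key -> entry, where each entry may carry a chunk_id field (written by save_to_cache later in this commit). A self-contained sketch of the same pruning logic on plain dicts, useful for reasoning about the method without a storage instance; the sample data is invented for illustration:

    def prune_cache_by_chunk_ids(data: dict, chunk_ids: set[str]) -> dict:
        # Mirror of the deletion loop above: drop entries whose chunk_id matches,
        # then drop any mode that ends up empty.
        for mode_key, mode_data in list(data.items()):
            if isinstance(mode_data, dict):
                for cache_key, entry in list(mode_data.items()):
                    if isinstance(entry, dict) and entry.get("chunk_id") in chunk_ids:
                        del mode_data[cache_key]
                if not mode_data:
                    del data[mode_key]
        return data

    cache = {
        "default": {
            "hash-a": {"cache_type": "extract", "chunk_id": "chunk-1", "return": "..."},
            "hash-b": {"cache_type": "extract", "chunk_id": "chunk-2", "return": "..."},
        }
    }
    prune_cache_by_chunk_ids(cache, {"chunk-1"})
    assert "hash-a" not in cache["default"] and "hash-b" in cache["default"]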
lightrag/lightrag.py CHANGED
@@ -56,6 +56,7 @@ from .operate import (
     kg_query,
     naive_query,
     query_with_keywords,
+    _rebuild_knowledge_from_chunks,
 )
 from .prompt import GRAPH_FIELD_SEP
 from .utils import (
@@ -1207,6 +1208,7 @@ class LightRAG:
                 cast(StorageNameSpace, storage_inst).index_done_callback()
                 for storage_inst in [  # type: ignore
                     self.full_docs,
+                    self.doc_status,
                     self.text_chunks,
                     self.llm_response_cache,
                     self.entities_vdb,
@@ -1674,10 +1676,12 @@ class LightRAG:
         # Return the dictionary containing statuses only for the found document IDs
         return found_statuses

-    # TODO: Deprecated (Deleting documents can cause hallucinations in RAG.)
-    # Document delete is not working properly for most of the storage implementations.
     async def adelete_by_doc_id(self, doc_id: str) -> None:
-        """Delete a document and all its related data
+        """Delete a document and all its related data with cache cleanup and reconstruction
+
+        Optimized version that:
+        1. Clears LLM cache for related chunks
+        2. Rebuilds entity and relationship descriptions from remaining chunks

         Args:
             doc_id: Document ID to delete
@@ -1688,10 +1692,9 @@
                 logger.warning(f"Document {doc_id} not found")
                 return

-            logger.debug(f"Starting deletion for document {doc_id}")
+            logger.info(f"Starting optimized deletion for document {doc_id}")

             # 2. Get all chunks related to this document
-            # Find all chunks where full_doc_id equals the current doc_id
             all_chunks = await self.text_chunks.get_all()
             related_chunks = {
                 chunk_id: chunk_data
@@ -1704,64 +1707,46 @@
                 logger.warning(f"No chunks found for document {doc_id}")
                 return

-            # Get all related chunk IDs
             chunk_ids = set(related_chunks.keys())
-            logger.debug(f"Found {len(chunk_ids)} chunks to delete")
-
-            # TODO: self.entities_vdb.client_storage only works for local storage, need to fix this
-
-            # 3. Before deleting, check the related entities and relationships for these chunks
-            for chunk_id in chunk_ids:
-                # Check entities
-                entities_storage = await self.entities_vdb.client_storage
-                entities = [
-                    dp
-                    for dp in entities_storage["data"]
-                    if chunk_id in dp.get("source_id")
-                ]
-                logger.debug(f"Chunk {chunk_id} has {len(entities)} related entities")
-
-                # Check relationships
-                relationships_storage = await self.relationships_vdb.client_storage
-                relations = [
-                    dp
-                    for dp in relationships_storage["data"]
-                    if chunk_id in dp.get("source_id")
-                ]
-                logger.debug(f"Chunk {chunk_id} has {len(relations)} related relations")
-
-            # Continue with the original deletion process...
+            logger.info(f"Found {len(chunk_ids)} chunks to delete")

-            # 4. Delete chunks from vector database
-            if chunk_ids:
-                await self.chunks_vdb.delete(chunk_ids)
-                await self.text_chunks.delete(chunk_ids)
+            # 3. **OPTIMIZATION 1**: Clear LLM cache for related chunks
+            logger.info("Clearing LLM cache for related chunks...")
+            cache_cleared = await self.llm_response_cache.drop_cache_by_chunk_ids(
+                list(chunk_ids)
+            )
+            if cache_cleared:
+                logger.info(f"Successfully cleared cache for {len(chunk_ids)} chunks")
+            else:
+                logger.warning(
+                    "Failed to clear chunk cache or cache clearing not supported"
+                )

-            # 5. Find and process entities and relationships that have these chunks as source
-            # Get all nodes and edges from the graph storage using storage-agnostic methods
+            # 4. Analyze entities and relationships that will be affected
             entities_to_delete = set()
-            entities_to_update = {}  # entity_name -> new_source_id
+            entities_to_rebuild = {}  # entity_name -> remaining_chunk_ids
             relationships_to_delete = set()
-            relationships_to_update = {}  # (src, tgt) -> new_source_id
+            relationships_to_rebuild = {}  # (src, tgt) -> remaining_chunk_ids

-            # Process entities - use storage-agnostic methods
+            # Process entities
             all_labels = await self.chunk_entity_relation_graph.get_all_labels()
             for node_label in all_labels:
                 node_data = await self.chunk_entity_relation_graph.get_node(node_label)
                 if node_data and "source_id" in node_data:
                     # Split source_id using GRAPH_FIELD_SEP
                     sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP))
-                    sources.difference_update(chunk_ids)
-                    if not sources:
+                    remaining_sources = sources - chunk_ids
+
+                    if not remaining_sources:
                         entities_to_delete.add(node_label)
                         logger.debug(
                             f"Entity {node_label} marked for deletion - no remaining sources"
                         )
-                    else:
-                        new_source_id = GRAPH_FIELD_SEP.join(sources)
-                        entities_to_update[node_label] = new_source_id
+                    elif remaining_sources != sources:
+                        # Entity needs to be rebuilt from remaining chunks
+                        entities_to_rebuild[node_label] = remaining_sources
                         logger.debug(
-                            f"Entity {node_label} will be updated with new source_id: {new_source_id}"
+                            f"Entity {node_label} will be rebuilt from {len(remaining_sources)} remaining chunks"
                        )

             # Process relationships
@@ -1777,160 +1762,92 @@
                 if edge_data and "source_id" in edge_data:
                     # Split source_id using GRAPH_FIELD_SEP
                     sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP))
-                    sources.difference_update(chunk_ids)
-                    if not sources:
+                    remaining_sources = sources - chunk_ids
+
+                    if not remaining_sources:
                         relationships_to_delete.add((src, tgt))
                         logger.debug(
                             f"Relationship {src}-{tgt} marked for deletion - no remaining sources"
                         )
-                    else:
-                        new_source_id = GRAPH_FIELD_SEP.join(sources)
-                        relationships_to_update[(src, tgt)] = new_source_id
+                    elif remaining_sources != sources:
+                        # Relationship needs to be rebuilt from remaining chunks
+                        relationships_to_rebuild[(src, tgt)] = remaining_sources
                         logger.debug(
-                            f"Relationship {src}-{tgt} will be updated with new source_id: {new_source_id}"
+                            f"Relationship {src}-{tgt} will be rebuilt from {len(remaining_sources)} remaining chunks"
                        )

-            # Delete entities
+            # 5. Delete chunks from storage
+            if chunk_ids:
+                await self.chunks_vdb.delete(chunk_ids)
+                await self.text_chunks.delete(chunk_ids)
+                logger.info(f"Deleted {len(chunk_ids)} chunks from storage")
+
+            # 6. Delete entities that have no remaining sources
             if entities_to_delete:
-                for entity in entities_to_delete:
-                    await self.entities_vdb.delete_entity(entity)
-                    logger.debug(f"Deleted entity {entity} from vector DB")
+                # Delete from vector database
+                entity_vdb_ids = [
+                    compute_mdhash_id(entity, prefix="ent-")
+                    for entity in entities_to_delete
+                ]
+                await self.entities_vdb.delete(entity_vdb_ids)
+
+                # Delete from graph
                 await self.chunk_entity_relation_graph.remove_nodes(
                     list(entities_to_delete)
                 )
-                logger.debug(f"Deleted {len(entities_to_delete)} entities from graph")
-
-            # Update entities
-            for entity, new_source_id in entities_to_update.items():
-                node_data = await self.chunk_entity_relation_graph.get_node(entity)
-                if node_data:
-                    node_data["source_id"] = new_source_id
-                    await self.chunk_entity_relation_graph.upsert_node(
-                        entity, node_data
-                    )
-                    logger.debug(
-                        f"Updated entity {entity} with new source_id: {new_source_id}"
-                    )
+                logger.info(f"Deleted {len(entities_to_delete)} entities")

-            # Delete relationships
+            # 7. Delete relationships that have no remaining sources
             if relationships_to_delete:
+                # Delete from vector database
+                rel_ids_to_delete = []
                 for src, tgt in relationships_to_delete:
-                    rel_id_0 = compute_mdhash_id(src + tgt, prefix="rel-")
-                    rel_id_1 = compute_mdhash_id(tgt + src, prefix="rel-")
-                    await self.relationships_vdb.delete([rel_id_0, rel_id_1])
-                    logger.debug(f"Deleted relationship {src}-{tgt} from vector DB")
+                    rel_ids_to_delete.extend(
+                        [
+                            compute_mdhash_id(src + tgt, prefix="rel-"),
+                            compute_mdhash_id(tgt + src, prefix="rel-"),
+                        ]
+                    )
+                await self.relationships_vdb.delete(rel_ids_to_delete)
+
+                # Delete from graph
                 await self.chunk_entity_relation_graph.remove_edges(
                     list(relationships_to_delete)
                 )
-                logger.debug(
-                    f"Deleted {len(relationships_to_delete)} relationships from graph"
-                )
+                logger.info(f"Deleted {len(relationships_to_delete)} relationships")

-            # Update relationships
-            for (src, tgt), new_source_id in relationships_to_update.items():
-                edge_data = await self.chunk_entity_relation_graph.get_edge(src, tgt)
-                if edge_data:
-                    edge_data["source_id"] = new_source_id
-                    await self.chunk_entity_relation_graph.upsert_edge(
-                        src, tgt, edge_data
-                    )
-                    logger.debug(
-                        f"Updated relationship {src}-{tgt} with new source_id: {new_source_id}"
-                    )
+            # 8. **OPTIMIZATION 2**: Rebuild entities and relationships from remaining chunks
+            if entities_to_rebuild or relationships_to_rebuild:
+                logger.info(
+                    f"Rebuilding {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships..."
+                )
+                await _rebuild_knowledge_from_chunks(
+                    entities_to_rebuild=entities_to_rebuild,
+                    relationships_to_rebuild=relationships_to_rebuild,
+                    knowledge_graph_inst=self.chunk_entity_relation_graph,
+                    entities_vdb=self.entities_vdb,
+                    relationships_vdb=self.relationships_vdb,
+                    text_chunks=self.text_chunks,
+                    llm_response_cache=self.llm_response_cache,
+                    global_config=asdict(self),
+                )

-            # 6. Delete original document and status
+            # 9. Delete original document and status
             await self.full_docs.delete([doc_id])
             await self.doc_status.delete([doc_id])

-            # 7. Ensure all indexes are updated
+            # 10. Ensure all indexes are updated
             await self._insert_done()

             logger.info(
-                f"Successfully deleted document {doc_id} and related data. "
-                f"Deleted {len(entities_to_delete)} entities and {len(relationships_to_delete)} relationships. "
-                f"Updated {len(entities_to_update)} entities and {len(relationships_to_update)} relationships."
+                f"Successfully deleted document {doc_id}. "
+                f"Deleted: {len(entities_to_delete)} entities, {len(relationships_to_delete)} relationships. "
+                f"Rebuilt: {len(entities_to_rebuild)} entities, {len(relationships_to_rebuild)} relationships."
             )

-            async def process_data(data_type, vdb, chunk_id):
-                # Check data (entities or relationships)
-                storage = await vdb.client_storage
-                data_with_chunk = [
-                    dp
-                    for dp in storage["data"]
-                    if chunk_id in (dp.get("source_id") or "").split(GRAPH_FIELD_SEP)
-                ]
-
-                data_for_vdb = {}
-                if data_with_chunk:
-                    logger.warning(
-                        f"found {len(data_with_chunk)} {data_type} still referencing chunk {chunk_id}"
-                    )
-
-                    for item in data_with_chunk:
-                        old_sources = item["source_id"].split(GRAPH_FIELD_SEP)
-                        new_sources = [src for src in old_sources if src != chunk_id]
-
-                        if not new_sources:
-                            logger.info(
-                                f"{data_type} {item.get('entity_name', 'N/A')} is deleted because source_id is not exists"
-                            )
-                            await vdb.delete_entity(item)
-                        else:
-                            item["source_id"] = GRAPH_FIELD_SEP.join(new_sources)
-                            item_id = item["__id__"]
-                            data_for_vdb[item_id] = item.copy()
-                            if data_type == "entities":
-                                data_for_vdb[item_id]["content"] = data_for_vdb[
-                                    item_id
-                                ].get("content") or (
-                                    item.get("entity_name", "")
-                                    + (item.get("description") or "")
-                                )
-                            else:  # relationships
-                                data_for_vdb[item_id]["content"] = data_for_vdb[
-                                    item_id
-                                ].get("content") or (
-                                    (item.get("keywords") or "")
-                                    + (item.get("src_id") or "")
-                                    + (item.get("tgt_id") or "")
-                                    + (item.get("description") or "")
-                                )
-
-                    if data_for_vdb:
-                        await vdb.upsert(data_for_vdb)
-                        logger.info(f"Successfully updated {data_type} in vector DB")
-
-            # Add verification step
-            async def verify_deletion():
-                # Verify if the document has been deleted
-                if await self.full_docs.get_by_id(doc_id):
-                    logger.warning(f"Document {doc_id} still exists in full_docs")
-
-                # Verify if chunks have been deleted
-                all_remaining_chunks = await self.text_chunks.get_all()
-                remaining_related_chunks = {
-                    chunk_id: chunk_data
-                    for chunk_id, chunk_data in all_remaining_chunks.items()
-                    if isinstance(chunk_data, dict)
-                    and chunk_data.get("full_doc_id") == doc_id
-                }
-
-                if remaining_related_chunks:
-                    logger.warning(
-                        f"Found {len(remaining_related_chunks)} remaining chunks"
-                    )
-
-                # Verify entities and relationships
-                for chunk_id in chunk_ids:
-                    await process_data("entities", self.entities_vdb, chunk_id)
-                    await process_data(
-                        "relationships", self.relationships_vdb, chunk_id
-                    )
-
-            await verify_deletion()
-
         except Exception as e:
             logger.error(f"Error while deleting document {doc_id}: {e}")
+            raise

     async def adelete_by_entity(self, entity_name: str) -> None:
         """Asynchronously delete an entity and all its relationships.
lightrag/operate.py CHANGED
@@ -240,6 +240,421 @@ async def _handle_single_relationship_extraction(
     )


+async def _rebuild_knowledge_from_chunks(
+    entities_to_rebuild: dict[str, set[str]],
+    relationships_to_rebuild: dict[tuple[str, str], set[str]],
+    knowledge_graph_inst: BaseGraphStorage,
+    entities_vdb: BaseVectorStorage,
+    relationships_vdb: BaseVectorStorage,
+    text_chunks: BaseKVStorage,
+    llm_response_cache: BaseKVStorage,
+    global_config: dict[str, str],
+) -> None:
+    """Rebuild entity and relationship descriptions from cached extraction results
+
+    This method uses cached LLM extraction results instead of calling LLM again,
+    following the same approach as the insert process.
+
+    Args:
+        entities_to_rebuild: Dict mapping entity_name -> set of remaining chunk_ids
+        relationships_to_rebuild: Dict mapping (src, tgt) -> set of remaining chunk_ids
+    """
+    if not entities_to_rebuild and not relationships_to_rebuild:
+        return
+
+    # Get all referenced chunk IDs
+    all_referenced_chunk_ids = set()
+    for chunk_ids in entities_to_rebuild.values():
+        all_referenced_chunk_ids.update(chunk_ids)
+    for chunk_ids in relationships_to_rebuild.values():
+        all_referenced_chunk_ids.update(chunk_ids)
+
+    logger.info(
+        f"Rebuilding knowledge from {len(all_referenced_chunk_ids)} cached chunk extractions"
+    )
+
+    # Get cached extraction results for these chunks
+    cached_results = await _get_cached_extraction_results(
+        llm_response_cache, all_referenced_chunk_ids
+    )
+
+    if not cached_results:
+        logger.warning("No cached extraction results found, cannot rebuild")
+        return
+
+    # Process cached results to get entities and relationships for each chunk
+    chunk_entities = {}  # chunk_id -> {entity_name: [entity_data]}
+    chunk_relationships = {}  # chunk_id -> {(src, tgt): [relationship_data]}
+
+    for chunk_id, extraction_result in cached_results.items():
+        try:
+            entities, relationships = await _parse_extraction_result(
+                text_chunks=text_chunks,
+                extraction_result=extraction_result,
+                chunk_id=chunk_id,
+            )
+            chunk_entities[chunk_id] = entities
+            chunk_relationships[chunk_id] = relationships
+        except Exception as e:
+            logger.error(
+                f"Failed to parse cached extraction result for chunk {chunk_id}: {e}"
+            )
+            continue
+
+    # Rebuild entities
+    for entity_name, chunk_ids in entities_to_rebuild.items():
+        try:
+            await _rebuild_single_entity(
+                knowledge_graph_inst=knowledge_graph_inst,
+                entities_vdb=entities_vdb,
+                entity_name=entity_name,
+                chunk_ids=chunk_ids,
+                chunk_entities=chunk_entities,
+                llm_response_cache=llm_response_cache,
+                global_config=global_config,
+            )
+            logger.debug(
+                f"Rebuilt entity {entity_name} from {len(chunk_ids)} cached extractions"
+            )
+        except Exception as e:
+            logger.error(f"Failed to rebuild entity {entity_name}: {e}")
+
+    # Rebuild relationships
+    for (src, tgt), chunk_ids in relationships_to_rebuild.items():
+        try:
+            await _rebuild_single_relationship(
+                knowledge_graph_inst=knowledge_graph_inst,
+                relationships_vdb=relationships_vdb,
+                src=src,
+                tgt=tgt,
+                chunk_ids=chunk_ids,
+                chunk_relationships=chunk_relationships,
+                llm_response_cache=llm_response_cache,
+                global_config=global_config,
+            )
+            logger.debug(
+                f"Rebuilt relationship {src}-{tgt} from {len(chunk_ids)} cached extractions"
+            )
+        except Exception as e:
+            logger.error(f"Failed to rebuild relationship {src}-{tgt}: {e}")
+
+    logger.info("Completed rebuilding knowledge from cached extractions")
+
+
+async def _get_cached_extraction_results(
+    llm_response_cache: BaseKVStorage, chunk_ids: set[str]
+) -> dict[str, str]:
+    """Get cached extraction results for specific chunk IDs
+
+    Args:
+        chunk_ids: Set of chunk IDs to get cached results for
+
+    Returns:
+        Dict mapping chunk_id -> extraction_result_text
+    """
+    cached_results = {}
+
+    # Get all cached data for "default" mode (entity extraction cache)
+    default_cache = await llm_response_cache.get_by_id("default") or {}
+
+    for cache_key, cache_entry in default_cache.items():
+        if (
+            isinstance(cache_entry, dict)
+            and cache_entry.get("cache_type") == "extract"
+            and cache_entry.get("chunk_id") in chunk_ids
+        ):
+            chunk_id = cache_entry["chunk_id"]
+            extraction_result = cache_entry["return"]
+            cached_results[chunk_id] = extraction_result
+
+    logger.info(
+        f"Found {len(cached_results)} cached extraction results for {len(chunk_ids)} chunk IDs"
+    )
+    return cached_results
+
+
+async def _parse_extraction_result(
+    text_chunks: BaseKVStorage, extraction_result: str, chunk_id: str
+) -> tuple[dict, dict]:
+    """Parse cached extraction result using the same logic as extract_entities
+
+    Args:
+        extraction_result: The cached LLM extraction result
+        chunk_id: The chunk ID for source tracking
+
+    Returns:
+        Tuple of (entities_dict, relationships_dict)
+    """
+
+    # Get chunk data for file_path
+    chunk_data = await text_chunks.get_by_id(chunk_id)
+    file_path = (
+        chunk_data.get("file_path", "unknown_source")
+        if chunk_data
+        else "unknown_source"
+    )
+    context_base = dict(
+        tuple_delimiter=PROMPTS["DEFAULT_TUPLE_DELIMITER"],
+        record_delimiter=PROMPTS["DEFAULT_RECORD_DELIMITER"],
+        completion_delimiter=PROMPTS["DEFAULT_COMPLETION_DELIMITER"],
+    )
+    maybe_nodes = defaultdict(list)
+    maybe_edges = defaultdict(list)
+
+    # Parse the extraction result using the same logic as in extract_entities
+    records = split_string_by_multi_markers(
+        extraction_result,
+        [context_base["record_delimiter"], context_base["completion_delimiter"]],
+    )
+    for record in records:
+        record = re.search(r"\((.*)\)", record)
+        if record is None:
+            continue
+        record = record.group(1)
+        record_attributes = split_string_by_multi_markers(
+            record, [context_base["tuple_delimiter"]]
+        )
+
+        # Try to parse as entity
+        entity_data = await _handle_single_entity_extraction(
+            record_attributes, chunk_id, file_path
+        )
+        if entity_data is not None:
+            maybe_nodes[entity_data["entity_name"]].append(entity_data)
+            continue
+
+        # Try to parse as relationship
+        relationship_data = await _handle_single_relationship_extraction(
+            record_attributes, chunk_id, file_path
+        )
+        if relationship_data is not None:
+            maybe_edges[
+                (relationship_data["src_id"], relationship_data["tgt_id"])
+            ].append(relationship_data)
+
+    return dict(maybe_nodes), dict(maybe_edges)
+
+
+async def _rebuild_single_entity(
+    knowledge_graph_inst: BaseGraphStorage,
+    entities_vdb: BaseVectorStorage,
+    entity_name: str,
+    chunk_ids: set[str],
+    chunk_entities: dict,
+    llm_response_cache: BaseKVStorage,
+    global_config: dict[str, str],
+) -> None:
+    """Rebuild a single entity from cached extraction results"""
+
+    # Get current entity data
+    current_entity = await knowledge_graph_inst.get_node(entity_name)
+    if not current_entity:
+        return
+
+    # Collect all entity data from relevant chunks
+    all_entity_data = []
+    for chunk_id in chunk_ids:
+        if chunk_id in chunk_entities and entity_name in chunk_entities[chunk_id]:
+            all_entity_data.extend(chunk_entities[chunk_id][entity_name])
+
+    if not all_entity_data:
+        logger.warning(f"No cached entity data found for {entity_name}")
+        return
+
+    # Merge descriptions and get the most common entity type
+    descriptions = []
+    entity_types = []
+    file_paths = set()
+
+    for entity_data in all_entity_data:
+        if entity_data.get("description"):
+            descriptions.append(entity_data["description"])
+        if entity_data.get("entity_type"):
+            entity_types.append(entity_data["entity_type"])
+        if entity_data.get("file_path"):
+            file_paths.add(entity_data["file_path"])
+
+    # Combine all descriptions
+    combined_description = (
+        GRAPH_FIELD_SEP.join(descriptions)
+        if descriptions
+        else current_entity.get("description", "")
+    )
+
+    # Get most common entity type
+    entity_type = (
+        max(set(entity_types), key=entity_types.count)
+        if entity_types
+        else current_entity.get("entity_type", "UNKNOWN")
+    )
+
+    # Use summary if description is too long
+    if len(combined_description) > global_config["summary_to_max_tokens"]:
+        final_description = await _handle_entity_relation_summary(
+            entity_name,
+            combined_description,
+            global_config,
+            llm_response_cache=llm_response_cache,
+        )
+    else:
+        final_description = combined_description
+
+    # Update entity in graph storage
+    updated_entity_data = {
+        **current_entity,
+        "description": final_description,
+        "entity_type": entity_type,
+        "source_id": GRAPH_FIELD_SEP.join(chunk_ids),
+        "file_path": GRAPH_FIELD_SEP.join(file_paths)
+        if file_paths
+        else current_entity.get("file_path", "unknown_source"),
+    }
+    await knowledge_graph_inst.upsert_node(entity_name, updated_entity_data)
+
+    # Update entity in vector database
+    entity_vdb_id = compute_mdhash_id(entity_name, prefix="ent-")
+
+    # Delete old vector record first
+    try:
+        await entities_vdb.delete([entity_vdb_id])
+    except Exception as e:
+        logger.debug(f"Could not delete old entity vector record {entity_vdb_id}: {e}")
+
+    # Insert new vector record
+    entity_content = f"{entity_name}\n{final_description}"
+    await entities_vdb.upsert(
+        {
+            entity_vdb_id: {
+                "content": entity_content,
+                "entity_name": entity_name,
+                "source_id": updated_entity_data["source_id"],
+                "description": final_description,
+                "entity_type": entity_type,
+                "file_path": updated_entity_data["file_path"],
+            }
+        }
+    )
+
+
+async def _rebuild_single_relationship(
+    knowledge_graph_inst: BaseGraphStorage,
+    relationships_vdb: BaseVectorStorage,
+    src: str,
+    tgt: str,
+    chunk_ids: set[str],
+    chunk_relationships: dict,
+    llm_response_cache: BaseKVStorage,
+    global_config: dict[str, str],
+) -> None:
+    """Rebuild a single relationship from cached extraction results"""
+
+    # Get current relationship data
+    current_relationship = await knowledge_graph_inst.get_edge(src, tgt)
+    if not current_relationship:
+        return
+
+    # Collect all relationship data from relevant chunks
+    all_relationship_data = []
+    for chunk_id in chunk_ids:
+        if chunk_id in chunk_relationships:
+            # Check both (src, tgt) and (tgt, src) since relationships can be bidirectional
+            for edge_key in [(src, tgt), (tgt, src)]:
+                if edge_key in chunk_relationships[chunk_id]:
+                    all_relationship_data.extend(
+                        chunk_relationships[chunk_id][edge_key]
+                    )
+
+    if not all_relationship_data:
+        logger.warning(f"No cached relationship data found for {src}-{tgt}")
+        return
+
+    # Merge descriptions and keywords
+    descriptions = []
+    keywords = []
+    weights = []
+    file_paths = set()
+
+    for rel_data in all_relationship_data:
+        if rel_data.get("description"):
+            descriptions.append(rel_data["description"])
+        if rel_data.get("keywords"):
+            keywords.append(rel_data["keywords"])
+        if rel_data.get("weight"):
+            weights.append(rel_data["weight"])
+        if rel_data.get("file_path"):
+            file_paths.add(rel_data["file_path"])
+
+    # Combine descriptions and keywords
+    combined_description = (
+        GRAPH_FIELD_SEP.join(descriptions)
+        if descriptions
+        else current_relationship.get("description", "")
+    )
+    combined_keywords = (
+        ", ".join(set(keywords))
+        if keywords
+        else current_relationship.get("keywords", "")
+    )
+    avg_weight = (
+        sum(weights) / len(weights)
+        if weights
+        else current_relationship.get("weight", 1.0)
+    )
+
+    # Use summary if description is too long
+    if len(combined_description) > global_config["summary_to_max_tokens"]:
+        final_description = await _handle_entity_relation_summary(
+            f"{src}-{tgt}",
+            combined_description,
+            global_config,
+            llm_response_cache=llm_response_cache,
+        )
+    else:
+        final_description = combined_description
+
+    # Update relationship in graph storage
+    updated_relationship_data = {
+        **current_relationship,
+        "description": final_description,
+        "keywords": combined_keywords,
+        "weight": avg_weight,
+        "source_id": GRAPH_FIELD_SEP.join(chunk_ids),
+        "file_path": GRAPH_FIELD_SEP.join(file_paths)
+        if file_paths
+        else current_relationship.get("file_path", "unknown_source"),
+    }
+    await knowledge_graph_inst.upsert_edge(src, tgt, updated_relationship_data)
+
+    # Update relationship in vector database
+    rel_vdb_id = compute_mdhash_id(src + tgt, prefix="rel-")
+    rel_vdb_id_reverse = compute_mdhash_id(tgt + src, prefix="rel-")
+
+    # Delete old vector records first (both directions to be safe)
+    try:
+        await relationships_vdb.delete([rel_vdb_id, rel_vdb_id_reverse])
+    except Exception as e:
+        logger.debug(
+            f"Could not delete old relationship vector records {rel_vdb_id}, {rel_vdb_id_reverse}: {e}"
+        )
+
+    # Insert new vector record
+    rel_content = f"{combined_keywords}\t{src}\n{tgt}\n{final_description}"
+    await relationships_vdb.upsert(
+        {
+            rel_vdb_id: {
+                "src_id": src,
+                "tgt_id": tgt,
+                "source_id": updated_relationship_data["source_id"],
+                "content": rel_content,
+                "keywords": combined_keywords,
+                "description": final_description,
+                "weight": avg_weight,
+                "file_path": updated_relationship_data["file_path"],
+            }
+        }
+    )
+
+
 async def _merge_nodes_then_upsert(
     entity_name: str,
     nodes_data: list[dict],
@@ -757,6 +1172,7 @@ async def extract_entities(
             use_llm_func,
             llm_response_cache=llm_response_cache,
             cache_type="extract",
+            chunk_id=chunk_key,
         )
         history = pack_user_ass_to_openai_messages(hint_prompt, final_result)

@@ -773,6 +1189,7 @@ async def extract_entities(
                 llm_response_cache=llm_response_cache,
                 history_messages=history,
                 cache_type="extract",
+                chunk_id=chunk_key,
             )

             history += pack_user_ass_to_openai_messages(continue_prompt, glean_result)
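Note: the rebuild path never calls the LLM again; it re-parses the raw extraction text stored under cache_type "extract". A self-contained sketch of that parsing step on one invented cached record, assuming the default delimiters "<|>" (tuple), "##" (record) and "<|COMPLETE|>" (completion); both the delimiter values and the sample record are only approximations of what PROMPTS in lightrag.prompt defines:

    import re

    cached_extraction = (
        '("entity"<|>"Tesla"<|>"organization"<|>"Electric vehicle maker")##'
        '("relationship"<|>"Tesla"<|>"Elon Musk"<|>"founded by"<|>"company, founder"<|>9)'
        "<|COMPLETE|>"
    )

    # Split into records, roughly what split_string_by_multi_markers does.
    records = [r for r in re.split(r"##|<\|COMPLETE\|>", cached_extraction) if r.strip()]

    for record in records:
        match = re.search(r"\((.*)\)", record)    # keep only the (...) payload
        if match is None:
            continue
        attributes = match.group(1).split("<|>")  # tuple-delimiter split
        print(attributes[0].strip('"'), attributes[1:])

Running this prints one "entity" row and one "relationship" row, which is the shape _parse_extraction_result hands to _handle_single_entity_extraction and _handle_single_relationship_extraction.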
lightrag/utils.py CHANGED
@@ -990,6 +990,7 @@ class CacheData:
     max_val: float | None = None
     mode: str = "default"
     cache_type: str = "query"
+    chunk_id: str | None = None


 async def save_to_cache(hashing_kv, cache_data: CacheData):
@@ -1030,6 +1031,7 @@ async def save_to_cache(hashing_kv, cache_data: CacheData):
     mode_cache[cache_data.args_hash] = {
         "return": cache_data.content,
         "cache_type": cache_data.cache_type,
+        "chunk_id": cache_data.chunk_id if cache_data.chunk_id is not None else None,
         "embedding": cache_data.quantized.tobytes().hex()
         if cache_data.quantized is not None
         else None,
@@ -1534,6 +1536,7 @@ async def use_llm_func_with_cache(
     max_tokens: int = None,
     history_messages: list[dict[str, str]] = None,
     cache_type: str = "extract",
+    chunk_id: str | None = None,
 ) -> str:
     """Call LLM function with cache support

@@ -1547,6 +1550,7 @@
         max_tokens: Maximum tokens for generation
         history_messages: History messages list
         cache_type: Type of cache
+        chunk_id: Chunk identifier to store in cache

     Returns:
         LLM response text
@@ -1589,6 +1593,7 @@
                 content=res,
                 prompt=_prompt,
                 cache_type=cache_type,
+                chunk_id=chunk_id,
             ),
         )

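Note: with chunk_id threaded through CacheData, save_to_cache and use_llm_func_with_cache, every extraction cache entry now records which text chunk produced it; this is the field that drop_cache_by_chunk_ids and _get_cached_extraction_results key on. A sketch of the resulting entry shape under the "default" mode (values invented for illustration; the remaining fields written by save_to_cache are unchanged):

    llm_response_cache_data = {
        "default": {
            "args-hash-123": {
                "return": '("entity"<|>...)<|COMPLETE|>',  # raw LLM extraction output
                "cache_type": "extract",
                "chunk_id": "chunk-abc",  # new in this commit: ties the entry to its source chunk
                "embedding": None,
                # ... other existing fields (prompt, min_val, max_val, ...) as before
            }
        }
    }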