Commit 73387c0 by gzdaniel · Parent(s): e425521

Add comprehensive error handling for document deletion

Files changed (1):
  1. lightrag/lightrag.py +186 -121
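
The change wraps each deletion phase in its own try/except and guarantees persistence in a finally block. Below is a minimal sketch of that pattern, independent of LightRAG; the helper names delete_chunks and persist_indexes are illustrative stand-ins, not part of the codebase:

import asyncio
import logging

logger = logging.getLogger(__name__)


async def delete_chunks(doc_id: str) -> None:
    """Stand-in for a destructive storage operation."""
    logger.info("deleting chunks for %s", doc_id)


async def persist_indexes() -> None:
    """Stand-in for the storage flush done by LightRAG's _insert_done()."""
    logger.info("flushing storage backends")


async def delete_document(doc_id: str) -> dict:
    deletion_started = False   # set once any destructive step begins
    original_exception = None  # remembers the first failure, if any

    try:
        deletion_started = True
        await delete_chunks(doc_id)
        return {"status": "success", "doc_id": doc_id}
    except Exception as e:
        original_exception = e
        return {"status": "fail", "doc_id": doc_id, "message": str(e)}
    finally:
        # Runs even when deletion failed part-way, so the backends are
        # never left holding unflushed, half-deleted state.
        if deletion_started:
            try:
                await persist_indexes()
            except Exception as pe:
                logger.error("persistence failed: %s", pe)
                # Surface the persistence error only if nothing failed
                # earlier; a `return` in `finally` would otherwise mask
                # the original exception.
                if original_exception is None:
                    return {"status": "fail", "doc_id": doc_id, "message": str(pe)}


if __name__ == "__main__":
    print(asyncio.run(delete_document("doc-1")))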
lightrag/lightrag.py CHANGED
@@ -1703,9 +1703,15 @@ class LightRAG:
                 - `message` (str): A summary of the operation's result.
                 - `status_code` (int): HTTP status code (e.g., 200, 404, 500).
         """
+        deletion_operations_started = False
+        original_exception = None
+
         try:
+            logger.info(f"Starting deletion process for document {doc_id}")
+
             # 1. Get the document status and related data
-            if not await self.doc_status.get_by_id(doc_id):
+            doc_status_data = await self.doc_status.get_by_id(doc_id)
+            if not doc_status_data:
                 logger.warning(f"Document {doc_id} not found")
                 return DeletionResult(
                     status="not_found",
@@ -1717,19 +1723,32 @@
             logger.info(f"Starting optimized deletion for document {doc_id}")

             # 2. Get all chunks related to this document
-            all_chunks = await self.text_chunks.get_all()
-            related_chunks = {
-                chunk_id: chunk_data
-                for chunk_id, chunk_data in all_chunks.items()
-                if isinstance(chunk_data, dict)
-                and chunk_data.get("full_doc_id") == doc_id
-            }
+            try:
+                all_chunks = await self.text_chunks.get_all()
+                related_chunks = {
+                    chunk_id: chunk_data
+                    for chunk_id, chunk_data in all_chunks.items()
+                    if isinstance(chunk_data, dict)
+                    and chunk_data.get("full_doc_id") == doc_id
+                }
+                logger.info(f"Retrieved {len(all_chunks)} total chunks, {len(related_chunks)} related to document {doc_id}")
+            except Exception as e:
+                logger.error(f"Failed to retrieve chunks for document {doc_id}: {e}")
+                raise Exception(f"Failed to retrieve document chunks: {e}") from e

             if not related_chunks:
                 logger.warning(f"No chunks found for document {doc_id}")
+                # Mark that deletion operations have started
+                deletion_operations_started = True
+                try:
+                    # Still need to delete the doc status and full doc
+                    await self.full_docs.delete([doc_id])
+                    await self.doc_status.delete([doc_id])
+                    logger.info(f"Deleted document {doc_id} with no associated chunks")
+                except Exception as e:
+                    logger.error(f"Failed to delete document {doc_id} with no chunks: {e}")
+                    raise Exception(f"Failed to delete document entry: {e}") from e
+
                 return DeletionResult(
                     status="success",
                     doc_id=doc_id,
@@ -1740,17 +1759,8 @@
             chunk_ids = set(related_chunks.keys())
             logger.info(f"Found {len(chunk_ids)} chunks to delete")

-            # # 3. **OPTIMIZATION 1**: Clear LLM cache for related chunks
-            # logger.info("Clearing LLM cache for related chunks...")
-            # cache_cleared = await self.llm_response_cache.drop_cache_by_chunk_ids(
-            #     list(chunk_ids)
-            # )
-            # if cache_cleared:
-            #     logger.info(f"Successfully cleared cache for {len(chunk_ids)} chunks")
-            # else:
-            #     logger.warning(
-            #         "Failed to clear chunk cache or cache clearing not supported"
-            #     )
+            # Mark that deletion operations have started
+            deletion_operations_started = True

             # 4. Analyze entities and relationships that will be affected
             entities_to_delete = set()
@@ -1761,122 +1771,151 @@
             # Use graph database lock to ensure atomic merges and updates
             graph_db_lock = get_graph_db_lock(enable_logging=False)
             async with graph_db_lock:
-                # Get all affected nodes and edges in batch
-                affected_nodes = (
-                    await self.chunk_entity_relation_graph.get_nodes_by_chunk_ids(
-                        list(chunk_ids)
+                try:
+                    # Get all affected nodes and edges in batch
+                    logger.info(f"Analyzing affected entities and relationships for {len(chunk_ids)} chunks")
+                    affected_nodes = (
+                        await self.chunk_entity_relation_graph.get_nodes_by_chunk_ids(
+                            list(chunk_ids)
+                        )
                     )
-                )
-                affected_edges = (
-                    await self.chunk_entity_relation_graph.get_edges_by_chunk_ids(
-                        list(chunk_ids)
+                    affected_edges = (
+                        await self.chunk_entity_relation_graph.get_edges_by_chunk_ids(
+                            list(chunk_ids)
+                        )
                    )
-                )
-
-                # logger.info(f"chunk_ids: {chunk_ids}")
-                # logger.info(f"affected_nodes: {affected_nodes}")
-                # logger.info(f"affected_edges: {affected_edges}")
-
-                # Process entities
-                for node_data in affected_nodes:
-                    node_label = node_data.get("entity_id")
-                    if node_label and "source_id" in node_data:
-                        sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP))
-                        remaining_sources = sources - chunk_ids
-
-                        if not remaining_sources:
-                            entities_to_delete.add(node_label)
-                        elif remaining_sources != sources:
-                            entities_to_rebuild[node_label] = remaining_sources
-
-                # Process relationships
-                for edge_data in affected_edges:
-                    src = edge_data.get("source")
-                    tgt = edge_data.get("target")
-
-                    if src and tgt and "source_id" in edge_data:
-                        edge_tuple = tuple(sorted((src, tgt)))
-                        if (
-                            edge_tuple in relationships_to_delete
-                            or edge_tuple in relationships_to_rebuild
-                        ):
-                            continue
-
-                        sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP))
-                        remaining_sources = sources - chunk_ids
-
-                        if not remaining_sources:
-                            relationships_to_delete.add(edge_tuple)
-                        elif remaining_sources != sources:
-                            relationships_to_rebuild[edge_tuple] = remaining_sources
+                    logger.info(f"Found {len(affected_nodes)} affected nodes and {len(affected_edges)} affected edges")
+                except Exception as e:
+                    logger.error(f"Failed to analyze affected graph elements: {e}")
+                    raise Exception(f"Failed to analyze graph dependencies: {e}") from e
+
+                try:
+                    # Process entities
+                    for node_data in affected_nodes:
+                        node_label = node_data.get("entity_id")
+                        if node_label and "source_id" in node_data:
+                            sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP))
+                            remaining_sources = sources - chunk_ids
+
+                            if not remaining_sources:
+                                entities_to_delete.add(node_label)
+                            elif remaining_sources != sources:
+                                entities_to_rebuild[node_label] = remaining_sources
+
+                    # Process relationships
+                    for edge_data in affected_edges:
+                        src = edge_data.get("source")
+                        tgt = edge_data.get("target")
+
+                        if src and tgt and "source_id" in edge_data:
+                            edge_tuple = tuple(sorted((src, tgt)))
+                            if (
+                                edge_tuple in relationships_to_delete
+                                or edge_tuple in relationships_to_rebuild
+                            ):
+                                continue
+
+                            sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP))
+                            remaining_sources = sources - chunk_ids
+
+                            if not remaining_sources:
+                                relationships_to_delete.add(edge_tuple)
+                            elif remaining_sources != sources:
+                                relationships_to_rebuild[edge_tuple] = remaining_sources
+
+                    logger.info(f"Analysis complete: {len(entities_to_delete)} entities to delete, "
+                                f"{len(entities_to_rebuild)} entities to rebuild, "
+                                f"{len(relationships_to_delete)} relationships to delete, "
+                                f"{len(relationships_to_rebuild)} relationships to rebuild")
+                except Exception as e:
+                    logger.error(f"Failed to process graph analysis results: {e}")
+                    raise Exception(f"Failed to process graph dependencies: {e}") from e

                 # 5. Delete chunks from storage
                 if chunk_ids:
-                    await self.chunks_vdb.delete(chunk_ids)
-                    await self.text_chunks.delete(chunk_ids)
-                    logger.info(f"Deleted {len(chunk_ids)} chunks from storage")
+                    try:
+                        logger.info(f"Deleting {len(chunk_ids)} chunks from storage")
+                        await self.chunks_vdb.delete(chunk_ids)
+                        await self.text_chunks.delete(chunk_ids)
+                        logger.info(f"Successfully deleted {len(chunk_ids)} chunks from storage")
+                    except Exception as e:
+                        logger.error(f"Failed to delete chunks: {e}")
+                        raise Exception(f"Failed to delete document chunks: {e}") from e

                 # 6. Delete entities that have no remaining sources
                 if entities_to_delete:
-                    # Delete from vector database
-                    entity_vdb_ids = [
-                        compute_mdhash_id(entity, prefix="ent-")
-                        for entity in entities_to_delete
-                    ]
-                    await self.entities_vdb.delete(entity_vdb_ids)
-
-                    # Delete from graph
-                    await self.chunk_entity_relation_graph.remove_nodes(
-                        list(entities_to_delete)
-                    )
-                    logger.info(f"Deleted {len(entities_to_delete)} entities")
+                    try:
+                        logger.info(f"Deleting {len(entities_to_delete)} entities")
+                        # Delete from vector database
+                        entity_vdb_ids = [
+                            compute_mdhash_id(entity, prefix="ent-")
+                            for entity in entities_to_delete
+                        ]
+                        await self.entities_vdb.delete(entity_vdb_ids)
+
+                        # Delete from graph
+                        await self.chunk_entity_relation_graph.remove_nodes(
+                            list(entities_to_delete)
+                        )
+                        logger.info(f"Successfully deleted {len(entities_to_delete)} entities")
+                    except Exception as e:
+                        logger.error(f"Failed to delete entities: {e}")
+                        raise Exception(f"Failed to delete entities: {e}") from e

                 # 7. Delete relationships that have no remaining sources
                 if relationships_to_delete:
-                    # Delete from vector database
-                    rel_ids_to_delete = []
-                    for src, tgt in relationships_to_delete:
-                        rel_ids_to_delete.extend(
-                            [
-                                compute_mdhash_id(src + tgt, prefix="rel-"),
-                                compute_mdhash_id(tgt + src, prefix="rel-"),
-                            ]
-                        )
-                    await self.relationships_vdb.delete(rel_ids_to_delete)
+                    try:
+                        logger.info(f"Deleting {len(relationships_to_delete)} relationships")
+                        # Delete from vector database
+                        rel_ids_to_delete = []
+                        for src, tgt in relationships_to_delete:
+                            rel_ids_to_delete.extend(
+                                [
+                                    compute_mdhash_id(src + tgt, prefix="rel-"),
+                                    compute_mdhash_id(tgt + src, prefix="rel-"),
+                                ]
+                            )
+                        await self.relationships_vdb.delete(rel_ids_to_delete)

-                    # Delete from graph
-                    await self.chunk_entity_relation_graph.remove_edges(
-                        list(relationships_to_delete)
-                    )
-                    logger.info(f"Deleted {len(relationships_to_delete)} relationships")
+                        # Delete from graph
+                        await self.chunk_entity_relation_graph.remove_edges(
+                            list(relationships_to_delete)
+                        )
+                        logger.info(f"Successfully deleted {len(relationships_to_delete)} relationships")
+                    except Exception as e:
+                        logger.error(f"Failed to delete relationships: {e}")
+                        raise Exception(f"Failed to delete relationships: {e}") from e

-                # 8. **OPTIMIZATION 2**: Rebuild entities and relationships from remaining chunks
+                # 8. Rebuild entities and relationships from remaining chunks
                 if entities_to_rebuild or relationships_to_rebuild:
-                    logger.info(
-                        f"Rebuilding {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships..."
-                    )
-                    await _rebuild_knowledge_from_chunks(
-                        entities_to_rebuild=entities_to_rebuild,
-                        relationships_to_rebuild=relationships_to_rebuild,
-                        knowledge_graph_inst=self.chunk_entity_relation_graph,
-                        entities_vdb=self.entities_vdb,
-                        relationships_vdb=self.relationships_vdb,
-                        text_chunks=self.text_chunks,
-                        llm_response_cache=self.llm_response_cache,
-                        global_config=asdict(self),
-                    )
+                    try:
+                        logger.info(f"Rebuilding {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships")
+                        await _rebuild_knowledge_from_chunks(
+                            entities_to_rebuild=entities_to_rebuild,
+                            relationships_to_rebuild=relationships_to_rebuild,
+                            knowledge_graph_inst=self.chunk_entity_relation_graph,
+                            entities_vdb=self.entities_vdb,
+                            relationships_vdb=self.relationships_vdb,
+                            text_chunks=self.text_chunks,
+                            llm_response_cache=self.llm_response_cache,
+                            global_config=asdict(self),
+                        )
+                        logger.info(f"Successfully rebuilt {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships")
+                    except Exception as e:
+                        logger.error(f"Failed to rebuild knowledge from chunks: {e}")
+                        raise Exception(f"Failed to rebuild knowledge graph: {e}") from e

                 # 9. Delete original document and status
-                await self.full_docs.delete([doc_id])
-                await self.doc_status.delete([doc_id])
-
-                # 10. Ensure all indexes are updated
-                await self._insert_done()
-
-            success_message = f"""Successfully deleted document {doc_id}.
-Deleted: {len(entities_to_delete)} entities, {len(relationships_to_delete)} relationships.
-Rebuilt: {len(entities_to_rebuild)} entities, {len(relationships_to_rebuild)} relationships."""
+                try:
+                    logger.info(f"Deleting original document {doc_id} and its status")
+                    await self.full_docs.delete([doc_id])
+                    await self.doc_status.delete([doc_id])
+                    logger.info(f"Successfully deleted document {doc_id} and its status")
+                except Exception as e:
+                    logger.error(f"Failed to delete document and status: {e}")
+                    raise Exception(f"Failed to delete document and status: {e}") from e

+            success_message = f"Successfully deleted document {doc_id}"
             logger.info(success_message)
             return DeletionResult(
                 status="success",
@@ -1886,6 +1925,7 @@ Rebuilt: {len(entities_to_rebuild)} entities, {len(relationships_to_rebuild)} re
             )

         except Exception as e:
+            original_exception = e
             error_message = f"Error while deleting document {doc_id}: {e}"
             logger.error(error_message)
             logger.error(traceback.format_exc())
@@ -1895,6 +1935,31 @@ Rebuilt: {len(entities_to_rebuild)} entities, {len(relationships_to_rebuild)} re
                 message=error_message,
                 status_code=500,
             )
+
+        finally:
+            # ALWAYS ensure persistence if any deletion operations were started
+            if deletion_operations_started:
+                try:
+                    logger.info(f"Ensuring data persistence for document {doc_id} deletion")
+                    await self._insert_done()
+                    logger.info(f"Data persistence completed successfully for document {doc_id} deletion")
+                except Exception as persistence_error:
+                    persistence_error_msg = f"Failed to persist data after deletion attempt for {doc_id}: {persistence_error}"
+                    logger.error(persistence_error_msg)
+                    logger.error(traceback.format_exc())
+
+                    # If there was no original exception, this persistence error becomes the main error
+                    if original_exception is None:
+                        return DeletionResult(
+                            status="fail",
+                            doc_id=doc_id,
+                            message=f"Deletion completed but failed to persist changes: {persistence_error}",
+                            status_code=500,
+                        )
+                    # If there was an original exception, log the persistence error but don't override the original error
+                    # The original error result was already returned in the except block
+            else:
+                logger.debug(f"No deletion operations were started for document {doc_id}, skipping persistence")

     async def adelete_by_entity(self, entity_name: str) -> DeletionResult:
         """Asynchronously delete an entity and all its relationships.