gzdaniel commited on
Commit
e3235bf
·
1 Parent(s): f3664aa

Improve the pipeline status message for document deletion

Browse files
lightrag/api/routers/document_routes.py CHANGED
@@ -803,9 +803,7 @@ async def background_delete_document(rag: LightRAG, doc_id: str):
803
  }
804
  )
805
  # Use slice assignment to clear the list in place
806
- pipeline_status["history_messages"][:] = [
807
- f"Starting deletion for doc_id: {doc_id}"
808
- ]
809
 
810
  try:
811
  result = await rag.adelete_by_doc_id(doc_id)
@@ -823,7 +821,7 @@ async def background_delete_document(rag: LightRAG, doc_id: str):
823
  finally:
824
  async with pipeline_status_lock:
825
  pipeline_status["busy"] = False
826
- completion_msg = f"Document deletion process for {doc_id} completed."
827
  pipeline_status["latest_message"] = completion_msg
828
  if "history_messages" in pipeline_status:
829
  pipeline_status["history_messages"].append(completion_msg)
 
803
  }
804
  )
805
  # Use slice assignment to clear the list in place
806
+ pipeline_status["history_messages"][:] = ["Starting document deletion process"]
 
 
807
 
808
  try:
809
  result = await rag.adelete_by_doc_id(doc_id)
 
821
  finally:
822
  async with pipeline_status_lock:
823
  pipeline_status["busy"] = False
824
+ completion_msg = "Document deletion process completed."
825
  pipeline_status["latest_message"] = completion_msg
826
  if "history_messages" in pipeline_status:
827
  pipeline_status["history_messages"].append(completion_msg)
lightrag/lightrag.py CHANGED
@@ -1683,15 +1683,7 @@ class LightRAG:
1683
 
1684
  This method orchestrates a comprehensive deletion process for a given document ID.
1685
  It ensures that not only the document itself but also all its derived and associated
1686
- data across different storage layers are removed. This includes:
1687
- 1. **Document and Status**: Deletes the document from `full_docs` and its status from `doc_status`.
1688
- 2. **Chunks**: Removes all associated text chunks from `chunks_vdb`.
1689
- 3. **Graph Data**:
1690
- - Deletes related entities from `entities_vdb`.
1691
- - Deletes related relationships from `relationships_vdb`.
1692
- - Removes corresponding nodes and edges from the `chunk_entity_relation_graph`.
1693
- 4. **Graph Reconstruction**: If entities or relationships are partially affected, it triggers
1694
- a reconstruction of their data from the remaining chunks to ensure consistency.
1695
 
1696
  Args:
1697
  doc_id (str): The unique identifier of the document to be deleted.
@@ -1706,9 +1698,17 @@ class LightRAG:
1706
  deletion_operations_started = False
1707
  original_exception = None
1708
 
1709
- try:
1710
- logger.info(f"Starting deletion process for document {doc_id}")
 
1711
 
 
 
 
 
 
 
 
1712
  # 1. Get the document status and related data
1713
  doc_status_data = await self.doc_status.get_by_id(doc_id)
1714
  if not doc_status_data:
@@ -1720,8 +1720,6 @@ class LightRAG:
1720
  status_code=404,
1721
  )
1722
 
1723
- logger.info(f"Starting optimized deletion for document {doc_id}")
1724
-
1725
  # 2. Get all chunks related to this document
1726
  try:
1727
  all_chunks = await self.text_chunks.get_all()
@@ -1731,9 +1729,14 @@ class LightRAG:
1731
  if isinstance(chunk_data, dict)
1732
  and chunk_data.get("full_doc_id") == doc_id
1733
  }
1734
- logger.info(
1735
- f"Retrieved {len(all_chunks)} total chunks, {len(related_chunks)} related to document {doc_id}"
1736
- )
 
 
 
 
 
1737
  except Exception as e:
1738
  logger.error(f"Failed to retrieve chunks for document {doc_id}: {e}")
1739
  raise Exception(f"Failed to retrieve document chunks: {e}") from e
@@ -1753,16 +1756,22 @@ class LightRAG:
1753
  )
1754
  raise Exception(f"Failed to delete document entry: {e}") from e
1755
 
 
 
 
 
 
 
 
 
1756
  return DeletionResult(
1757
  status="success",
1758
  doc_id=doc_id,
1759
- message=f"Document {doc_id} found but had no associated chunks. Document entry deleted.",
1760
  status_code=200,
1761
  )
1762
 
1763
  chunk_ids = set(related_chunks.keys())
1764
- logger.info(f"Found {len(chunk_ids)} chunks to delete")
1765
-
1766
  # Mark that deletion operations have started
1767
  deletion_operations_started = True
1768
 
@@ -1777,22 +1786,35 @@ class LightRAG:
1777
  async with graph_db_lock:
1778
  try:
1779
  # Get all affected nodes and edges in batch
1780
- logger.info(
1781
- f"Analyzing affected entities and relationships for {len(chunk_ids)} chunks"
1782
- )
1783
  affected_nodes = (
1784
  await self.chunk_entity_relation_graph.get_nodes_by_chunk_ids(
1785
  list(chunk_ids)
1786
  )
1787
  )
 
 
 
 
 
 
 
 
1788
  affected_edges = (
1789
  await self.chunk_entity_relation_graph.get_edges_by_chunk_ids(
1790
  list(chunk_ids)
1791
  )
1792
  )
1793
- logger.info(
1794
- f"Found {len(affected_nodes)} affected nodes and {len(affected_edges)} affected edges"
1795
- )
 
 
 
 
 
1796
  except Exception as e:
1797
  logger.error(f"Failed to analyze affected graph elements: {e}")
1798
  raise Exception(f"Failed to analyze graph dependencies: {e}") from e
@@ -1831,12 +1853,6 @@ class LightRAG:
1831
  elif remaining_sources != sources:
1832
  relationships_to_rebuild[edge_tuple] = remaining_sources
1833
 
1834
- logger.info(
1835
- f"Analysis complete: {len(entities_to_delete)} entities to delete, "
1836
- f"{len(entities_to_rebuild)} entities to rebuild, "
1837
- f"{len(relationships_to_delete)} relationships to delete, "
1838
- f"{len(relationships_to_rebuild)} relationships to rebuild"
1839
- )
1840
  except Exception as e:
1841
  logger.error(f"Failed to process graph analysis results: {e}")
1842
  raise Exception(f"Failed to process graph dependencies: {e}") from e
@@ -1844,12 +1860,15 @@ class LightRAG:
1844
  # 5. Delete chunks from storage
1845
  if chunk_ids:
1846
  try:
1847
- logger.info(f"Deleting {len(chunk_ids)} chunks from storage")
1848
  await self.chunks_vdb.delete(chunk_ids)
1849
  await self.text_chunks.delete(chunk_ids)
1850
- logger.info(
1851
- f"Successfully deleted {len(chunk_ids)} chunks from storage"
1852
- )
 
 
 
 
1853
  except Exception as e:
1854
  logger.error(f"Failed to delete chunks: {e}")
1855
  raise Exception(f"Failed to delete document chunks: {e}") from e
@@ -1857,7 +1876,6 @@ class LightRAG:
1857
  # 6. Delete entities that have no remaining sources
1858
  if entities_to_delete:
1859
  try:
1860
- logger.info(f"Deleting {len(entities_to_delete)} entities")
1861
  # Delete from vector database
1862
  entity_vdb_ids = [
1863
  compute_mdhash_id(entity, prefix="ent-")
@@ -1869,9 +1887,13 @@ class LightRAG:
1869
  await self.chunk_entity_relation_graph.remove_nodes(
1870
  list(entities_to_delete)
1871
  )
1872
- logger.info(
1873
- f"Successfully deleted {len(entities_to_delete)} entities"
1874
- )
 
 
 
 
1875
  except Exception as e:
1876
  logger.error(f"Failed to delete entities: {e}")
1877
  raise Exception(f"Failed to delete entities: {e}") from e
@@ -1879,9 +1901,6 @@ class LightRAG:
1879
  # 7. Delete relationships that have no remaining sources
1880
  if relationships_to_delete:
1881
  try:
1882
- logger.info(
1883
- f"Deleting {len(relationships_to_delete)} relationships"
1884
- )
1885
  # Delete from vector database
1886
  rel_ids_to_delete = []
1887
  for src, tgt in relationships_to_delete:
@@ -1897,9 +1916,13 @@ class LightRAG:
1897
  await self.chunk_entity_relation_graph.remove_edges(
1898
  list(relationships_to_delete)
1899
  )
1900
- logger.info(
1901
- f"Successfully deleted {len(relationships_to_delete)} relationships"
1902
- )
 
 
 
 
1903
  except Exception as e:
1904
  logger.error(f"Failed to delete relationships: {e}")
1905
  raise Exception(f"Failed to delete relationships: {e}") from e
@@ -1907,9 +1930,6 @@ class LightRAG:
1907
  # 8. Rebuild entities and relationships from remaining chunks
1908
  if entities_to_rebuild or relationships_to_rebuild:
1909
  try:
1910
- logger.info(
1911
- f"Rebuilding {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships"
1912
- )
1913
  await _rebuild_knowledge_from_chunks(
1914
  entities_to_rebuild=entities_to_rebuild,
1915
  relationships_to_rebuild=relationships_to_rebuild,
@@ -1920,9 +1940,13 @@ class LightRAG:
1920
  llm_response_cache=self.llm_response_cache,
1921
  global_config=asdict(self),
1922
  )
1923
- logger.info(
1924
- f"Successfully rebuilt {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships"
1925
- )
 
 
 
 
1926
  except Exception as e:
1927
  logger.error(f"Failed to rebuild knowledge from chunks: {e}")
1928
  raise Exception(
@@ -1931,20 +1955,22 @@ class LightRAG:
1931
 
1932
  # 9. Delete original document and status
1933
  try:
1934
- logger.info(f"Deleting original document {doc_id} and its status")
1935
  await self.full_docs.delete([doc_id])
1936
  await self.doc_status.delete([doc_id])
1937
- logger.info(f"Successfully deleted document {doc_id} and its status")
1938
  except Exception as e:
1939
  logger.error(f"Failed to delete document and status: {e}")
1940
  raise Exception(f"Failed to delete document and status: {e}") from e
1941
 
1942
- success_message = f"Successfully deleted document {doc_id}"
1943
- logger.info(success_message)
 
 
 
 
1944
  return DeletionResult(
1945
  status="success",
1946
  doc_id=doc_id,
1947
- message=success_message,
1948
  status_code=200,
1949
  )
1950
 
@@ -1964,13 +1990,7 @@ class LightRAG:
1964
  # ALWAYS ensure persistence if any deletion operations were started
1965
  if deletion_operations_started:
1966
  try:
1967
- logger.info(
1968
- f"Ensuring data persistence for document {doc_id} deletion"
1969
- )
1970
  await self._insert_done()
1971
- logger.info(
1972
- f"Data persistence completed successfully for document {doc_id} deletion"
1973
- )
1974
  except Exception as persistence_error:
1975
  persistence_error_msg = f"Failed to persist data after deletion attempt for {doc_id}: {persistence_error}"
1976
  logger.error(persistence_error_msg)
 
1683
 
1684
  This method orchestrates a comprehensive deletion process for a given document ID.
1685
  It ensures that not only the document itself but also all its derived and associated
1686
+ data across different storage layers are removed. If entities or relationships are partially affected, it triggers a reconstruction of their data from the remaining chunks to ensure consistency.
 
 
 
 
 
 
 
 
1687
 
1688
  Args:
1689
  doc_id (str): The unique identifier of the document to be deleted.
 
1698
  deletion_operations_started = False
1699
  original_exception = None
1700
 
1701
+ # Get pipeline status shared data and lock for status updates
1702
+ pipeline_status = await get_namespace_data("pipeline_status")
1703
+ pipeline_status_lock = get_pipeline_status_lock()
1704
 
1705
+ async with pipeline_status_lock:
1706
+ log_message = f"Starting deletion process for document {doc_id}"
1707
+ logger.info(log_message)
1708
+ pipeline_status["latest_message"] = log_message
1709
+ pipeline_status["history_messages"].append(log_message)
1710
+
1711
+ try:
1712
  # 1. Get the document status and related data
1713
  doc_status_data = await self.doc_status.get_by_id(doc_id)
1714
  if not doc_status_data:
 
1720
  status_code=404,
1721
  )
1722
 
 
 
1723
  # 2. Get all chunks related to this document
1724
  try:
1725
  all_chunks = await self.text_chunks.get_all()
 
1729
  if isinstance(chunk_data, dict)
1730
  and chunk_data.get("full_doc_id") == doc_id
1731
  }
1732
+
1733
+ # Update pipeline status after getting chunks count
1734
+ async with pipeline_status_lock:
1735
+ log_message = f"Retrieved {len(related_chunks)} of {len(all_chunks)} related chunks"
1736
+ logger.info(log_message)
1737
+ pipeline_status["latest_message"] = log_message
1738
+ pipeline_status["history_messages"].append(log_message)
1739
+
1740
  except Exception as e:
1741
  logger.error(f"Failed to retrieve chunks for document {doc_id}: {e}")
1742
  raise Exception(f"Failed to retrieve document chunks: {e}") from e
 
1756
  )
1757
  raise Exception(f"Failed to delete document entry: {e}") from e
1758
 
1759
+ async with pipeline_status_lock:
1760
+ log_message = (
1761
+ f"Document {doc_id} is deleted without associated chunks."
1762
+ )
1763
+ logger.info(log_message)
1764
+ pipeline_status["latest_message"] = log_message
1765
+ pipeline_status["history_messages"].append(log_message)
1766
+
1767
  return DeletionResult(
1768
  status="success",
1769
  doc_id=doc_id,
1770
+ message=log_message,
1771
  status_code=200,
1772
  )
1773
 
1774
  chunk_ids = set(related_chunks.keys())
 
 
1775
  # Mark that deletion operations have started
1776
  deletion_operations_started = True
1777
 
 
1786
  async with graph_db_lock:
1787
  try:
1788
  # Get all affected nodes and edges in batch
1789
+ # logger.info(
1790
+ # f"Analyzing affected entities and relationships for {len(chunk_ids)} chunks"
1791
+ # )
1792
  affected_nodes = (
1793
  await self.chunk_entity_relation_graph.get_nodes_by_chunk_ids(
1794
  list(chunk_ids)
1795
  )
1796
  )
1797
+
1798
+ # Update pipeline status after getting affected_nodes
1799
+ async with pipeline_status_lock:
1800
+ log_message = f"Found {len(affected_nodes)} affected entities"
1801
+ logger.info(log_message)
1802
+ pipeline_status["latest_message"] = log_message
1803
+ pipeline_status["history_messages"].append(log_message)
1804
+
1805
  affected_edges = (
1806
  await self.chunk_entity_relation_graph.get_edges_by_chunk_ids(
1807
  list(chunk_ids)
1808
  )
1809
  )
1810
+
1811
+ # Update pipeline status after getting affected_edges
1812
+ async with pipeline_status_lock:
1813
+ log_message = f"Found {len(affected_edges)} affected relations"
1814
+ logger.info(log_message)
1815
+ pipeline_status["latest_message"] = log_message
1816
+ pipeline_status["history_messages"].append(log_message)
1817
+
1818
  except Exception as e:
1819
  logger.error(f"Failed to analyze affected graph elements: {e}")
1820
  raise Exception(f"Failed to analyze graph dependencies: {e}") from e
 
1853
  elif remaining_sources != sources:
1854
  relationships_to_rebuild[edge_tuple] = remaining_sources
1855
 
 
 
 
 
 
 
1856
  except Exception as e:
1857
  logger.error(f"Failed to process graph analysis results: {e}")
1858
  raise Exception(f"Failed to process graph dependencies: {e}") from e
 
1860
  # 5. Delete chunks from storage
1861
  if chunk_ids:
1862
  try:
 
1863
  await self.chunks_vdb.delete(chunk_ids)
1864
  await self.text_chunks.delete(chunk_ids)
1865
+
1866
+ async with pipeline_status_lock:
1867
+ log_message = f"Successfully deleted {len(chunk_ids)} chunks from storage"
1868
+ logger.info(log_message)
1869
+ pipeline_status["latest_message"] = log_message
1870
+ pipeline_status["history_messages"].append(log_message)
1871
+
1872
  except Exception as e:
1873
  logger.error(f"Failed to delete chunks: {e}")
1874
  raise Exception(f"Failed to delete document chunks: {e}") from e
 
1876
  # 6. Delete entities that have no remaining sources
1877
  if entities_to_delete:
1878
  try:
 
1879
  # Delete from vector database
1880
  entity_vdb_ids = [
1881
  compute_mdhash_id(entity, prefix="ent-")
 
1887
  await self.chunk_entity_relation_graph.remove_nodes(
1888
  list(entities_to_delete)
1889
  )
1890
+
1891
+ async with pipeline_status_lock:
1892
+ log_message = f"Successfully deleted {len(entities_to_delete)} entities"
1893
+ logger.info(log_message)
1894
+ pipeline_status["latest_message"] = log_message
1895
+ pipeline_status["history_messages"].append(log_message)
1896
+
1897
  except Exception as e:
1898
  logger.error(f"Failed to delete entities: {e}")
1899
  raise Exception(f"Failed to delete entities: {e}") from e
 
1901
  # 7. Delete relationships that have no remaining sources
1902
  if relationships_to_delete:
1903
  try:
 
 
 
1904
  # Delete from vector database
1905
  rel_ids_to_delete = []
1906
  for src, tgt in relationships_to_delete:
 
1916
  await self.chunk_entity_relation_graph.remove_edges(
1917
  list(relationships_to_delete)
1918
  )
1919
+
1920
+ async with pipeline_status_lock:
1921
+ log_message = f"Successfully deleted {len(relationships_to_delete)} relations"
1922
+ logger.info(log_message)
1923
+ pipeline_status["latest_message"] = log_message
1924
+ pipeline_status["history_messages"].append(log_message)
1925
+
1926
  except Exception as e:
1927
  logger.error(f"Failed to delete relationships: {e}")
1928
  raise Exception(f"Failed to delete relationships: {e}") from e
 
1930
  # 8. Rebuild entities and relationships from remaining chunks
1931
  if entities_to_rebuild or relationships_to_rebuild:
1932
  try:
 
 
 
1933
  await _rebuild_knowledge_from_chunks(
1934
  entities_to_rebuild=entities_to_rebuild,
1935
  relationships_to_rebuild=relationships_to_rebuild,
 
1940
  llm_response_cache=self.llm_response_cache,
1941
  global_config=asdict(self),
1942
  )
1943
+
1944
+ async with pipeline_status_lock:
1945
+ log_message = f"Successfully rebuilt {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relations"
1946
+ logger.info(log_message)
1947
+ pipeline_status["latest_message"] = log_message
1948
+ pipeline_status["history_messages"].append(log_message)
1949
+
1950
  except Exception as e:
1951
  logger.error(f"Failed to rebuild knowledge from chunks: {e}")
1952
  raise Exception(
 
1955
 
1956
  # 9. Delete original document and status
1957
  try:
 
1958
  await self.full_docs.delete([doc_id])
1959
  await self.doc_status.delete([doc_id])
 
1960
  except Exception as e:
1961
  logger.error(f"Failed to delete document and status: {e}")
1962
  raise Exception(f"Failed to delete document and status: {e}") from e
1963
 
1964
+ async with pipeline_status_lock:
1965
+ log_message = f"Successfully deleted document {doc_id}"
1966
+ logger.info(log_message)
1967
+ pipeline_status["latest_message"] = log_message
1968
+ pipeline_status["history_messages"].append(log_message)
1969
+
1970
  return DeletionResult(
1971
  status="success",
1972
  doc_id=doc_id,
1973
+ message=log_message,
1974
  status_code=200,
1975
  )
1976
 
 
1990
  # ALWAYS ensure persistence if any deletion operations were started
1991
  if deletion_operations_started:
1992
  try:
 
 
 
1993
  await self._insert_done()
 
 
 
1994
  except Exception as persistence_error:
1995
  persistence_error_msg = f"Failed to persist data after deletion attempt for {doc_id}: {persistence_error}"
1996
  logger.error(persistence_error_msg)
lightrag/operate.py CHANGED
@@ -270,7 +270,7 @@ async def _rebuild_knowledge_from_chunks(
270
  for chunk_ids in relationships_to_rebuild.values():
271
  all_referenced_chunk_ids.update(chunk_ids)
272
 
273
- logger.info(
274
  f"Rebuilding knowledge from {len(all_referenced_chunk_ids)} cached chunk extractions"
275
  )
276
 
@@ -339,7 +339,7 @@ async def _rebuild_knowledge_from_chunks(
339
  except Exception as e:
340
  logger.error(f"Failed to rebuild relationship {src}-{tgt}: {e}")
341
 
342
- logger.info("Completed rebuilding knowledge from cached extractions")
343
 
344
 
345
  async def _get_cached_extraction_results(
@@ -368,7 +368,7 @@ async def _get_cached_extraction_results(
368
  extraction_result = cache_entry["return"]
369
  cached_results[chunk_id] = extraction_result
370
 
371
- logger.info(
372
  f"Found {len(cached_results)} cached extraction results for {len(chunk_ids)} chunk IDs"
373
  )
374
  return cached_results
 
270
  for chunk_ids in relationships_to_rebuild.values():
271
  all_referenced_chunk_ids.update(chunk_ids)
272
 
273
+ logger.debug(
274
  f"Rebuilding knowledge from {len(all_referenced_chunk_ids)} cached chunk extractions"
275
  )
276
 
 
339
  except Exception as e:
340
  logger.error(f"Failed to rebuild relationship {src}-{tgt}: {e}")
341
 
342
+ logger.debug("Completed rebuilding knowledge from cached extractions")
343
 
344
 
345
  async def _get_cached_extraction_results(
 
368
  extraction_result = cache_entry["return"]
369
  cached_results[chunk_id] = extraction_result
370
 
371
+ logger.debug(
372
  f"Found {len(cached_results)} cached extraction results for {len(chunk_ids)} chunk IDs"
373
  )
374
  return cached_results