gzdaniel commited on
Commit
d07e391
·
1 Parent(s): e3235bf

Change the API for deleting documents to support deleting multiple documents at once.

Browse files
lightrag/api/routers/document_routes.py CHANGED
@@ -260,14 +260,25 @@ Attributes:
260
 
261
 
262
  class DeleteDocRequest(BaseModel):
263
- doc_id: str = Field(..., description="The ID of the document to delete.")
264
 
265
- @field_validator("doc_id", mode="after")
266
  @classmethod
267
- def validate_doc_id(cls, doc_id: str) -> str:
268
- if not doc_id or not doc_id.strip():
269
- raise ValueError("Document ID cannot be empty")
270
- return doc_id.strip()
 
 
 
 
 
 
 
 
 
 
 
271
 
272
 
273
  class DeleteEntityRequest(BaseModel):
@@ -782,8 +793,8 @@ async def run_scanning_process(rag: LightRAG, doc_manager: DocumentManager):
782
  logger.error(traceback.format_exc())
783
 
784
 
785
- async def background_delete_document(rag: LightRAG, doc_id: str):
786
- """Background task to delete a document"""
787
  from lightrag.kg.shared_storage import (
788
  get_namespace_data,
789
  get_pipeline_status_lock,
@@ -792,13 +803,20 @@ async def background_delete_document(rag: LightRAG, doc_id: str):
792
  pipeline_status = await get_namespace_data("pipeline_status")
793
  pipeline_status_lock = get_pipeline_status_lock()
794
 
 
 
 
 
795
  # Set pipeline status to busy for deletion
796
  async with pipeline_status_lock:
797
  pipeline_status.update(
798
  {
799
  "busy": True,
800
- "job_name": f"Deleting Document: {doc_id}",
801
  "job_start": datetime.now().isoformat(),
 
 
 
802
  "latest_message": "Starting document deletion process",
803
  }
804
  )
@@ -806,40 +824,81 @@ async def background_delete_document(rag: LightRAG, doc_id: str):
806
  pipeline_status["history_messages"][:] = ["Starting document deletion process"]
807
 
808
  try:
809
- result = await rag.adelete_by_doc_id(doc_id)
810
- if "history_messages" in pipeline_status:
811
- pipeline_status["history_messages"].append(result.message)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
812
 
813
- logger.info(f"Document deletion completed for {doc_id}: {result.status}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
814
 
815
  except Exception as e:
816
- error_msg = f"Error deleting document {doc_id}: {str(e)}"
817
  logger.error(error_msg)
818
  logger.error(traceback.format_exc())
819
- if "history_messages" in pipeline_status:
 
820
  pipeline_status["history_messages"].append(error_msg)
821
  finally:
822
  async with pipeline_status_lock:
823
  pipeline_status["busy"] = False
824
  completion_msg = "Document deletion process completed."
825
  pipeline_status["latest_message"] = completion_msg
826
- if "history_messages" in pipeline_status:
827
- pipeline_status["history_messages"].append(completion_msg)
828
 
829
 
830
  def create_document_routes(
831
  rag: LightRAG, doc_manager: DocumentManager, api_key: Optional[str] = None
832
  ):
833
- # Check if doc_id exists in the system - add this check at the beginning
834
- async def check_doc_id_exists(doc_id: str) -> bool:
835
- """Check if document ID exists in the system"""
836
- try:
837
- doc_status = await rag.doc_status.get_by_id(doc_id)
838
- return doc_status is not None
839
- except Exception as e:
840
- logger.error(f"Error checking doc_id existence: {str(e)}")
841
- return False
842
-
843
  # Create combined auth dependency for document routes
844
  combined_auth = get_combined_auth_dependency(api_key)
845
 
@@ -1326,9 +1385,9 @@ def create_document_routes(
1326
  background_tasks: BackgroundTasks,
1327
  ) -> DeleteDocByIdResponse:
1328
  """
1329
- Delete a document and all its associated data by its ID using background processing.
1330
 
1331
- Deletes a specific document and all its associated data, including its status,
1332
  text chunks, vector embeddings, and any related graph data.
1333
  The deletion process runs in the background to avoid blocking the client connection.
1334
  It is disabled when llm cache for entity extraction is disabled.
@@ -1336,7 +1395,7 @@ def create_document_routes(
1336
  This operation is irreversible and will interact with the pipeline status.
1337
 
1338
  Args:
1339
- delete_request (DeleteDocRequest): The request containing the document ID.
1340
  background_tasks: FastAPI BackgroundTasks for async processing
1341
 
1342
  Returns:
@@ -1349,10 +1408,7 @@ def create_document_routes(
1349
  HTTPException:
1350
  - 500: If an unexpected internal error occurs during initialization.
1351
  """
1352
- # Check if doc_id exists first - return error immediately if not found
1353
- doc_id = delete_request.doc_id
1354
- if not await check_doc_id_exists(doc_id):
1355
- raise HTTPException(status_code=404, detail=f"Document {doc_id} not found.")
1356
 
1357
  # The rag object is initialized from the server startup args,
1358
  # so we can access its properties here.
@@ -1360,7 +1416,7 @@ def create_document_routes(
1360
  return DeleteDocByIdResponse(
1361
  status="not_allowed",
1362
  message="Operation not allowed when LLM cache for entity extraction is disabled.",
1363
- doc_id=delete_request.doc_id,
1364
  )
1365
 
1366
  try:
@@ -1372,21 +1428,21 @@ def create_document_routes(
1372
  if pipeline_status.get("busy", False):
1373
  return DeleteDocByIdResponse(
1374
  status="busy",
1375
- message="Cannot delete document while pipeline is busy",
1376
- doc_id=doc_id,
1377
  )
1378
 
1379
  # Add deletion task to background tasks
1380
- background_tasks.add_task(background_delete_document, rag, doc_id)
1381
 
1382
  return DeleteDocByIdResponse(
1383
  status="deletion_started",
1384
- message=f"Document deletion for '{doc_id}' has been initiated. Processing will continue in background.",
1385
- doc_id=doc_id,
1386
  )
1387
 
1388
  except Exception as e:
1389
- error_msg = f"Error initiating document deletion for {delete_request.doc_id}: {str(e)}"
1390
  logger.error(error_msg)
1391
  logger.error(traceback.format_exc())
1392
  raise HTTPException(status_code=500, detail=error_msg)
 
260
 
261
 
262
  class DeleteDocRequest(BaseModel):
263
+ doc_ids: List[str] = Field(..., description="The IDs of the documents to delete.")
264
 
265
+ @field_validator("doc_ids", mode="after")
266
  @classmethod
267
+ def validate_doc_ids(cls, doc_ids: List[str]) -> List[str]:
268
+ if not doc_ids:
269
+ raise ValueError("Document IDs list cannot be empty")
270
+
271
+ validated_ids = []
272
+ for doc_id in doc_ids:
273
+ if not doc_id or not doc_id.strip():
274
+ raise ValueError("Document ID cannot be empty")
275
+ validated_ids.append(doc_id.strip())
276
+
277
+ # Check for duplicates
278
+ if len(validated_ids) != len(set(validated_ids)):
279
+ raise ValueError("Document IDs must be unique")
280
+
281
+ return validated_ids
282
 
283
 
284
  class DeleteEntityRequest(BaseModel):
 
793
  logger.error(traceback.format_exc())
794
 
795
 
796
+ async def background_delete_documents(rag: LightRAG, doc_ids: List[str]):
797
+ """Background task to delete multiple documents"""
798
  from lightrag.kg.shared_storage import (
799
  get_namespace_data,
800
  get_pipeline_status_lock,
 
803
  pipeline_status = await get_namespace_data("pipeline_status")
804
  pipeline_status_lock = get_pipeline_status_lock()
805
 
806
+ total_docs = len(doc_ids)
807
+ successful_deletions = []
808
+ failed_deletions = []
809
+
810
  # Set pipeline status to busy for deletion
811
  async with pipeline_status_lock:
812
  pipeline_status.update(
813
  {
814
  "busy": True,
815
+ "job_name": f"Deleting {total_docs} Documents",
816
  "job_start": datetime.now().isoformat(),
817
+ "docs": total_docs,
818
+ "batchs": total_docs,
819
+ "cur_batch": 0,
820
  "latest_message": "Starting document deletion process",
821
  }
822
  )
 
824
  pipeline_status["history_messages"][:] = ["Starting document deletion process"]
825
 
826
  try:
827
+ # Loop through each document ID and delete them one by one
828
+ for i, doc_id in enumerate(doc_ids, 1):
829
+ async with pipeline_status_lock:
830
+ pipeline_status["cur_batch"] = i
831
+ pipeline_status["latest_message"] = (
832
+ f"Deleting document {i}/{total_docs}: {doc_id}"
833
+ )
834
+ pipeline_status["history_messages"].append(
835
+ f"Processing document {i}/{total_docs}: {doc_id}"
836
+ )
837
+
838
+ try:
839
+ result = await rag.adelete_by_doc_id(doc_id)
840
+
841
+ if result.status == "success":
842
+ successful_deletions.append(doc_id)
843
+ success_msg = (
844
+ f"Successfully deleted document {i}/{total_docs}: {doc_id}"
845
+ )
846
+ logger.info(success_msg)
847
+
848
+ async with pipeline_status_lock:
849
+ pipeline_status["history_messages"].append(success_msg)
850
+ else:
851
+ failed_deletions.append(doc_id)
852
+ error_msg = f"Failed to delete document {i}/{total_docs}: {doc_id} - {result.message}"
853
+ logger.error(error_msg)
854
+
855
+ async with pipeline_status_lock:
856
+ pipeline_status["history_messages"].append(error_msg)
857
 
858
+ except Exception as e:
859
+ failed_deletions.append(doc_id)
860
+ error_msg = (
861
+ f"Error deleting document {i}/{total_docs}: {doc_id} - {str(e)}"
862
+ )
863
+ logger.error(error_msg)
864
+ logger.error(traceback.format_exc())
865
+
866
+ async with pipeline_status_lock:
867
+ pipeline_status["history_messages"].append(error_msg)
868
+
869
+ # Final summary
870
+ summary_msg = f"Deletion completed: {len(successful_deletions)} successful, {len(failed_deletions)} failed"
871
+ logger.info(summary_msg)
872
+
873
+ async with pipeline_status_lock:
874
+ pipeline_status["history_messages"].append(summary_msg)
875
+ if successful_deletions:
876
+ pipeline_status["history_messages"].append(
877
+ f"Successfully deleted: {', '.join(successful_deletions)}"
878
+ )
879
+ if failed_deletions:
880
+ pipeline_status["history_messages"].append(
881
+ f"Failed to delete: {', '.join(failed_deletions)}"
882
+ )
883
 
884
  except Exception as e:
885
+ error_msg = f"Critical error during batch deletion: {str(e)}"
886
  logger.error(error_msg)
887
  logger.error(traceback.format_exc())
888
+
889
+ async with pipeline_status_lock:
890
  pipeline_status["history_messages"].append(error_msg)
891
  finally:
892
  async with pipeline_status_lock:
893
  pipeline_status["busy"] = False
894
  completion_msg = "Document deletion process completed."
895
  pipeline_status["latest_message"] = completion_msg
896
+ pipeline_status["history_messages"].append(completion_msg)
 
897
 
898
 
899
  def create_document_routes(
900
  rag: LightRAG, doc_manager: DocumentManager, api_key: Optional[str] = None
901
  ):
 
 
 
 
 
 
 
 
 
 
902
  # Create combined auth dependency for document routes
903
  combined_auth = get_combined_auth_dependency(api_key)
904
 
 
1385
  background_tasks: BackgroundTasks,
1386
  ) -> DeleteDocByIdResponse:
1387
  """
1388
+ Delete documents and all their associated data by their IDs using background processing.
1389
 
1390
+ Deletes specific documents and all their associated data, including their status,
1391
  text chunks, vector embeddings, and any related graph data.
1392
  The deletion process runs in the background to avoid blocking the client connection.
1393
  It is disabled when llm cache for entity extraction is disabled.
 
1395
  This operation is irreversible and will interact with the pipeline status.
1396
 
1397
  Args:
1398
+ delete_request (DeleteDocRequest): The request containing the document IDs.
1399
  background_tasks: FastAPI BackgroundTasks for async processing
1400
 
1401
  Returns:
 
1408
  HTTPException:
1409
  - 500: If an unexpected internal error occurs during initialization.
1410
  """
1411
+ doc_ids = delete_request.doc_ids
 
 
 
1412
 
1413
  # The rag object is initialized from the server startup args,
1414
  # so we can access its properties here.
 
1416
  return DeleteDocByIdResponse(
1417
  status="not_allowed",
1418
  message="Operation not allowed when LLM cache for entity extraction is disabled.",
1419
+ doc_id=", ".join(delete_request.doc_ids),
1420
  )
1421
 
1422
  try:
 
1428
  if pipeline_status.get("busy", False):
1429
  return DeleteDocByIdResponse(
1430
  status="busy",
1431
+ message="Cannot delete documents while pipeline is busy",
1432
+ doc_id=", ".join(doc_ids),
1433
  )
1434
 
1435
  # Add deletion task to background tasks
1436
+ background_tasks.add_task(background_delete_documents, rag, doc_ids)
1437
 
1438
  return DeleteDocByIdResponse(
1439
  status="deletion_started",
1440
+ message=f"Document deletion for {len(doc_ids)} documents has been initiated. Processing will continue in background.",
1441
+ doc_id=", ".join(doc_ids),
1442
  )
1443
 
1444
  except Exception as e:
1445
+ error_msg = f"Error initiating document deletion for {delete_request.doc_ids}: {str(e)}"
1446
  logger.error(error_msg)
1447
  logger.error(traceback.format_exc())
1448
  raise HTTPException(status_code=500, detail=error_msg)
lightrag/lightrag.py CHANGED
@@ -1961,12 +1961,6 @@ class LightRAG:
1961
  logger.error(f"Failed to delete document and status: {e}")
1962
  raise Exception(f"Failed to delete document and status: {e}") from e
1963
 
1964
- async with pipeline_status_lock:
1965
- log_message = f"Successfully deleted document {doc_id}"
1966
- logger.info(log_message)
1967
- pipeline_status["latest_message"] = log_message
1968
- pipeline_status["history_messages"].append(log_message)
1969
-
1970
  return DeletionResult(
1971
  status="success",
1972
  doc_id=doc_id,
 
1961
  logger.error(f"Failed to delete document and status: {e}")
1962
  raise Exception(f"Failed to delete document and status: {e}") from e
1963
 
 
 
 
 
 
 
1964
  return DeletionResult(
1965
  status="success",
1966
  doc_id=doc_id,