gzdaniel commited on
Commit
8e46aa3
·
1 Parent(s): 73387c0

Change document deletion API to async

Browse files
lightrag/api/routers/document_routes.py CHANGED
@@ -782,9 +782,66 @@ async def run_scanning_process(rag: LightRAG, doc_manager: DocumentManager):
782
  logger.error(traceback.format_exc())
783
 
784
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
785
  def create_document_routes(
786
  rag: LightRAG, doc_manager: DocumentManager, api_key: Optional[str] = None
787
  ):
 
 
 
 
 
 
 
 
 
 
788
  # Create combined auth dependency for document routes
789
  combined_auth = get_combined_auth_dependency(api_key)
790
 
@@ -1254,13 +1311,11 @@ def create_document_routes(
1254
  class DeleteDocByIdResponse(BaseModel):
1255
  """Response model for single document deletion operation."""
1256
 
1257
- status: Literal["success", "fail", "not_found", "busy"] = Field(
1258
  description="Status of the deletion operation"
1259
  )
1260
  message: str = Field(description="Message describing the operation result")
1261
- doc_id: Optional[str] = Field(
1262
- default=None, description="The ID of the document."
1263
- )
1264
 
1265
  @router.delete(
1266
  "/delete_document",
@@ -1269,100 +1324,78 @@ def create_document_routes(
1269
  summary="Delete a document and all its associated data by its ID.",
1270
  )
1271
 
1272
- # TODO This method needs to be modified to be asynchronous (please do not use)
1273
  async def delete_document(
1274
  delete_request: DeleteDocRequest,
 
1275
  ) -> DeleteDocByIdResponse:
1276
  """
1277
- This method needs to be modified to be asynchronous (please do not use)
1278
 
1279
  Deletes a specific document and all its associated data, including its status,
1280
  text chunks, vector embeddings, and any related graph data.
 
1281
  It is disabled when llm cache for entity extraction is disabled.
1282
 
1283
  This operation is irreversible and will interact with the pipeline status.
1284
 
1285
  Args:
1286
  delete_request (DeleteDocRequest): The request containing the document ID.
 
1287
 
1288
  Returns:
1289
  DeleteDocByIdResponse: The result of the deletion operation.
1290
- - status="success": The document was successfully deleted.
1291
- - status="not_found": The document with the specified ID was not found.
1292
- - status="fail": The deletion operation failed.
1293
  - status="busy": The pipeline is busy with another operation.
 
1294
 
1295
  Raises:
1296
  HTTPException:
1297
- - 500: If an unexpected internal error occurs.
1298
  """
 
 
 
 
 
 
 
 
1299
  # The rag object is initialized from the server startup args,
1300
  # so we can access its properties here.
1301
  if not rag.enable_llm_cache_for_entity_extract:
1302
- raise HTTPException(
1303
- status_code=403,
1304
- detail="Operation not allowed when LLM cache for entity extraction is disabled.",
 
1305
  )
1306
- from lightrag.kg.shared_storage import (
1307
- get_namespace_data,
1308
- get_pipeline_status_lock,
1309
- )
1310
 
1311
- doc_id = delete_request.doc_id
1312
- pipeline_status = await get_namespace_data("pipeline_status")
1313
- pipeline_status_lock = get_pipeline_status_lock()
1314
 
1315
- async with pipeline_status_lock:
 
 
1316
  if pipeline_status.get("busy", False):
1317
  return DeleteDocByIdResponse(
1318
  status="busy",
1319
  message="Cannot delete document while pipeline is busy",
1320
  doc_id=doc_id,
1321
  )
1322
- pipeline_status.update(
1323
- {
1324
- "busy": True,
1325
- "job_name": f"Deleting Document: {doc_id}",
1326
- "job_start": datetime.now().isoformat(),
1327
- "latest_message": "Starting document deletion process",
1328
- }
1329
- )
1330
- # Use slice assignment to clear the list in place
1331
- pipeline_status["history_messages"][:] = [
1332
- f"Starting deletion for doc_id: {doc_id}"
1333
- ]
1334
-
1335
- try:
1336
- result = await rag.adelete_by_doc_id(doc_id)
1337
- if "history_messages" in pipeline_status:
1338
- pipeline_status["history_messages"].append(result.message)
1339
 
1340
- if result.status == "not_found":
1341
- raise HTTPException(status_code=404, detail=result.message)
1342
- if result.status == "fail":
1343
- raise HTTPException(status_code=500, detail=result.message)
1344
 
1345
  return DeleteDocByIdResponse(
1346
- doc_id=result.doc_id,
1347
- message=result.message,
1348
- status=result.status,
1349
  )
1350
 
1351
  except Exception as e:
1352
- error_msg = f"Error deleting document {doc_id}: {str(e)}"
1353
  logger.error(error_msg)
1354
  logger.error(traceback.format_exc())
1355
- if "history_messages" in pipeline_status:
1356
- pipeline_status["history_messages"].append(error_msg)
1357
- # Re-raise as HTTPException for consistent error handling by FastAPI
1358
  raise HTTPException(status_code=500, detail=error_msg)
1359
- finally:
1360
- async with pipeline_status_lock:
1361
- pipeline_status["busy"] = False
1362
- completion_msg = f"Document deletion process for {doc_id} completed."
1363
- pipeline_status["latest_message"] = completion_msg
1364
- if "history_messages" in pipeline_status:
1365
- pipeline_status["history_messages"].append(completion_msg)
1366
 
1367
  @router.post(
1368
  "/clear_cache",
 
782
  logger.error(traceback.format_exc())
783
 
784
 
785
+ async def background_delete_document(rag: LightRAG, doc_id: str):
786
+ """Background task to delete a document"""
787
+ from lightrag.kg.shared_storage import (
788
+ get_namespace_data,
789
+ get_pipeline_status_lock,
790
+ )
791
+
792
+ pipeline_status = await get_namespace_data("pipeline_status")
793
+ pipeline_status_lock = get_pipeline_status_lock()
794
+
795
+ # Set pipeline status to busy for deletion
796
+ async with pipeline_status_lock:
797
+ pipeline_status.update(
798
+ {
799
+ "busy": True,
800
+ "job_name": f"Deleting Document: {doc_id}",
801
+ "job_start": datetime.now().isoformat(),
802
+ "latest_message": "Starting document deletion process",
803
+ }
804
+ )
805
+ # Use slice assignment to clear the list in place
806
+ pipeline_status["history_messages"][:] = [
807
+ f"Starting deletion for doc_id: {doc_id}"
808
+ ]
809
+
810
+ try:
811
+ result = await rag.adelete_by_doc_id(doc_id)
812
+ if "history_messages" in pipeline_status:
813
+ pipeline_status["history_messages"].append(result.message)
814
+
815
+ logger.info(f"Document deletion completed for {doc_id}: {result.status}")
816
+
817
+ except Exception as e:
818
+ error_msg = f"Error deleting document {doc_id}: {str(e)}"
819
+ logger.error(error_msg)
820
+ logger.error(traceback.format_exc())
821
+ if "history_messages" in pipeline_status:
822
+ pipeline_status["history_messages"].append(error_msg)
823
+ finally:
824
+ async with pipeline_status_lock:
825
+ pipeline_status["busy"] = False
826
+ completion_msg = f"Document deletion process for {doc_id} completed."
827
+ pipeline_status["latest_message"] = completion_msg
828
+ if "history_messages" in pipeline_status:
829
+ pipeline_status["history_messages"].append(completion_msg)
830
+
831
+
832
  def create_document_routes(
833
  rag: LightRAG, doc_manager: DocumentManager, api_key: Optional[str] = None
834
  ):
835
+ # Check if doc_id exists in the system - add this check at the beginning
836
+ async def check_doc_id_exists(doc_id: str) -> bool:
837
+ """Check if document ID exists in the system"""
838
+ try:
839
+ doc_status = await rag.doc_status.get_by_id(doc_id)
840
+ return doc_status is not None
841
+ except Exception as e:
842
+ logger.error(f"Error checking doc_id existence: {str(e)}")
843
+ return False
844
+
845
  # Create combined auth dependency for document routes
846
  combined_auth = get_combined_auth_dependency(api_key)
847
 
 
1311
  class DeleteDocByIdResponse(BaseModel):
1312
  """Response model for single document deletion operation."""
1313
 
1314
+ status: Literal["deletion_started", "busy", "not_allowed"] = Field(
1315
  description="Status of the deletion operation"
1316
  )
1317
  message: str = Field(description="Message describing the operation result")
1318
+ doc_id: str = Field(description="The ID of the document to delete")
 
 
1319
 
1320
  @router.delete(
1321
  "/delete_document",
 
1324
  summary="Delete a document and all its associated data by its ID.",
1325
  )
1326
 
 
1327
  async def delete_document(
1328
  delete_request: DeleteDocRequest,
1329
+ background_tasks: BackgroundTasks,
1330
  ) -> DeleteDocByIdResponse:
1331
  """
1332
+ Delete a document and all its associated data by its ID using background processing.
1333
 
1334
  Deletes a specific document and all its associated data, including its status,
1335
  text chunks, vector embeddings, and any related graph data.
1336
+ The deletion process runs in the background to avoid blocking the client connection.
1337
  It is disabled when llm cache for entity extraction is disabled.
1338
 
1339
  This operation is irreversible and will interact with the pipeline status.
1340
 
1341
  Args:
1342
  delete_request (DeleteDocRequest): The request containing the document ID.
1343
+ background_tasks: FastAPI BackgroundTasks for async processing
1344
 
1345
  Returns:
1346
  DeleteDocByIdResponse: The result of the deletion operation.
1347
+ - status="deletion_started": The document deletion has been initiated in the background.
 
 
1348
  - status="busy": The pipeline is busy with another operation.
1349
+ - status="not_allowed": Operation not allowed when LLM cache for entity extraction is disabled.
1350
 
1351
  Raises:
1352
  HTTPException:
1353
+ - 500: If an unexpected internal error occurs during initialization.
1354
  """
1355
+ # Check if doc_id exists first - return error immediately if not found
1356
+ doc_id = delete_request.doc_id
1357
+ if not await check_doc_id_exists(doc_id):
1358
+ raise HTTPException(
1359
+ status_code=404,
1360
+ detail=f"Document {doc_id} not found."
1361
+ )
1362
+
1363
  # The rag object is initialized from the server startup args,
1364
  # so we can access its properties here.
1365
  if not rag.enable_llm_cache_for_entity_extract:
1366
+ return DeleteDocByIdResponse(
1367
+ status="not_allowed",
1368
+ message="Operation not allowed when LLM cache for entity extraction is disabled.",
1369
+ doc_id=delete_request.doc_id,
1370
  )
 
 
 
 
1371
 
1372
+ try:
1373
+ from lightrag.kg.shared_storage import get_namespace_data
 
1374
 
1375
+ pipeline_status = await get_namespace_data("pipeline_status")
1376
+
1377
+ # Check if pipeline is busy
1378
  if pipeline_status.get("busy", False):
1379
  return DeleteDocByIdResponse(
1380
  status="busy",
1381
  message="Cannot delete document while pipeline is busy",
1382
  doc_id=doc_id,
1383
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1384
 
1385
+ # Add deletion task to background tasks
1386
+ background_tasks.add_task(background_delete_document, rag, doc_id)
 
 
1387
 
1388
  return DeleteDocByIdResponse(
1389
+ status="deletion_started",
1390
+ message=f"Document deletion for '{doc_id}' has been initiated. Processing will continue in background.",
1391
+ doc_id=doc_id,
1392
  )
1393
 
1394
  except Exception as e:
1395
+ error_msg = f"Error initiating document deletion for {delete_request.doc_id}: {str(e)}"
1396
  logger.error(error_msg)
1397
  logger.error(traceback.format_exc())
 
 
 
1398
  raise HTTPException(status_code=500, detail=error_msg)
 
 
 
 
 
 
 
1399
 
1400
  @router.post(
1401
  "/clear_cache",