zrguo committed on
Commit
85fc9f9
·
unverified ·
2 Parent(s): 0a5a0dc 5799da7

Merge pull request #791 from ArnoChenFx/refactor-server

Browse files

Refactor File Indexing for Background Asynchronous Processing

lightrag/api/lightrag_server.py CHANGED
@@ -3,7 +3,6 @@ from fastapi import (
3
  HTTPException,
4
  File,
5
  UploadFile,
6
- Form,
7
  BackgroundTasks,
8
  )
9
  import asyncio
@@ -14,7 +13,7 @@ import re
14
  from fastapi.staticfiles import StaticFiles
15
  import logging
16
  import argparse
17
- from typing import List, Any, Optional, Union, Dict
18
  from pydantic import BaseModel
19
  from lightrag import LightRAG, QueryParam
20
  from lightrag.types import GPTKeywordExtractionFormat
@@ -34,6 +33,9 @@ from starlette.status import HTTP_403_FORBIDDEN
34
  import pipmaster as pm
35
  from dotenv import load_dotenv
36
  import configparser
 
 
 
37
  from lightrag.utils import logger
38
  from .ollama_api import (
39
  OllamaAPI,
@@ -635,9 +637,47 @@ class SearchMode(str, Enum):
635
 
636
  class QueryRequest(BaseModel):
637
  query: str
 
 
638
  mode: SearchMode = SearchMode.hybrid
639
- stream: bool = False
640
- only_need_context: bool = False
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
641
 
642
 
643
  class QueryResponse(BaseModel):
@@ -646,13 +686,38 @@ class QueryResponse(BaseModel):
646
 
647
  class InsertTextRequest(BaseModel):
648
  text: str
649
- description: Optional[str] = None
650
 
651
 
652
  class InsertResponse(BaseModel):
653
  status: str
654
  message: str
655
- document_count: int
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
656
 
657
 
658
  def get_api_key_dependency(api_key: Optional[str]):
@@ -666,7 +731,9 @@ def get_api_key_dependency(api_key: Optional[str]):
666
  # If API key is configured, use proper authentication
667
  api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
668
 
669
- async def api_key_auth(api_key_header_value: str | None = Security(api_key_header)):
 
 
670
  if not api_key_header_value:
671
  raise HTTPException(
672
  status_code=HTTP_403_FORBIDDEN, detail="API Key required"
@@ -682,6 +749,7 @@ def get_api_key_dependency(api_key: Optional[str]):
682
 
683
  # Global configuration
684
  global_top_k = 60 # default value
 
685
 
686
 
687
  def create_app(args):
@@ -1132,79 +1200,162 @@ def create_app(args):
1132
  ("llm_response_cache", rag.llm_response_cache),
1133
  ]
1134
 
1135
- async def index_file(file_path: Union[str, Path]) -> None:
1136
- """Index all files inside the folder with support for multiple file formats
1137
 
1138
  Args:
1139
- file_path: Path to the file to be indexed (str or Path object)
1140
-
1141
- Raises:
1142
- ValueError: If file format is not supported
1143
- FileNotFoundError: If file doesn't exist
1144
  """
1145
- if not pm.is_installed("aiofiles"):
1146
- pm.install("aiofiles")
 
1147
 
1148
- # Convert to Path object if string
1149
- file_path = Path(file_path)
 
1150
 
1151
- # Check if file exists
1152
- if not file_path.exists():
1153
- raise FileNotFoundError(f"File not found: {file_path}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1154
 
1155
- content = ""
1156
- # Get file extension in lowercase
1157
- ext = file_path.suffix.lower()
 
 
 
 
 
 
 
 
 
 
1158
 
1159
- match ext:
1160
- case ".txt" | ".md":
1161
- # Text files handling
1162
- async with aiofiles.open(file_path, "r", encoding="utf-8") as f:
1163
- content = await f.read()
1164
 
1165
- case ".pdf" | ".docx" | ".pptx" | ".xlsx":
1166
- if not pm.is_installed("docling"):
1167
- pm.install("docling")
1168
- from docling.document_converter import DocumentConverter
 
 
1169
 
1170
- async def convert_doc():
1171
- def sync_convert():
1172
- converter = DocumentConverter()
1173
- result = converter.convert(file_path)
1174
- return result.document.export_to_markdown()
1175
 
1176
- return await asyncio.to_thread(sync_convert)
 
1177
 
1178
- content = await convert_doc()
 
 
 
 
 
 
1179
 
1180
- case _:
1181
- raise ValueError(f"Unsupported file format: {ext}")
 
 
 
1182
 
1183
- # Insert content into RAG system
1184
- if content:
1185
- await rag.ainsert(content)
1186
- doc_manager.mark_as_indexed(file_path)
1187
- logging.info(f"Successfully indexed file: {file_path}")
1188
- else:
1189
- logging.warning(f"No content extracted from file: {file_path}")
1190
 
1191
- @app.post("/documents/scan", dependencies=[Depends(optional_api_key)])
1192
- async def scan_for_new_documents(background_tasks: BackgroundTasks):
1193
- """Trigger the scanning process"""
1194
- global scan_progress
1195
 
1196
- with progress_lock:
1197
- if scan_progress["is_scanning"]:
1198
- return {"status": "already_scanning"}
 
 
 
 
1199
 
1200
- scan_progress["is_scanning"] = True
1201
- scan_progress["indexed_count"] = 0
1202
- scan_progress["progress"] = 0
1203
 
1204
- # Start the scanning process in the background
1205
- background_tasks.add_task(run_scanning_process)
1206
 
1207
- return {"status": "scanning_started"}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1208
 
1209
  async def run_scanning_process():
1210
  """Background task to scan and index documents"""
@@ -1220,7 +1371,7 @@ def create_app(args):
1220
  with progress_lock:
1221
  scan_progress["current_file"] = os.path.basename(file_path)
1222
 
1223
- await index_file(file_path)
1224
 
1225
  with progress_lock:
1226
  scan_progress["indexed_count"] += 1
@@ -1238,6 +1389,24 @@ def create_app(args):
1238
  with progress_lock:
1239
  scan_progress["is_scanning"] = False
1240
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1241
  @app.get("/documents/scan-progress")
1242
  async def get_scan_progress():
1243
  """Get the current scanning progress"""
@@ -1245,7 +1414,9 @@ def create_app(args):
1245
  return scan_progress
1246
 
1247
  @app.post("/documents/upload", dependencies=[Depends(optional_api_key)])
1248
- async def upload_to_input_dir(file: UploadFile = File(...)):
 
 
1249
  """
1250
  Endpoint for uploading a file to the input directory and indexing it.
1251
 
@@ -1254,6 +1425,7 @@ def create_app(args):
1254
  indexes it for retrieval, and returns a success status with relevant details.
1255
 
1256
  Parameters:
 
1257
  file (UploadFile): The file to be uploaded. It must have an allowed extension as per
1258
  `doc_manager.supported_extensions`.
1259
 
@@ -1278,121 +1450,16 @@ def create_app(args):
1278
  with open(file_path, "wb") as buffer:
1279
  shutil.copyfileobj(file.file, buffer)
1280
 
1281
- # Immediately index the uploaded file
1282
- await index_file(file_path)
1283
-
1284
- return {
1285
- "status": "success",
1286
- "message": f"File uploaded and indexed: {file.filename}",
1287
- "total_documents": len(doc_manager.indexed_files),
1288
- }
1289
- except Exception as e:
1290
- raise HTTPException(status_code=500, detail=str(e))
1291
-
1292
- @app.post(
1293
- "/query", response_model=QueryResponse, dependencies=[Depends(optional_api_key)]
1294
- )
1295
- async def query_text(request: QueryRequest):
1296
- """
1297
- Handle a POST request at the /query endpoint to process user queries using RAG capabilities.
1298
-
1299
- Parameters:
1300
- request (QueryRequest): A Pydantic model containing the following fields:
1301
- - query (str): The text of the user's query.
1302
- - mode (ModeEnum): Optional. Specifies the mode of retrieval augmentation.
1303
- - stream (bool): Optional. Determines if the response should be streamed.
1304
- - only_need_context (bool): Optional. If true, returns only the context without further processing.
1305
-
1306
- Returns:
1307
- QueryResponse: A Pydantic model containing the result of the query processing.
1308
- If a string is returned (e.g., cache hit), it's directly returned.
1309
- Otherwise, an async generator may be used to build the response.
1310
-
1311
- Raises:
1312
- HTTPException: Raised when an error occurs during the request handling process,
1313
- with status code 500 and detail containing the exception message.
1314
- """
1315
- try:
1316
- response = await rag.aquery(
1317
- request.query,
1318
- param=QueryParam(
1319
- mode=request.mode,
1320
- stream=request.stream,
1321
- only_need_context=request.only_need_context,
1322
- top_k=global_top_k,
1323
- ),
1324
- )
1325
-
1326
- # If response is a string (e.g. cache hit), return directly
1327
- if isinstance(response, str):
1328
- return QueryResponse(response=response)
1329
-
1330
- # If it's an async generator, decide whether to stream based on stream parameter
1331
- if request.stream:
1332
- result = ""
1333
- async for chunk in response:
1334
- result += chunk
1335
- return QueryResponse(response=result)
1336
- else:
1337
- result = ""
1338
- async for chunk in response:
1339
- result += chunk
1340
- return QueryResponse(response=result)
1341
- except Exception as e:
1342
- trace_exception(e)
1343
- raise HTTPException(status_code=500, detail=str(e))
1344
-
1345
- @app.post("/query/stream", dependencies=[Depends(optional_api_key)])
1346
- async def query_text_stream(request: QueryRequest):
1347
- """
1348
- This endpoint performs a retrieval-augmented generation (RAG) query and streams the response.
1349
-
1350
- Args:
1351
- request (QueryRequest): The request object containing the query parameters.
1352
- optional_api_key (Optional[str], optional): An optional API key for authentication. Defaults to None.
1353
-
1354
- Returns:
1355
- StreamingResponse: A streaming response containing the RAG query results.
1356
- """
1357
- try:
1358
- response = await rag.aquery( # Use aquery instead of query, and add await
1359
- request.query,
1360
- param=QueryParam(
1361
- mode=request.mode,
1362
- stream=True,
1363
- only_need_context=request.only_need_context,
1364
- top_k=global_top_k,
1365
- ),
1366
- )
1367
-
1368
- from fastapi.responses import StreamingResponse
1369
-
1370
- async def stream_generator():
1371
- if isinstance(response, str):
1372
- # If it's a string, send it all at once
1373
- yield f"{json.dumps({'response': response})}\n"
1374
- else:
1375
- # If it's an async generator, send chunks one by one
1376
- try:
1377
- async for chunk in response:
1378
- if chunk: # Only send non-empty content
1379
- yield f"{json.dumps({'response': chunk})}\n"
1380
- except Exception as e:
1381
- logging.error(f"Streaming error: {str(e)}")
1382
- yield f"{json.dumps({'error': str(e)})}\n"
1383
 
1384
- return StreamingResponse(
1385
- stream_generator(),
1386
- media_type="application/x-ndjson",
1387
- headers={
1388
- "Cache-Control": "no-cache",
1389
- "Connection": "keep-alive",
1390
- "Content-Type": "application/x-ndjson",
1391
- "X-Accel-Buffering": "no", # 确保在Nginx代理时正确处理流式响应
1392
- },
1393
  )
1394
  except Exception as e:
1395
- trace_exception(e)
 
1396
  raise HTTPException(status_code=500, detail=str(e))
1397
 
1398
  @app.post(
@@ -1400,7 +1467,9 @@ def create_app(args):
1400
  response_model=InsertResponse,
1401
  dependencies=[Depends(optional_api_key)],
1402
  )
1403
- async def insert_text(request: InsertTextRequest):
 
 
1404
  """
1405
  Insert text into the Retrieval-Augmented Generation (RAG) system.
1406
 
@@ -1408,18 +1477,20 @@ def create_app(args):
1408
 
1409
  Args:
1410
  request (InsertTextRequest): The request body containing the text to be inserted.
 
1411
 
1412
  Returns:
1413
  InsertResponse: A response object containing the status of the operation, a message, and the number of documents inserted.
1414
  """
1415
  try:
1416
- await rag.ainsert(request.text)
1417
  return InsertResponse(
1418
  status="success",
1419
- message="Text successfully inserted",
1420
- document_count=1,
1421
  )
1422
  except Exception as e:
 
 
1423
  raise HTTPException(status_code=500, detail=str(e))
1424
 
1425
  @app.post(
@@ -1427,12 +1498,14 @@ def create_app(args):
1427
  response_model=InsertResponse,
1428
  dependencies=[Depends(optional_api_key)],
1429
  )
1430
- async def insert_file(file: UploadFile = File(...), description: str = Form(None)):
 
 
1431
  """Insert a file directly into the RAG system
1432
 
1433
  Args:
 
1434
  file: Uploaded file
1435
- description: Optional description of the file
1436
 
1437
  Returns:
1438
  InsertResponse: Status of the insertion operation
@@ -1441,68 +1514,26 @@ def create_app(args):
1441
  HTTPException: For unsupported file types or processing errors
1442
  """
1443
  try:
1444
- content = ""
1445
- # Get file extension in lowercase
1446
- ext = Path(file.filename).suffix.lower()
1447
-
1448
- match ext:
1449
- case ".txt" | ".md":
1450
- # Text files handling
1451
- text_content = await file.read()
1452
- content = text_content.decode("utf-8")
1453
-
1454
- case ".pdf" | ".docx" | ".pptx" | ".xlsx":
1455
- if not pm.is_installed("docling"):
1456
- pm.install("docling")
1457
- from docling.document_converter import DocumentConverter
1458
-
1459
- # Create a temporary file to save the uploaded content
1460
- temp_path = Path("temp") / file.filename
1461
- temp_path.parent.mkdir(exist_ok=True)
1462
-
1463
- # Save the uploaded file
1464
- with temp_path.open("wb") as f:
1465
- f.write(await file.read())
1466
-
1467
- try:
1468
-
1469
- async def convert_doc():
1470
- def sync_convert():
1471
- converter = DocumentConverter()
1472
- result = converter.convert(str(temp_path))
1473
- return result.document.export_to_markdown()
1474
-
1475
- return await asyncio.to_thread(sync_convert)
1476
-
1477
- content = await convert_doc()
1478
- finally:
1479
- # Clean up the temporary file
1480
- temp_path.unlink()
1481
-
1482
- # Insert content into RAG system
1483
- if content:
1484
- # Add description if provided
1485
- if description:
1486
- content = f"{description}\n\n{content}"
1487
-
1488
- await rag.ainsert(content)
1489
- logging.info(f"Successfully indexed file: {file.filename}")
1490
-
1491
- return InsertResponse(
1492
- status="success",
1493
- message=f"File '{file.filename}' successfully inserted",
1494
- document_count=1,
1495
- )
1496
- else:
1497
  raise HTTPException(
1498
  status_code=400,
1499
- detail="No content could be extracted from the file",
1500
  )
1501
 
1502
- except UnicodeDecodeError:
1503
- raise HTTPException(status_code=400, detail="File encoding not supported")
 
 
 
 
 
 
 
 
 
1504
  except Exception as e:
1505
- logging.error(f"Error processing file {file.filename}: {str(e)}")
 
1506
  raise HTTPException(status_code=500, detail=str(e))
1507
 
1508
  @app.post(
@@ -1510,10 +1541,13 @@ def create_app(args):
1510
  response_model=InsertResponse,
1511
  dependencies=[Depends(optional_api_key)],
1512
  )
1513
- async def insert_batch(files: List[UploadFile] = File(...)):
 
 
1514
  """Process multiple files in batch mode
1515
 
1516
  Args:
 
1517
  files: List of files to process
1518
 
1519
  Returns:
@@ -1525,72 +1559,18 @@ def create_app(args):
1525
  try:
1526
  inserted_count = 0
1527
  failed_files = []
 
1528
 
1529
  for file in files:
1530
- try:
1531
- content = ""
1532
- ext = Path(file.filename).suffix.lower()
1533
-
1534
- match ext:
1535
- case ".txt" | ".md":
1536
- text_content = await file.read()
1537
- content = text_content.decode("utf-8")
1538
-
1539
- case ".pdf":
1540
- if not pm.is_installed("pypdf2"):
1541
- pm.install("pypdf2")
1542
- from PyPDF2 import PdfReader
1543
- from io import BytesIO
1544
-
1545
- pdf_content = await file.read()
1546
- pdf_file = BytesIO(pdf_content)
1547
- reader = PdfReader(pdf_file)
1548
- for page in reader.pages:
1549
- content += page.extract_text() + "\n"
1550
-
1551
- case ".docx":
1552
- if not pm.is_installed("docx"):
1553
- pm.install("docx")
1554
- from docx import Document
1555
- from io import BytesIO
1556
-
1557
- docx_content = await file.read()
1558
- docx_file = BytesIO(docx_content)
1559
- doc = Document(docx_file)
1560
- content = "\n".join(
1561
- [paragraph.text for paragraph in doc.paragraphs]
1562
- )
1563
-
1564
- case ".pptx":
1565
- if not pm.is_installed("pptx"):
1566
- pm.install("pptx")
1567
- from pptx import Presentation # type: ignore
1568
- from io import BytesIO
1569
-
1570
- pptx_content = await file.read()
1571
- pptx_file = BytesIO(pptx_content)
1572
- prs = Presentation(pptx_file)
1573
- for slide in prs.slides:
1574
- for shape in slide.shapes:
1575
- if hasattr(shape, "text"):
1576
- content += shape.text + "\n"
1577
-
1578
- case _:
1579
- failed_files.append(f"{file.filename} (unsupported type)")
1580
- continue
1581
-
1582
- if content:
1583
- await rag.ainsert(content)
1584
- inserted_count += 1
1585
- logging.info(f"Successfully indexed file: {file.filename}")
1586
- else:
1587
- failed_files.append(f"{file.filename} (no content extracted)")
1588
 
1589
- except UnicodeDecodeError:
1590
- failed_files.append(f"{file.filename} (encoding error)")
1591
- except Exception as e:
1592
- failed_files.append(f"{file.filename} ({str(e)})")
1593
- logging.error(f"Error processing file {file.filename}: {str(e)}")
1594
 
1595
  # Prepare status message
1596
  if inserted_count == len(files):
@@ -1607,14 +1587,11 @@ def create_app(args):
1607
  if failed_files:
1608
  status_message += f". Failed files: {', '.join(failed_files)}"
1609
 
1610
- return InsertResponse(
1611
- status=status,
1612
- message=status_message,
1613
- document_count=inserted_count,
1614
- )
1615
 
1616
  except Exception as e:
1617
- logging.error(f"Batch processing error: {str(e)}")
 
1618
  raise HTTPException(status_code=500, detail=str(e))
1619
 
1620
  @app.delete(
@@ -1637,11 +1614,103 @@ def create_app(args):
1637
  rag.entities_vdb = None
1638
  rag.relationships_vdb = None
1639
  return InsertResponse(
1640
- status="success",
1641
- message="All documents cleared successfully",
1642
- document_count=0,
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1643
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1644
  except Exception as e:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1645
  raise HTTPException(status_code=500, detail=str(e))
1646
 
1647
  # query all graph labels
 
3
  HTTPException,
4
  File,
5
  UploadFile,
 
6
  BackgroundTasks,
7
  )
8
  import asyncio
 
13
  from fastapi.staticfiles import StaticFiles
14
  import logging
15
  import argparse
16
+ from typing import List, Any, Optional, Dict
17
  from pydantic import BaseModel
18
  from lightrag import LightRAG, QueryParam
19
  from lightrag.types import GPTKeywordExtractionFormat
 
33
  import pipmaster as pm
34
  from dotenv import load_dotenv
35
  import configparser
36
+ import traceback
37
+ from datetime import datetime
38
+
39
  from lightrag.utils import logger
40
  from .ollama_api import (
41
  OllamaAPI,
 
637
 
638
  class QueryRequest(BaseModel):
639
  query: str
640
+
641
+ """Specifies the retrieval mode"""
642
  mode: SearchMode = SearchMode.hybrid
643
+
644
+ """If True, enables streaming output for real-time responses."""
645
+ stream: Optional[bool] = None
646
+
647
+ """If True, only returns the retrieved context without generating a response."""
648
+ only_need_context: Optional[bool] = None
649
+
650
+ """If True, only returns the generated prompt without producing a response."""
651
+ only_need_prompt: Optional[bool] = None
652
+
653
+ """Defines the response format. Examples: 'Multiple Paragraphs', 'Single Paragraph', 'Bullet Points'."""
654
+ response_type: Optional[str] = None
655
+
656
+ """Number of top items to retrieve. Represents entities in 'local' mode and relationships in 'global' mode."""
657
+ top_k: Optional[int] = None
658
+
659
+ """Maximum number of tokens allowed for each retrieved text chunk."""
660
+ max_token_for_text_unit: Optional[int] = None
661
+
662
+ """Maximum number of tokens allocated for relationship descriptions in global retrieval."""
663
+ max_token_for_global_context: Optional[int] = None
664
+
665
+ """Maximum number of tokens allocated for entity descriptions in local retrieval."""
666
+ max_token_for_local_context: Optional[int] = None
667
+
668
+ """List of high-level keywords to prioritize in retrieval."""
669
+ hl_keywords: Optional[List[str]] = None
670
+
671
+ """List of low-level keywords to refine retrieval focus."""
672
+ ll_keywords: Optional[List[str]] = None
673
+
674
+ """Stores past conversation history to maintain context.
675
+ Format: [{"role": "user/assistant", "content": "message"}].
676
+ """
677
+ conversation_history: Optional[List[dict[str, Any]]] = None
678
+
679
+ """Number of complete conversation turns (user-assistant pairs) to consider in the response context."""
680
+ history_turns: Optional[int] = None
681
 
682
 
683
  class QueryResponse(BaseModel):
 
686
 
687
  class InsertTextRequest(BaseModel):
688
  text: str
 
689
 
690
 
691
  class InsertResponse(BaseModel):
692
  status: str
693
  message: str
694
+
695
+
696
+ def QueryRequestToQueryParams(request: QueryRequest):
697
+ param = QueryParam(mode=request.mode, stream=request.stream)
698
+ if request.only_need_context is not None:
699
+ param.only_need_context = request.only_need_context
700
+ if request.only_need_prompt is not None:
701
+ param.only_need_prompt = request.only_need_prompt
702
+ if request.response_type is not None:
703
+ param.response_type = request.response_type
704
+ if request.top_k is not None:
705
+ param.top_k = request.top_k
706
+ if request.max_token_for_text_unit is not None:
707
+ param.max_token_for_text_unit = request.max_token_for_text_unit
708
+ if request.max_token_for_global_context is not None:
709
+ param.max_token_for_global_context = request.max_token_for_global_context
710
+ if request.max_token_for_local_context is not None:
711
+ param.max_token_for_local_context = request.max_token_for_local_context
712
+ if request.hl_keywords is not None:
713
+ param.hl_keywords = request.hl_keywords
714
+ if request.ll_keywords is not None:
715
+ param.ll_keywords = request.ll_keywords
716
+ if request.conversation_history is not None:
717
+ param.conversation_history = request.conversation_history
718
+ if request.history_turns is not None:
719
+ param.history_turns = request.history_turns
720
+ return param
721
 
722
 
723
  def get_api_key_dependency(api_key: Optional[str]):
 
731
  # If API key is configured, use proper authentication
732
  api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
733
 
734
+ async def api_key_auth(
735
+ api_key_header_value: Optional[str] = Security(api_key_header),
736
+ ):
737
  if not api_key_header_value:
738
  raise HTTPException(
739
  status_code=HTTP_403_FORBIDDEN, detail="API Key required"
 
749
 
750
  # Global configuration
751
  global_top_k = 60 # default value
752
+ temp_prefix = "__tmp_" # prefix for temporary files
753
 
754
 
755
  def create_app(args):
 
1200
  ("llm_response_cache", rag.llm_response_cache),
1201
  ]
1202
 
1203
+ async def pipeline_enqueue_file(file_path: Path) -> bool:
1204
+ """Add a file to the queue for processing
1205
 
1206
  Args:
1207
+ file_path: Path to the saved file
1208
+ Returns:
1209
+ bool: True if the file was successfully enqueued, False otherwise
 
 
1210
  """
1211
+ try:
1212
+ content = ""
1213
+ ext = file_path.suffix.lower()
1214
 
1215
+ file = None
1216
+ async with aiofiles.open(file_path, "rb") as f:
1217
+ file = await f.read()
1218
 
1219
+ # Process based on file type
1220
+ match ext:
1221
+ case ".txt" | ".md":
1222
+ content = file.decode("utf-8")
1223
+ case ".pdf":
1224
+ if not pm.is_installed("pypdf2"):
1225
+ pm.install("pypdf2")
1226
+ from PyPDF2 import PdfReader
1227
+ from io import BytesIO
1228
+
1229
+ pdf_file = BytesIO(file)
1230
+ reader = PdfReader(pdf_file)
1231
+ for page in reader.pages:
1232
+ content += page.extract_text() + "\n"
1233
+ case ".docx":
1234
+ if not pm.is_installed("docx"):
1235
+ pm.install("docx")
1236
+ from docx import Document
1237
+ from io import BytesIO
1238
+
1239
+ docx_content = await file.read()
1240
+ docx_file = BytesIO(docx_content)
1241
+ doc = Document(docx_file)
1242
+ content = "\n".join(
1243
+ [paragraph.text for paragraph in doc.paragraphs]
1244
+ )
1245
+ case ".pptx":
1246
+ if not pm.is_installed("pptx"):
1247
+ pm.install("pptx")
1248
+ from pptx import Presentation # type: ignore
1249
+ from io import BytesIO
1250
+
1251
+ pptx_content = await file.read()
1252
+ pptx_file = BytesIO(pptx_content)
1253
+ prs = Presentation(pptx_file)
1254
+ for slide in prs.slides:
1255
+ for shape in slide.shapes:
1256
+ if hasattr(shape, "text"):
1257
+ content += shape.text + "\n"
1258
+ case _:
1259
+ logging.error(
1260
+ f"Unsupported file type: {file_path.name} (extension {ext})"
1261
+ )
1262
+ return False
1263
+
1264
+ # Insert into the RAG queue
1265
+ if content:
1266
+ await rag.apipeline_enqueue_documents(content)
1267
+ logging.info(
1268
+ f"Successfully processed and enqueued file: {file_path.name}"
1269
+ )
1270
+ return True
1271
+ else:
1272
+ logging.error(
1273
+ f"No content could be extracted from file: {file_path.name}"
1274
+ )
1275
 
1276
+ except Exception as e:
1277
+ logging.error(
1278
+ f"Error processing or enqueueing file {file_path.name}: {str(e)}"
1279
+ )
1280
+ logging.error(traceback.format_exc())
1281
+ finally:
1282
+ if file_path.name.startswith(temp_prefix):
1283
+ # Clean up the temporary file after indexing
1284
+ try:
1285
+ file_path.unlink()
1286
+ except Exception as e:
1287
+ logging.error(f"Error deleting file {file_path}: {str(e)}")
1288
+ return False
1289
 
1290
+ async def pipeline_index_file(file_path: Path):
1291
+ """Index a file
 
 
 
1292
 
1293
+ Args:
1294
+ file_path: Path to the saved file
1295
+ """
1296
+ try:
1297
+ if await pipeline_enqueue_file(file_path):
1298
+ await rag.apipeline_process_enqueue_documents()
1299
 
1300
+ except Exception as e:
1301
+ logging.error(f"Error indexing file {file_path.name}: {str(e)}")
1302
+ logging.error(traceback.format_exc())
 
 
1303
 
1304
+ async def pipeline_index_files(file_paths: List[Path]):
1305
+ """Index multiple files concurrently
1306
 
1307
+ Args:
1308
+ file_paths: Paths to the files to index
1309
+ """
1310
+ if not file_paths:
1311
+ return
1312
+ try:
1313
+ enqueued = False
1314
 
1315
+ if len(file_paths) == 1:
1316
+ enqueued = await pipeline_enqueue_file(file_paths[0])
1317
+ else:
1318
+ tasks = [pipeline_enqueue_file(path) for path in file_paths]
1319
+ enqueued = any(await asyncio.gather(*tasks))
1320
 
1321
+ if enqueued:
1322
+ await rag.apipeline_process_enqueue_documents()
1323
+ except Exception as e:
1324
+ logging.error(f"Error indexing files: {str(e)}")
1325
+ logging.error(traceback.format_exc())
 
 
1326
 
1327
+ async def pipeline_index_texts(texts: List[str]):
1328
+ """Index a list of texts
 
 
1329
 
1330
+ Args:
1331
+ texts: The texts to index
1332
+ """
1333
+ if not texts:
1334
+ return
1335
+ await rag.apipeline_enqueue_documents(texts)
1336
+ await rag.apipeline_process_enqueue_documents()
1337
 
1338
+ async def save_temp_file(file: UploadFile = File(...)) -> Path:
1339
+ """Save the uploaded file to a temporary location
 
1340
 
1341
+ Args:
1342
+ file: The uploaded file
1343
 
1344
+ Returns:
1345
+ Path: The path to the saved file
1346
+ """
1347
+ # Generate unique filename to avoid conflicts
1348
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
1349
+ unique_filename = f"{temp_prefix}{timestamp}_{file.filename}"
1350
+
1351
+ # Create a temporary file to save the uploaded content
1352
+ temp_path = doc_manager.input_dir / "temp" / unique_filename
1353
+ temp_path.parent.mkdir(exist_ok=True)
1354
+
1355
+ # Save the file
1356
+ with open(temp_path, "wb") as buffer:
1357
+ shutil.copyfileobj(file.file, buffer)
1358
+ return temp_path
1359
 
1360
  async def run_scanning_process():
1361
  """Background task to scan and index documents"""
 
1371
  with progress_lock:
1372
  scan_progress["current_file"] = os.path.basename(file_path)
1373
 
1374
+ await pipeline_index_file(file_path)
1375
 
1376
  with progress_lock:
1377
  scan_progress["indexed_count"] += 1
 
1389
  with progress_lock:
1390
  scan_progress["is_scanning"] = False
1391
 
1392
+ @app.post("/documents/scan", dependencies=[Depends(optional_api_key)])
1393
+ async def scan_for_new_documents(background_tasks: BackgroundTasks):
1394
+ """Trigger the scanning process"""
1395
+ global scan_progress
1396
+
1397
+ with progress_lock:
1398
+ if scan_progress["is_scanning"]:
1399
+ return {"status": "already_scanning"}
1400
+
1401
+ scan_progress["is_scanning"] = True
1402
+ scan_progress["indexed_count"] = 0
1403
+ scan_progress["progress"] = 0
1404
+
1405
+ # Start the scanning process in the background
1406
+ background_tasks.add_task(run_scanning_process)
1407
+
1408
+ return {"status": "scanning_started"}
1409
+
1410
  @app.get("/documents/scan-progress")
1411
  async def get_scan_progress():
1412
  """Get the current scanning progress"""
 
1414
  return scan_progress
1415
 
1416
  @app.post("/documents/upload", dependencies=[Depends(optional_api_key)])
1417
+ async def upload_to_input_dir(
1418
+ background_tasks: BackgroundTasks, file: UploadFile = File(...)
1419
+ ):
1420
  """
1421
  Endpoint for uploading a file to the input directory and indexing it.
1422
 
 
1425
  indexes it for retrieval, and returns a success status with relevant details.
1426
 
1427
  Parameters:
1428
+ background_tasks: FastAPI BackgroundTasks for async processing
1429
  file (UploadFile): The file to be uploaded. It must have an allowed extension as per
1430
  `doc_manager.supported_extensions`.
1431
 
 
1450
  with open(file_path, "wb") as buffer:
1451
  shutil.copyfileobj(file.file, buffer)
1452
 
1453
+ # Add to background tasks
1454
+ background_tasks.add_task(pipeline_index_file, file_path)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1455
 
1456
+ return InsertResponse(
1457
+ status="success",
1458
+ message=f"File '{file.filename}' uploaded successfully. Processing will continue in background.",
 
 
 
 
 
 
1459
  )
1460
  except Exception as e:
1461
+ logging.error(f"Error /documents/upload: {file.filename}: {str(e)}")
1462
+ logging.error(traceback.format_exc())
1463
  raise HTTPException(status_code=500, detail=str(e))
1464
 
1465
  @app.post(
 
1467
  response_model=InsertResponse,
1468
  dependencies=[Depends(optional_api_key)],
1469
  )
1470
+ async def insert_text(
1471
+ request: InsertTextRequest, background_tasks: BackgroundTasks
1472
+ ):
1473
  """
1474
  Insert text into the Retrieval-Augmented Generation (RAG) system.
1475
 
 
1477
 
1478
  Args:
1479
  request (InsertTextRequest): The request body containing the text to be inserted.
1480
+ background_tasks: FastAPI BackgroundTasks for async processing
1481
 
1482
  Returns:
1483
  InsertResponse: A response object containing the status of the operation, a message, and the number of documents inserted.
1484
  """
1485
  try:
1486
+ background_tasks.add_task(pipeline_index_texts, [request.text])
1487
  return InsertResponse(
1488
  status="success",
1489
+ message="Text successfully received. Processing will continue in background.",
 
1490
  )
1491
  except Exception as e:
1492
+ logging.error(f"Error /documents/text: {str(e)}")
1493
+ logging.error(traceback.format_exc())
1494
  raise HTTPException(status_code=500, detail=str(e))
1495
 
1496
  @app.post(
 
1498
  response_model=InsertResponse,
1499
  dependencies=[Depends(optional_api_key)],
1500
  )
1501
async def insert_file(
    background_tasks: BackgroundTasks, file: UploadFile = File(...)
):
    """Accept an uploaded file and queue it for indexing into the RAG system.

    The upload is first checked against `doc_manager.is_supported_file`, then
    saved through `save_temp_file`, and the resulting path is handed to
    `pipeline_index_file` via FastAPI's background task queue.

    Args:
        background_tasks: FastAPI BackgroundTasks for async processing
        file: Uploaded file

    Returns:
        InsertResponse: `status` set to "success" plus a message saying the
        file was saved and will be processed in the background.

    Raises:
        HTTPException: Status 400 when the file type is not supported;
        status 500 (detail = the exception text) for any other failure.
    """
    try:
        if not doc_manager.is_supported_file(file.filename):
            raise HTTPException(
                status_code=400,
                detail=f"Unsupported file type. Supported types: {doc_manager.supported_extensions}",
            )

        # Create a temporary file to save the uploaded content
        temp_path = save_temp_file(file)

        # Add to background tasks
        background_tasks.add_task(pipeline_index_file, temp_path)

        return InsertResponse(
            status="success",
            message=f"File '{file.filename}' saved successfully. Processing will continue in background.",
        )

    except HTTPException:
        # HTTPException is itself an Exception: without this clause the 400
        # raised above would be caught below and turned into a 500.
        raise
    except Exception as e:
        logging.error(f"Error /documents/file: {str(e)}")
        logging.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(e))
1538
 
1539
  @app.post(
 
1541
  response_model=InsertResponse,
1542
  dependencies=[Depends(optional_api_key)],
1543
  )
1544
+ async def insert_batch(
1545
+ background_tasks: BackgroundTasks, files: List[UploadFile] = File(...)
1546
+ ):
1547
  """Process multiple files in batch mode
1548
 
1549
  Args:
1550
+ background_tasks: FastAPI BackgroundTasks for async processing
1551
  files: List of files to process
1552
 
1553
  Returns:
 
1559
  try:
1560
  inserted_count = 0
1561
  failed_files = []
1562
+ temp_files = []
1563
 
1564
  for file in files:
1565
+ if doc_manager.is_supported_file(file.filename):
1566
+ # Create a temporary file to save the uploaded content
1567
+ temp_files.append(save_temp_file(file))
1568
+ inserted_count += 1
1569
+ else:
1570
+ failed_files.append(f"{file.filename} (unsupported type)")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1571
 
1572
+ if temp_files:
1573
+ background_tasks.add_task(pipeline_index_files, temp_files)
 
 
 
1574
 
1575
  # Prepare status message
1576
  if inserted_count == len(files):
 
1587
  if failed_files:
1588
  status_message += f". Failed files: {', '.join(failed_files)}"
1589
 
1590
+ return InsertResponse(status=status, message=status_message)
 
 
 
 
1591
 
1592
  except Exception as e:
1593
+ logging.error(f"Error /documents/batch: {file.filename}: {str(e)}")
1594
+ logging.error(traceback.format_exc())
1595
  raise HTTPException(status_code=500, detail=str(e))
1596
 
1597
  @app.delete(
 
1614
  rag.entities_vdb = None
1615
  rag.relationships_vdb = None
1616
  return InsertResponse(
1617
+ status="success", message="All documents cleared successfully"
1618
+ )
1619
+ except Exception as e:
1620
+ logging.error(f"Error DELETE /documents: {str(e)}")
1621
+ logging.error(traceback.format_exc())
1622
+ raise HTTPException(status_code=500, detail=str(e))
1623
+
1624
@app.post(
    "/query", response_model=QueryResponse, dependencies=[Depends(optional_api_key)]
)
async def query_text(request: QueryRequest):
    """
    Handle a POST request at the /query endpoint and return the complete answer.

    The query is run through `rag.aquery` with parameters derived from the
    request. Whatever comes back is normalised into a single string:

    - a string is returned as is;
    - an async iterable is consumed and its chunks are concatenated;
    - a dict is serialised as indented JSON;
    - anything else is converted with `str()`.

    Parameters:
        request (QueryRequest): The request object containing the query parameters.
    Returns:
        QueryResponse: A Pydantic model whose `response` field holds the full result text.

    Raises:
        HTTPException: Raised when an error occurs during the request handling process,
                       with status code 500 and detail containing the exception message.
    """
    try:
        response = await rag.aquery(
            request.query, param=QueryRequestToQueryParams(request)
        )

        # If response is a string (e.g. cache hit), return directly
        if isinstance(response, str):
            return QueryResponse(response=response)

        # Decide by what the response actually is, not by the request's
        # `stream` flag: iterating a non-iterable result (e.g. a dict) with
        # `async for` raises TypeError and would surface as a 500.
        if hasattr(response, "__aiter__"):
            chunks = [chunk async for chunk in response]
            return QueryResponse(response="".join(chunks))

        if isinstance(response, dict):
            return QueryResponse(response=json.dumps(response, indent=2))

        return QueryResponse(response=str(response))
    except Exception as e:
        trace_exception(e)
        raise HTTPException(status_code=500, detail=str(e))
1665
+
1666
@app.post("/query/stream", dependencies=[Depends(optional_api_key)])
async def query_text_stream(request: QueryRequest):
    """
    Run a retrieval-augmented generation (RAG) query and stream the answer as NDJSON.

    Each line of the response body is a JSON object: `{"response": ...}` for
    content, or `{"error": ...}` if iterating the result fails part-way.

    Args:
        request (QueryRequest): The request object containing the query parameters.

    Returns:
        StreamingResponse: A streaming response containing the RAG query results.

    Raises:
        HTTPException: Status 500 (detail = the exception text) if the query
        fails before streaming starts.
    """
    try:
        query_params = QueryRequestToQueryParams(request)
        # This endpoint always asks the backend for a streamed result.
        query_params.stream = True
        response = await rag.aquery(request.query, param=query_params)

        from fastapi.responses import StreamingResponse

        async def ndjson_lines():
            # A plain string is sent as one line.
            if isinstance(response, str):
                yield json.dumps({"response": response}) + "\n"
                return

            # Otherwise treat the result as an async iterable and forward it
            # chunk by chunk, skipping empty chunks.
            try:
                async for piece in response:
                    if not piece:
                        continue
                    yield json.dumps({"response": piece}) + "\n"
            except Exception as stream_exc:
                # Headers are already sent at this point, so the error is
                # reported in-band as a final NDJSON line.
                logging.error(f"Streaming error: {str(stream_exc)}")
                yield json.dumps({"error": str(stream_exc)}) + "\n"

        response_headers = {
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Content-Type": "application/x-ndjson",
            # Ensure streaming responses are handled correctly behind an Nginx proxy
            "X-Accel-Buffering": "no",
        }
        return StreamingResponse(
            ndjson_lines(),
            media_type="application/x-ndjson",
            headers=response_headers,
        )
    except Exception as exc:
        trace_exception(exc)
        raise HTTPException(status_code=500, detail=str(exc))
1715
 
1716
  # query all graph labels
lightrag/base.py CHANGED
@@ -249,20 +249,10 @@ class DocStatusStorage(BaseKVStorage):
249
  """Get counts of documents in each status"""
250
  raise NotImplementedError
251
 
252
- async def get_failed_docs(self) -> dict[str, DocProcessingStatus]:
253
- """Get all failed documents"""
254
- raise NotImplementedError
255
-
256
- async def get_pending_docs(self) -> dict[str, DocProcessingStatus]:
257
- """Get all pending documents"""
258
- raise NotImplementedError
259
-
260
- async def get_processing_docs(self) -> dict[str, DocProcessingStatus]:
261
- """Get all processing documents"""
262
- raise NotImplementedError
263
-
264
- async def get_processed_docs(self) -> dict[str, DocProcessingStatus]:
265
- """Get all procesed documents"""
266
  raise NotImplementedError
267
 
268
  async def update_doc_status(self, data: dict[str, Any]) -> None:
 
249
  """Get counts of documents in each status"""
250
  raise NotImplementedError
251
 
252
async def get_docs_by_status(self, status: DocStatus) -> dict[str, DocProcessingStatus]:
    """Get all documents with a specific status.

    Interface stub: this base implementation always raises, so a concrete
    storage backend has to provide its own version.

    Args:
        status: The status to filter documents by.

    Returns:
        A mapping from key to DocProcessingStatus for the matching documents
        (keys are presumably document ids; confirm against the backends).

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError
257
 
258
  async def update_doc_status(self, data: dict[str, Any]) -> None:
lightrag/kg/json_doc_status_impl.py CHANGED
@@ -93,36 +93,14 @@ class JsonDocStatusStorage(DocStatusStorage):
93
  counts[doc["status"]] += 1
94
  return counts
95
 
96
- async def get_failed_docs(self) -> dict[str, DocProcessingStatus]:
97
- """Get all failed documents"""
 
 
98
  return {
99
  k: DocProcessingStatus(**v)
100
  for k, v in self._data.items()
101
- if v["status"] == DocStatus.FAILED
102
- }
103
-
104
- async def get_pending_docs(self) -> dict[str, DocProcessingStatus]:
105
- """Get all pending documents"""
106
- return {
107
- k: DocProcessingStatus(**v)
108
- for k, v in self._data.items()
109
- if v["status"] == DocStatus.PENDING
110
- }
111
-
112
- async def get_processed_docs(self) -> dict[str, DocProcessingStatus]:
113
- """Get all processed documents"""
114
- return {
115
- k: DocProcessingStatus(**v)
116
- for k, v in self._data.items()
117
- if v["status"] == DocStatus.PROCESSED
118
- }
119
-
120
- async def get_processing_docs(self) -> dict[str, DocProcessingStatus]:
121
- """Get all processing documents"""
122
- return {
123
- k: DocProcessingStatus(**v)
124
- for k, v in self._data.items()
125
- if v["status"] == DocStatus.PROCESSING
126
  }
127
 
128
  async def index_done_callback(self):
 
93
  counts[doc["status"]] += 1
94
  return counts
95
 
96
async def get_docs_by_status(self, status: DocStatus) -> dict[str, DocProcessingStatus]:
    """Get all documents with a specific status.

    Scans the in-memory `self._data` mapping and keeps every entry whose
    "status" field equals `status`.

    Args:
        status: The status to filter documents by.

    Returns:
        A dict with the same keys as the matching entries of `self._data`,
        each value rebuilt as a DocProcessingStatus from the stored fields.
    """
    matching: dict[str, DocProcessingStatus] = {}
    for doc_key, doc_fields in self._data.items():
        if doc_fields["status"] != status:
            continue
        matching[doc_key] = DocProcessingStatus(**doc_fields)
    return matching
105
 
106
  async def index_done_callback(self):
lightrag/kg/mongo_impl.py CHANGED
@@ -175,7 +175,7 @@ class MongoDocStatusStorage(DocStatusStorage):
175
  async def get_docs_by_status(
176
  self, status: DocStatus
177
  ) -> dict[str, DocProcessingStatus]:
178
- """Get all documents by status"""
179
  cursor = self._data.find({"status": status.value})
180
  result = await cursor.to_list()
181
  return {
@@ -191,22 +191,6 @@ class MongoDocStatusStorage(DocStatusStorage):
191
  for doc in result
192
  }
193
 
194
- async def get_failed_docs(self) -> dict[str, DocProcessingStatus]:
195
- """Get all failed documents"""
196
- return await self.get_docs_by_status(DocStatus.FAILED)
197
-
198
- async def get_pending_docs(self) -> dict[str, DocProcessingStatus]:
199
- """Get all pending documents"""
200
- return await self.get_docs_by_status(DocStatus.PENDING)
201
-
202
- async def get_processing_docs(self) -> dict[str, DocProcessingStatus]:
203
- """Get all processing documents"""
204
- return await self.get_docs_by_status(DocStatus.PROCESSING)
205
-
206
- async def get_processed_docs(self) -> dict[str, DocProcessingStatus]:
207
- """Get all procesed documents"""
208
- return await self.get_docs_by_status(DocStatus.PROCESSED)
209
-
210
 
211
  @dataclass
212
  class MongoGraphStorage(BaseGraphStorage):
 
175
  async def get_docs_by_status(
176
  self, status: DocStatus
177
  ) -> dict[str, DocProcessingStatus]:
178
+ """Get all documents with a specific status"""
179
  cursor = self._data.find({"status": status.value})
180
  result = await cursor.to_list()
181
  return {
 
191
  for doc in result
192
  }
193
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
194
 
195
  @dataclass
196
  class MongoGraphStorage(BaseGraphStorage):
lightrag/kg/postgres_impl.py CHANGED
@@ -468,7 +468,7 @@ class PGDocStatusStorage(DocStatusStorage):
468
  async def get_docs_by_status(
469
  self, status: DocStatus
470
  ) -> Dict[str, DocProcessingStatus]:
471
- """Get all documents by status"""
472
  sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and status=$2"
473
  params = {"workspace": self.db.workspace, "status": status}
474
  result = await self.db.query(sql, params, True)
@@ -485,22 +485,6 @@ class PGDocStatusStorage(DocStatusStorage):
485
  for element in result
486
  }
487
 
488
- async def get_failed_docs(self) -> Dict[str, DocProcessingStatus]:
489
- """Get all failed documents"""
490
- return await self.get_docs_by_status(DocStatus.FAILED)
491
-
492
- async def get_pending_docs(self) -> Dict[str, DocProcessingStatus]:
493
- """Get all pending documents"""
494
- return await self.get_docs_by_status(DocStatus.PENDING)
495
-
496
- async def get_processing_docs(self) -> dict[str, DocProcessingStatus]:
497
- """Get all processing documents"""
498
- return await self.get_docs_by_status(DocStatus.PROCESSING)
499
-
500
- async def get_processed_docs(self) -> dict[str, DocProcessingStatus]:
501
- """Get all procesed documents"""
502
- return await self.get_docs_by_status(DocStatus.PROCESSED)
503
-
504
  async def index_done_callback(self):
505
  """Save data after indexing, but for PostgreSQL, we already saved them during the upsert stage, so no action to take here"""
506
  logger.info("Doc status had been saved into postgresql db!")
 
468
  async def get_docs_by_status(
469
  self, status: DocStatus
470
  ) -> Dict[str, DocProcessingStatus]:
471
+ """all documents with a specific status"""
472
  sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and status=$2"
473
  params = {"workspace": self.db.workspace, "status": status}
474
  result = await self.db.query(sql, params, True)
 
485
  for element in result
486
  }
487
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
488
  async def index_done_callback(self):
489
  """Save data after indexing, but for PostgreSQL, we already saved them during the upsert stage, so no action to take here"""
490
  logger.info("Doc status had been saved into postgresql db!")
lightrag/lightrag.py CHANGED
@@ -89,7 +89,7 @@ STORAGE_IMPLEMENTATIONS = {
89
  "PGDocStatusStorage",
90
  "MongoDocStatusStorage",
91
  ],
92
- "required_methods": ["get_pending_docs"],
93
  },
94
  }
95
 
@@ -230,7 +230,7 @@ class LightRAG:
230
  """LightRAG: Simple and Fast Retrieval-Augmented Generation."""
231
 
232
  working_dir: str = field(
233
- default_factory=lambda: f'./lightrag_cache_{datetime.now().strftime("%Y-%m-%d-%H:%M:%S")}'
234
  )
235
  """Directory where cache and temporary files are stored."""
236
 
@@ -715,11 +715,11 @@ class LightRAG:
715
  # 1. Get all pending, failed, and abnormally terminated processing documents.
716
  to_process_docs: dict[str, DocProcessingStatus] = {}
717
 
718
- processing_docs = await self.doc_status.get_processing_docs()
719
  to_process_docs.update(processing_docs)
720
- failed_docs = await self.doc_status.get_failed_docs()
721
  to_process_docs.update(failed_docs)
722
- pendings_docs = await self.doc_status.get_pending_docs()
723
  to_process_docs.update(pendings_docs)
724
 
725
  if not to_process_docs:
 
89
  "PGDocStatusStorage",
90
  "MongoDocStatusStorage",
91
  ],
92
+ "required_methods": ["get_docs_by_status"],
93
  },
94
  }
95
 
 
230
  """LightRAG: Simple and Fast Retrieval-Augmented Generation."""
231
 
232
  working_dir: str = field(
233
+ default_factory=lambda: f"./lightrag_cache_{datetime.now().strftime('%Y-%m-%d-%H:%M:%S')}"
234
  )
235
  """Directory where cache and temporary files are stored."""
236
 
 
715
  # 1. Get all pending, failed, and abnormally terminated processing documents.
716
  to_process_docs: dict[str, DocProcessingStatus] = {}
717
 
718
+ processing_docs = await self.doc_status.get_docs_by_status(DocStatus.PROCESSING)
719
  to_process_docs.update(processing_docs)
720
+ failed_docs = await self.doc_status.get_docs_by_status(DocStatus.FAILED)
721
  to_process_docs.update(failed_docs)
722
+ pendings_docs = await self.doc_status.get_docs_by_status(DocStatus.PENDING)
723
  to_process_docs.update(pendings_docs)
724
 
725
  if not to_process_docs: