yangdx committed
Commit · 0edbfbb
1 Parent(s): 103da4d

Fix linting

Files changed:
- gunicorn_config.py +43 -45
- lightrag/api/lightrag_server.py +9 -7
- lightrag/api/routers/document_routes.py +7 -7
- lightrag/api/utils_api.py +3 -1
- lightrag/lightrag.py +44 -38
- lightrag/operate.py +2 -1
- lightrag/utils.py +12 -20
- run_with_gunicorn.py +15 -10
gunicorn_config.py
CHANGED
@@ -28,58 +28,55 @@ keepalive = 5
 errorlog = os.getenv("ERROR_LOG", log_file_path)  # Defaults to lightrag.log
 accesslog = os.getenv("ACCESS_LOG", log_file_path)  # Defaults to lightrag.log
 
-# Configure the logging system
 logconfig_dict = {
-    'version': 1,
-    'disable_existing_loggers': False,
-    'formatters': {
-        'standard': {
-            'format': '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
-        },
-    },
-    'handlers': {
-        'console': {
-            'class': 'logging.StreamHandler',
-            'formatter': 'standard',
-            'stream': 'ext://sys.stdout',
-        },
-        'file': {
-            'class': 'logging.handlers.RotatingFileHandler',
-            'formatter': 'standard',
-            'filename': log_file_path,
-            'maxBytes': 10485760,  # 10MB
-            'backupCount': 5,
-            'encoding': 'utf8'
-        }
-    },
-    'filters': {
-        'path_filter': {
-            '()': 'lightrag.api.lightrag_server.LightragPathFilter',
-        },
-    },
-    'loggers': {
-        'lightrag': {
-            'handlers': ['console', 'file'],
-            'level': loglevel.upper() if loglevel else 'INFO',
-            'propagate': False,
-        },
-        'gunicorn': {
-            'handlers': ['console', 'file'],
-            'level': loglevel.upper() if loglevel else 'INFO',
-            'propagate': False,
-        },
-        'gunicorn.error': {
-            'handlers': ['console', 'file'],
-            'level': loglevel.upper() if loglevel else 'INFO',
-            'propagate': False,
-        },
-        'gunicorn.access': {
-            'handlers': ['console', 'file'],
-            'level': loglevel.upper() if loglevel else 'INFO',
-            'propagate': False,
-            'filters': ['path_filter']
-        }
-    }
+    "version": 1,
+    "disable_existing_loggers": False,
+    "formatters": {
+        "standard": {"format": "%(asctime)s [%(levelname)s] %(name)s: %(message)s"},
+    },
+    "handlers": {
+        "console": {
+            "class": "logging.StreamHandler",
+            "formatter": "standard",
+            "stream": "ext://sys.stdout",
+        },
+        "file": {
+            "class": "logging.handlers.RotatingFileHandler",
+            "formatter": "standard",
+            "filename": log_file_path,
+            "maxBytes": 10485760,  # 10MB
+            "backupCount": 5,
+            "encoding": "utf8",
+        },
+    },
+    "filters": {
+        "path_filter": {
+            "()": "lightrag.api.lightrag_server.LightragPathFilter",
+        },
+    },
+    "loggers": {
+        "lightrag": {
+            "handlers": ["console", "file"],
+            "level": loglevel.upper() if loglevel else "INFO",
+            "propagate": False,
+        },
+        "gunicorn": {
+            "handlers": ["console", "file"],
+            "level": loglevel.upper() if loglevel else "INFO",
+            "propagate": False,
+        },
+        "gunicorn.error": {
+            "handlers": ["console", "file"],
+            "level": loglevel.upper() if loglevel else "INFO",
+            "propagate": False,
+        },
+        "gunicorn.access": {
+            "handlers": ["console", "file"],
+            "level": loglevel.upper() if loglevel else "INFO",
+            "propagate": False,
+            "filters": ["path_filter"],
+        },
+    },
 }

@@ -134,14 +131,15 @@ def post_fork(server, worker):
     """
     # Set lightrag logger level in worker processes using gunicorn's loglevel
     from lightrag.utils import logger
+
     logger.setLevel(loglevel.upper())
-
+
     # Disable uvicorn.error logger in worker processes
     uvicorn_error_logger = logging.getLogger("uvicorn.error")
     uvicorn_error_logger.setLevel(logging.CRITICAL)
     uvicorn_error_logger.handlers = []
     uvicorn_error_logger.propagate = False
-
+
     # Add log filter to uvicorn.access handler in worker processes
     uvicorn_access_logger = logging.getLogger("uvicorn.access")
     path_filter = LightragPathFilter()
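Note on the filters section above: the "()" key is the standard logging.config.dictConfig factory hook, which instantiates the named callable instead of a stock logging.Filter. A minimal, self-contained sketch of the same wiring (DemoPathFilter and its /health blocklist are illustrative assumptions; the real LightragPathFilter lives in lightrag/api/lightrag_server.py):

import logging
import logging.config

class DemoPathFilter(logging.Filter):
    """Drop access-log records that mention a blocked request path."""

    def __init__(self, blocked=("/health",)):  # hypothetical blocklist
        super().__init__()
        self.blocked = blocked

    def filter(self, record):
        # Returning False suppresses the record.
        return not any(path in record.getMessage() for path in self.blocked)

logging.config.dictConfig(
    {
        "version": 1,
        "disable_existing_loggers": False,
        "filters": {"path_filter": {"()": DemoPathFilter}},
        "handlers": {"console": {"class": "logging.StreamHandler"}},
        "loggers": {
            "gunicorn.access": {
                "handlers": ["console"],
                "level": "INFO",
                "filters": ["path_filter"],
            },
        },
    }
)

logging.getLogger("gunicorn.access").info("GET /health 200")  # suppressed
logging.getLogger("gunicorn.access").info("GET /query 200")  # printed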
lightrag/api/lightrag_server.py
CHANGED
@@ -9,7 +9,6 @@ from fastapi import (
 from fastapi.responses import FileResponse
 import asyncio
 import os
-import json
 import logging
 import logging.config
 import uvicorn

@@ -139,17 +138,20 @@ def create_app(args):
        # Auto scan documents if enabled
        if args.auto_scan_at_startup:
            # Import necessary functions from shared_storage
-            from lightrag.kg.shared_storage import get_namespace_data, get_storage_lock
+            from lightrag.kg.shared_storage import (
+                get_namespace_data,
+                get_storage_lock,
+            )
+
            # Get pipeline status and lock
            pipeline_status = get_namespace_data("pipeline_status")
            storage_lock = get_storage_lock()
-
+
            # Check if a task is already running (with lock protection)
            should_start_task = False
            with storage_lock:
                if not pipeline_status.get("busy", False):
                    should_start_task = True
            # Only start the task if no other task is running
            if should_start_task:
                # Create background task

@@ -430,7 +432,7 @@ def configure_logging():
 
    # Configure basic logging
    log_file_path = os.path.abspath(os.path.join(os.getcwd(), "lightrag.log"))
-
+
    logging.config.dictConfig(
        {
            "version": 1,

@@ -453,7 +455,7 @@
            "formatter": "detailed",
            "class": "logging.handlers.RotatingFileHandler",
            "filename": log_file_path,
-            "maxBytes": 10*1024*1024,  # 10MB
+            "maxBytes": 10 * 1024 * 1024,  # 10MB
            "backupCount": 5,
            "encoding": "utf-8",
        },
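Note: once should_start_task is set, the scan runs as a fire-and-forget asyncio task. A minimal sketch of that shape in a FastAPI lifespan (scan_documents stands in for run_scanning_process(rag, doc_manager); the exact lifespan wiring inside create_app is an assumption):

import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI

async def scan_documents() -> None:
    """Stand-in for run_scanning_process(rag, doc_manager)."""
    await asyncio.sleep(0)

@asynccontextmanager
async def lifespan(app: FastAPI):
    auto_scan_at_startup = True  # stand-in for args.auto_scan_at_startup
    if auto_scan_at_startup:
        # Keep a reference so the fire-and-forget task is not garbage-collected.
        app.state.scan_task = asyncio.create_task(scan_documents())
    yield

app = FastAPI(lifespan=lifespan)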
lightrag/api/routers/document_routes.py
CHANGED
@@ -406,7 +406,6 @@ def create_document_routes(
        background_tasks.add_task(run_scanning_process, rag, doc_manager)
        return {"status": "scanning_started"}
 
-
    @router.post("/upload", dependencies=[Depends(optional_api_key)])
    async def upload_to_input_dir(
        background_tasks: BackgroundTasks, file: UploadFile = File(...)

@@ -657,29 +656,30 @@
    async def get_pipeline_status():
        """
        Get the current status of the document indexing pipeline.
-
+
        This endpoint returns information about the current state of the document processing pipeline,
        including whether it's busy, the current job name, when it started, how many documents
        are being processed, how many batches there are, and which batch is currently being processed.
-
+
        Returns:
            dict: A dictionary containing the pipeline status information
        """
        try:
            from lightrag.kg.shared_storage import get_namespace_data
+
            pipeline_status = get_namespace_data("pipeline_status")
-
+
            # Convert to regular dict if it's a Manager.dict
            status_dict = dict(pipeline_status)
-
+
            # Convert history_messages to a regular list if it's a Manager.list
            if "history_messages" in status_dict:
                status_dict["history_messages"] = list(status_dict["history_messages"])
-
+
            # Format the job_start time if it exists
            if status_dict.get("job_start"):
                status_dict["job_start"] = str(status_dict["job_start"])
-
+
            return status_dict
        except Exception as e:
            logger.error(f"Error getting pipeline status: {str(e)}")
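Note on the dict()/list() conversions above: Manager proxies are not JSON-serializable, so the endpoint must copy them into plain containers before FastAPI serializes the response. A runnable sketch (assuming, as the comments suggest, that the shared namespace is backed by multiprocessing.Manager):

import json
from multiprocessing import Manager

if __name__ == "__main__":
    manager = Manager()
    status = manager.dict({"busy": True})
    status["history_messages"] = manager.list(["batch 1 done"])

    # json.dumps(status) would raise TypeError: DictProxy/ListProxy
    # are not JSON-serializable.
    plain = dict(status)
    plain["history_messages"] = list(plain["history_messages"])
    print(json.dumps(plain))  # {"busy": true, "history_messages": ["batch 1 done"]}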
lightrag/api/utils_api.py
CHANGED
@@ -295,7 +295,9 @@ def parse_args(is_uvicorn_mode: bool = False) -> argparse.Namespace:
        original_workers = args.workers
        args.workers = 1
        # Log warning directly here
-        logging.warning(f"In uvicorn mode, workers parameter was set to {original_workers}. Forcing workers=1")
+        logging.warning(
+            f"In uvicorn mode, workers parameter was set to {original_workers}. Forcing workers=1"
+        )
 
    # convert relative path to absolute path
    args.working_dir = os.path.abspath(args.working_dir)
lightrag/lightrag.py
CHANGED
@@ -274,6 +274,7 @@ class LightRAG:
        from lightrag.kg.shared_storage import (
            initialize_share_data,
        )
+
        initialize_share_data()
 
        if not os.path.exists(self.working_dir):

@@ -671,44 +672,45 @@
        4. Update the document status
        """
        from lightrag.kg.shared_storage import get_namespace_data, get_storage_lock
-
+
        # Get pipeline status shared data and lock
        pipeline_status = get_namespace_data("pipeline_status")
        storage_lock = get_storage_lock()
-
+
        # Check if another process is already processing the queue
        process_documents = False
        with storage_lock:
+            # Ensure only one worker is processing documents
            if not pipeline_status.get("busy", False):
-                # Get the current history_messages list
+                # Cleaning history_messages without breaking it as a shared list object
                current_history = pipeline_status.get("history_messages", [])
-
-                # Clear the current list contents while keeping the same list object
                if hasattr(current_history, "clear"):
                    current_history.clear()
+
                pipeline_status.update(
                    {
                        "busy": True,
                        "job_name": "indexing files",
                        "job_start": datetime.now().isoformat(),
                        "docs": 0,
                        "batchs": 0,
                        "cur_batch": 0,
-                        "request_pending": False,
+                        "request_pending": False,  # Clear any previous request
                        "latest_message": "",
-                        "history_messages": current_history,
+                        "history_messages": current_history,  # keep it as a shared list object
                    }
                )
                process_documents = True
            else:
                # Another process is busy, just set request flag and return
                pipeline_status["request_pending"] = True
-                logger.info("Another process is already processing the document queue. Request queued.")
+                logger.info(
+                    "Another process is already processing the document queue. Request queued."
+                )
+
        if not process_documents:
            return
+
        try:
            # Process documents until no more documents or requests
            while True:

@@ -734,7 +736,7 @@
            # Update pipeline status with document count (with lock)
            with storage_lock:
                pipeline_status["docs"] = len(to_process_docs)
-
+
            # 2. split docs into chunks, insert chunks, update doc status
            docs_batches = [
                list(to_process_docs.items())[i : i + self.max_parallel_insert]

@@ -742,11 +744,8 @@
            ]
 
            # Update pipeline status with batch information (directly, as it's atomic)
-            pipeline_status.update({
-                "batchs": len(docs_batches),
-                "cur_batch": 0
-            })
-
+            pipeline_status.update({"batchs": len(docs_batches), "cur_batch": 0})
+
            log_message = f"Number of batches to process: {len(docs_batches)}."
            logger.info(log_message)
            pipeline_status["latest_message"] = log_message

@@ -757,13 +756,15 @@
            for batch_idx, docs_batch in enumerate(docs_batches):
                # Update current batch in pipeline status (directly, as it's atomic)
                pipeline_status["cur_batch"] = batch_idx + 1
+
                async def batch(
                    batch_idx: int,
                    docs_batch: list[tuple[str, DocProcessingStatus]],
                    size_batch: int,
                ) -> None:
-                    log_message = f"Start processing batch {batch_idx + 1} of {size_batch}."
+                    log_message = (
+                        f"Start processing batch {batch_idx + 1} of {size_batch}."
+                    )
                    logger.info(log_message)
                    pipeline_status["latest_message"] = log_message
                    pipeline_status["history_messages"].append(log_message)

@@ -822,7 +823,9 @@
                            }
                        )
                    except Exception as e:
-                        logger.error(f"Failed to process document {doc_id}: {str(e)}")
+                        logger.error(
+                            f"Failed to process document {doc_id}: {str(e)}"
+                        )
                        await self.doc_status.upsert(
                            {
                                doc_id: {

@@ -837,7 +840,9 @@
                            }
                        )
                        continue
-                    log_message = f"Completed batch {batch_idx + 1} of {len(docs_batches)}."
+                    log_message = (
+                        f"Completed batch {batch_idx + 1} of {len(docs_batches)}."
+                    )
                    logger.info(log_message)
                    pipeline_status["latest_message"] = log_message
                    pipeline_status["history_messages"].append(log_message)

@@ -846,7 +851,7 @@
 
            await asyncio.gather(*batches)
            await self._insert_done()
-
+
            # Check if there's a pending request to process more documents (with lock)
            has_pending_request = False
            with storage_lock:

@@ -854,15 +859,15 @@
                if has_pending_request:
                    # Clear the request flag before checking for more documents
                    pipeline_status["request_pending"] = False
-
+
                if not has_pending_request:
                    break
-
+
                log_message = "Processing additional documents due to pending request"
                logger.info(log_message)
                pipeline_status["latest_message"] = log_message
                pipeline_status["history_messages"].append(log_message)
-
+
        finally:
            # Always reset busy status when done or if an exception occurs (with lock)
            with storage_lock:

@@ -901,12 +906,13 @@
            if storage_inst is not None
        ]
        await asyncio.gather(*tasks)
-
+
        log_message = "All Insert done"
        logger.info(log_message)
-
+
        # Get pipeline_status and update latest_message and history_messages
        from lightrag.kg.shared_storage import get_namespace_data
+
        pipeline_status = get_namespace_data("pipeline_status")
        pipeline_status["latest_message"] = log_message
        pipeline_status["history_messages"].append(log_message)
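Note: the busy/request_pending pair above is a small handshake that lets concurrent callers coalesce into one running pipeline. A single-process sketch of the same logic (threading stands in for the shared storage lock; key names mirror pipeline_status):

import threading

pipeline_status = {"busy": False, "request_pending": False}
storage_lock = threading.Lock()

def enqueue_or_process(process_fn) -> None:
    with storage_lock:
        if pipeline_status["busy"]:
            # A worker is already active: leave a note and return immediately.
            pipeline_status["request_pending"] = True
            return
        pipeline_status["busy"] = True
    try:
        while True:
            process_fn()
            with storage_lock:
                has_pending = pipeline_status["request_pending"]
                pipeline_status["request_pending"] = False
            if not has_pending:
                break  # no new work arrived while processing
    finally:
        with storage_lock:
            pipeline_status["busy"] = False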
lightrag/operate.py
CHANGED
@@ -336,8 +336,9 @@ async def extract_entities(
    global_config: dict[str, str],
    llm_response_cache: BaseKVStorage | None = None,
 ) -> None:
-
+
    from lightrag.kg.shared_storage import get_namespace_data
+
    pipeline_status = get_namespace_data("pipeline_status")
    use_llm_func: callable = global_config["llm_model_func"]
    entity_extract_max_gleaning = global_config["entity_extract_max_gleaning"]
lightrag/utils.py
CHANGED
@@ -75,50 +75,42 @@ def set_logger(log_file: str, level: int = logging.DEBUG):
        log_file: Path to the log file
        level: Logging level (e.g. logging.DEBUG, logging.INFO)
    """
-
+
    logger.setLevel(level)
-
-    # Ensure an absolute path is used
    log_file = os.path.abspath(log_file)
-
-    # Create the formatter
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
-
-    # Check whether a file handler already exists
    has_file_handler = False
    has_console_handler = False
-
-    # Inspect existing handlers
+
    for handler in logger.handlers:
        if isinstance(handler, logging.FileHandler):
            has_file_handler = True
-        elif isinstance(handler, logging.StreamHandler) and not isinstance(handler, logging.FileHandler):
+        elif isinstance(handler, logging.StreamHandler) and not isinstance(
+            handler, logging.FileHandler
+        ):
            has_console_handler = True
-
-    # Add a file handler if none exists
+
    if not has_file_handler:
-        # Use RotatingFileHandler instead of FileHandler
        from logging.handlers import RotatingFileHandler
+
        file_handler = RotatingFileHandler(
            log_file,
-            maxBytes=10*1024*1024,  # 10MB
+            maxBytes=10 * 1024 * 1024,  # 10MB
            backupCount=5,
-            encoding="utf-8"
+            encoding="utf-8",
        )
        file_handler.setLevel(level)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
-
-    # Add a console handler if none exists
+
    if not has_console_handler:
        console_handler = logging.StreamHandler()
        console_handler.setLevel(level)
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)
-
-    # Disable propagation to avoid duplicate output
+
    logger.propagate = False
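Note: the has_file_handler/has_console_handler checks make set_logger safe to call repeatedly. A compact demonstration of the same idempotence (a sketch, not the function in lightrag/utils.py; FileHandler subclasses StreamHandler, which is why the extra isinstance check is needed):

import logging

logger = logging.getLogger("lightrag-demo")

def set_logger_once(log_file: str, level: int = logging.INFO) -> None:
    logger.setLevel(level)
    if not any(isinstance(h, logging.FileHandler) for h in logger.handlers):
        logger.addHandler(logging.FileHandler(log_file, encoding="utf-8"))
    if not any(
        isinstance(h, logging.StreamHandler)
        and not isinstance(h, logging.FileHandler)
        for h in logger.handlers
    ):
        logger.addHandler(logging.StreamHandler())
    logger.propagate = False  # avoid duplicate output via the root logger

set_logger_once("demo.log")
set_logger_once("demo.log")  # second call adds no duplicate handlers
assert len(logger.handlers) == 2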
run_with_gunicorn.py
CHANGED
@@ -5,9 +5,7 @@ Start LightRAG server with Gunicorn
 
 import os
 import sys
-import json
 import signal
-import argparse
 from lightrag.api.utils_api import parse_args, display_splash_screen
 from lightrag.kg.shared_storage import initialize_share_data, finalize_share_data
 

@@ -34,7 +32,6 @@ def main():
    # Parse all arguments using parse_args
    args = parse_args(is_uvicorn_mode=False)
 
-
    # Display startup information
    display_splash_screen(args)
 

@@ -101,9 +98,15 @@ def main():
 
    # Set configuration variables in gunicorn_config
    gunicorn_config.workers = int(os.getenv("WORKERS", args.workers))
-    gunicorn_config.bind = f"{os.getenv('HOST', args.host)}:{os.getenv('PORT', args.port)}"
-    gunicorn_config.loglevel = args.log_level.lower() if args.log_level else os.getenv("LOG_LEVEL", "info")
-
+    gunicorn_config.bind = (
+        f"{os.getenv('HOST', args.host)}:{os.getenv('PORT', args.port)}"
+    )
+    gunicorn_config.loglevel = (
+        args.log_level.lower()
+        if args.log_level
+        else os.getenv("LOG_LEVEL", "info")
+    )
+
    # Set SSL configuration if enabled
    if args.ssl:
        gunicorn_config.certfile = args.ssl_certfile

@@ -121,10 +124,12 @@ def main():
            value = getattr(gunicorn_config, key)
            if callable(value):
                self.cfg.set(key, value)
 
-        if hasattr(gunicorn_config, "logconfig_dict"):
-            self.cfg.set("logconfig_dict", getattr(gunicorn_config, "logconfig_dict"))
+        if hasattr(gunicorn_config, "logconfig_dict"):
+            self.cfg.set(
+                "logconfig_dict", getattr(gunicorn_config, "logconfig_dict")
+            )
 
    def load(self):
        # Import the application
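Note: the self.cfg.set(...) calls above belong to gunicorn's custom-application pattern, where a wrapper class feeds configuration into gunicorn programmatically. A minimal sketch of that pattern (the real script wraps the FastAPI app from lightrag.api.lightrag_server; this standalone shape follows gunicorn's documented BaseApplication API):

from gunicorn.app.base import BaseApplication

class StandaloneApplication(BaseApplication):
    def __init__(self, app, options=None):
        self.options = options or {}
        self.application = app
        super().__init__()

    def load_config(self):
        # Mirror the loop above: push each known option into gunicorn's config.
        for key, value in self.options.items():
            if key in self.cfg.settings and value is not None:
                self.cfg.set(key, value)

    def load(self):
        return self.application

# Usage sketch (bind/workers values are illustrative):
#   StandaloneApplication(app, {"bind": "0.0.0.0:9621", "workers": 2}).run()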