yangdx commited on
Commit
e0ddaf2
·
1 Parent(s): 76d72c8

Implement Gunicorn+Uvicorn integration for shared data preloading

Browse files

- Create run_with_gunicorn.py script to properly initialize shared data in the
main process before forking worker processes
- Revert unvicorn to single process mode only, and let gunicorn do all the multi-process jobs

gunicorn_config.py ADDED
@@ -0,0 +1,80 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # gunicorn_config.py
2
+ import os
3
+ import multiprocessing
4
+ from lightrag.kg.shared_storage import finalize_share_data
5
+ from lightrag.api.utils_api import parse_args
6
+
7
+ # Parse command line arguments
8
+ args = parse_args()
9
+
10
+ # Determine worker count - from environment variable or command line arguments
11
+ workers = int(os.getenv('WORKERS', args.workers))
12
+
13
+ # If not specified, use CPU count * 2 + 1 (Gunicorn recommended configuration)
14
+ if workers <= 1:
15
+ workers = multiprocessing.cpu_count() * 2 + 1
16
+
17
+ # Binding address
18
+ bind = f"{os.getenv('HOST', args.host)}:{os.getenv('PORT', args.port)}"
19
+
20
+ # Enable preload_app option
21
+ preload_app = True
22
+
23
+ # Use Uvicorn worker
24
+ worker_class = "uvicorn.workers.UvicornWorker"
25
+
26
+ # Other Gunicorn configurations
27
+ timeout = int(os.getenv('TIMEOUT', 120))
28
+ keepalive = 5
29
+
30
+ # Optional SSL configuration
31
+ if args.ssl:
32
+ certfile = args.ssl_certfile
33
+ keyfile = args.ssl_keyfile
34
+
35
+ # Logging configuration
36
+ errorlog = os.getenv('ERROR_LOG', '-') # '-' means stderr
37
+ accesslog = os.getenv('ACCESS_LOG', '-') # '-' means stderr
38
+ loglevel = os.getenv('LOG_LEVEL', 'info')
39
+
40
+ def on_starting(server):
41
+ """
42
+ Executed when Gunicorn starts, before forking the first worker processes
43
+ You can use this function to do more initialization tasks for all processes
44
+ """
45
+ print("=" * 80)
46
+ print(f"GUNICORN MASTER PROCESS: on_starting jobs for all {workers} workers")
47
+ print(f"Process ID: {os.getpid()}")
48
+ print("=" * 80)
49
+
50
+ # Memory usage monitoring
51
+ try:
52
+ import psutil
53
+ process = psutil.Process(os.getpid())
54
+ memory_info = process.memory_info()
55
+ msg = f"Memory usage after initialization: {memory_info.rss / 1024 / 1024:.2f} MB"
56
+ print(msg)
57
+ except ImportError:
58
+ print("psutil not installed, skipping memory usage reporting")
59
+
60
+ print("=" * 80)
61
+ print("Gunicorn initialization complete, forking workers...")
62
+ print("=" * 80)
63
+
64
+
65
+ def on_exit(server):
66
+ """
67
+ Executed when Gunicorn is shutting down.
68
+ This is a good place to release shared resources.
69
+ """
70
+ print("=" * 80)
71
+ print("GUNICORN MASTER PROCESS: Shutting down")
72
+ print(f"Process ID: {os.getpid()}")
73
+ print("=" * 80)
74
+
75
+ # Release shared resources
76
+ finalize_share_data()
77
+
78
+ print("=" * 80)
79
+ print("Gunicorn shutdown complete")
80
+ print("=" * 80)
lightrag/api/lightrag_server.py CHANGED
@@ -483,17 +483,28 @@ def main():
483
 
484
  display_splash_screen(args)
485
 
 
 
 
 
 
 
 
486
  from lightrag.kg.shared_storage import initialize_share_data
487
- initialize_share_data(args.workers)
488
-
 
 
 
 
 
489
  uvicorn_config = {
490
- "app": "lightrag.api.lightrag_server:get_application",
491
- "factory": True,
492
  "host": args.host,
493
  "port": args.port,
494
- "workers": args.workers,
495
  "log_config": None, # Disable default config
496
  }
 
497
  if args.ssl:
498
  uvicorn_config.update(
499
  {
@@ -501,6 +512,8 @@ def main():
501
  "ssl_keyfile": args.ssl_keyfile,
502
  }
503
  )
 
 
504
  uvicorn.run(**uvicorn_config)
505
 
506
 
 
483
 
484
  display_splash_screen(args)
485
 
486
+ # Check if running under Gunicorn
487
+ if 'GUNICORN_CMD_ARGS' in os.environ:
488
+ # If started with Gunicorn, return directly as Gunicorn will call get_application
489
+ print("Running under Gunicorn - worker management handled by Gunicorn")
490
+ return
491
+
492
+ # If not running under Gunicorn, initialize shared data here
493
  from lightrag.kg.shared_storage import initialize_share_data
494
+ print("Starting in single-process mode")
495
+ initialize_share_data(1) # Force single process mode
496
+
497
+ # Create application instance directly instead of using factory function
498
+ app = create_app(args)
499
+
500
+ # Start Uvicorn in single process mode
501
  uvicorn_config = {
502
+ "app": app, # Pass application instance directly instead of string path
 
503
  "host": args.host,
504
  "port": args.port,
 
505
  "log_config": None, # Disable default config
506
  }
507
+
508
  if args.ssl:
509
  uvicorn_config.update(
510
  {
 
512
  "ssl_keyfile": args.ssl_keyfile,
513
  }
514
  )
515
+
516
+ print(f"Starting Uvicorn server in single-process mode on {args.host}:{args.port}")
517
  uvicorn.run(**uvicorn_config)
518
 
519
 
lightrag/kg/shared_storage.py CHANGED
@@ -1,10 +1,19 @@
1
  import os
 
2
  from multiprocessing.synchronize import Lock as ProcessLock
3
  from threading import Lock as ThreadLock
4
  from multiprocessing import Manager
5
  from typing import Any, Dict, Optional, Union
6
  from lightrag.utils import logger
7
 
 
 
 
 
 
 
 
 
8
  LockType = Union[ProcessLock, ThreadLock]
9
 
10
  _manager = None
@@ -21,41 +30,60 @@ _global_lock: Optional[LockType] = None
21
 
22
 
23
  def initialize_share_data(workers: int = 1):
24
- """Initialize storage data"""
25
- global _manager, _is_multiprocess, is_multiprocess, _global_lock, _shared_dicts, _share_objects, _init_flags, _initialized
 
 
 
 
26
 
 
 
 
 
 
 
 
 
 
 
 
 
 
27
  if _initialized and _initialized.value:
28
  is_multiprocess = _is_multiprocess.value
29
- if _is_multiprocess.value:
30
- logger.info(f"Process {os.getpid()} storage data already initialized!")
31
- return
32
-
33
  _manager = Manager()
34
  _initialized = _manager.Value("b", False)
35
  _is_multiprocess = _manager.Value("b", False)
36
 
37
- if workers == 1:
38
- _is_multiprocess.value = False
39
- _global_lock = ThreadLock()
40
- _shared_dicts = {}
41
- _share_objects = {}
42
- _init_flags = {}
43
- logger.info(f"Process {os.getpid()} storage data created for Single Process")
44
- else:
45
  _is_multiprocess.value = True
46
  _global_lock = _manager.Lock()
47
  # Create shared dictionaries with manager
48
  _shared_dicts = _manager.dict()
49
  _share_objects = _manager.dict()
50
- _init_flags = _manager.dict() # 使用共享字典存储初始化标志
51
- logger.info(f"Process {os.getpid()} storage data created for Multiple Process")
 
 
 
 
 
 
 
52
 
 
 
53
  is_multiprocess = _is_multiprocess.value
54
 
55
  def try_initialize_namespace(namespace: str) -> bool:
56
  """
57
- 尝试初始化命名空间。返回True表示当前进程获得了初始化权限。
58
- 使用共享字典的原子操作确保只有一个进程能成功初始化。
59
  """
60
  global _init_flags, _manager
61
 
@@ -126,3 +154,52 @@ def get_namespace_data(namespace: str) -> Dict[str, Any]:
126
  def get_scan_progress() -> Dict[str, Any]:
127
  """get storage space for document scanning progress data"""
128
  return get_namespace_data("scan_progress")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import os
2
+ import sys
3
  from multiprocessing.synchronize import Lock as ProcessLock
4
  from threading import Lock as ThreadLock
5
  from multiprocessing import Manager
6
  from typing import Any, Dict, Optional, Union
7
  from lightrag.utils import logger
8
 
9
+ # Define a direct print function for critical logs that must be visible in all processes
10
+ def direct_log(message, level="INFO"):
11
+ """
12
+ Log a message directly to stderr to ensure visibility in all processes,
13
+ including the Gunicorn master process.
14
+ """
15
+ print(f"{level}: {message}", file=sys.stderr, flush=True)
16
+
17
  LockType = Union[ProcessLock, ThreadLock]
18
 
19
  _manager = None
 
30
 
31
 
32
  def initialize_share_data(workers: int = 1):
33
+ """
34
+ Initialize shared storage data for single or multi-process mode.
35
+
36
+ When used with Gunicorn's preload feature, this function is called once in the
37
+ master process before forking worker processes, allowing all workers to share
38
+ the same initialized data.
39
 
40
+ In single-process mode, this function is called during LightRAG object initialization.
41
+
42
+ The function determines whether to use cross-process shared variables for data storage
43
+ based on the number of workers. If workers=1, it uses thread locks and local dictionaries.
44
+ If workers>1, it uses process locks and shared dictionaries managed by multiprocessing.Manager.
45
+
46
+ Args:
47
+ workers (int): Number of worker processes. If 1, single-process mode is used.
48
+ If > 1, multi-process mode with shared memory is used.
49
+ """
50
+ global _manager, _is_multiprocess, is_multiprocess, _global_lock, _shared_dicts, _share_objects, _init_flags, _initialized
51
+
52
+ # Check if already initialized
53
  if _initialized and _initialized.value:
54
  is_multiprocess = _is_multiprocess.value
55
+ direct_log(f"Process {os.getpid()} storage data already initialized (multiprocess={_is_multiprocess.value})!")
56
+ return
57
+
 
58
  _manager = Manager()
59
  _initialized = _manager.Value("b", False)
60
  _is_multiprocess = _manager.Value("b", False)
61
 
62
+ # Force multi-process mode if workers > 1
63
+ if workers > 1:
 
 
 
 
 
 
64
  _is_multiprocess.value = True
65
  _global_lock = _manager.Lock()
66
  # Create shared dictionaries with manager
67
  _shared_dicts = _manager.dict()
68
  _share_objects = _manager.dict()
69
+ _init_flags = _manager.dict() # Use shared dictionary to store initialization flags
70
+ direct_log(f"Process {os.getpid()} storage data created for Multiple Process (workers={workers})")
71
+ else:
72
+ _is_multiprocess.value = False
73
+ _global_lock = ThreadLock()
74
+ _shared_dicts = {}
75
+ _share_objects = {}
76
+ _init_flags = {}
77
+ direct_log(f"Process {os.getpid()} storage data created for Single Process")
78
 
79
+ # Mark as initialized
80
+ _initialized.value = True
81
  is_multiprocess = _is_multiprocess.value
82
 
83
  def try_initialize_namespace(namespace: str) -> bool:
84
  """
85
+ Try to initialize a namespace. Returns True if the current process gets initialization permission.
86
+ Uses atomic operations on shared dictionaries to ensure only one process can successfully initialize.
87
  """
88
  global _init_flags, _manager
89
 
 
154
  def get_scan_progress() -> Dict[str, Any]:
155
  """get storage space for document scanning progress data"""
156
  return get_namespace_data("scan_progress")
157
+
158
+
159
+ def finalize_share_data():
160
+ """
161
+ Release shared resources and clean up.
162
+
163
+ This function should be called when the application is shutting down
164
+ to properly release shared resources and avoid memory leaks.
165
+
166
+ In multi-process mode, it shuts down the Manager and releases all shared objects.
167
+ In single-process mode, it simply resets the global variables.
168
+ """
169
+ global _manager, _is_multiprocess, is_multiprocess, _global_lock, _shared_dicts, _share_objects, _init_flags, _initialized
170
+
171
+ # Check if already initialized
172
+ if not (_initialized and _initialized.value):
173
+ direct_log(f"Process {os.getpid()} storage data not initialized, nothing to finalize")
174
+ return
175
+
176
+ direct_log(f"Process {os.getpid()} finalizing storage data (multiprocess={_is_multiprocess.value})")
177
+
178
+ # In multi-process mode, shut down the Manager
179
+ if _is_multiprocess.value and _manager is not None:
180
+ try:
181
+ # Clear shared dictionaries first
182
+ if _shared_dicts is not None:
183
+ _shared_dicts.clear()
184
+ if _share_objects is not None:
185
+ _share_objects.clear()
186
+ if _init_flags is not None:
187
+ _init_flags.clear()
188
+
189
+ # Shut down the Manager
190
+ _manager.shutdown()
191
+ direct_log(f"Process {os.getpid()} Manager shutdown complete")
192
+ except Exception as e:
193
+ direct_log(f"Process {os.getpid()} Error shutting down Manager: {e}", level="ERROR")
194
+
195
+ # Reset global variables
196
+ _manager = None
197
+ _initialized = None
198
+ _is_multiprocess = None
199
+ is_multiprocess = None
200
+ _shared_dicts = None
201
+ _share_objects = None
202
+ _init_flags = None
203
+ _global_lock = None
204
+
205
+ direct_log(f"Process {os.getpid()} storage data finalization complete")
run_with_gunicorn.py ADDED
@@ -0,0 +1,172 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python
2
+ """
3
+ Start LightRAG server with Gunicorn
4
+ """
5
+ import os
6
+ import sys
7
+ import json
8
+ import signal
9
+ import argparse
10
+ import subprocess
11
+ from lightrag.api.utils_api import parse_args, display_splash_screen
12
+ from lightrag.kg.shared_storage import initialize_share_data, finalize_share_data
13
+
14
+ # Signal handler for graceful shutdown
15
+ def signal_handler(sig, frame):
16
+ print("\n\n" + "="*80)
17
+ print("RECEIVED TERMINATION SIGNAL")
18
+ print(f"Process ID: {os.getpid()}")
19
+ print("="*80 + "\n")
20
+
21
+ # Release shared resources
22
+ finalize_share_data()
23
+
24
+ # Exit with success status
25
+ sys.exit(0)
26
+
27
+ def main():
28
+ # Register signal handlers for graceful shutdown
29
+ signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
30
+ signal.signal(signal.SIGTERM, signal_handler) # kill command
31
+ # Create a parser to handle Gunicorn-specific parameters
32
+ parser = argparse.ArgumentParser(
33
+ description="Start LightRAG server with Gunicorn"
34
+ )
35
+ parser.add_argument(
36
+ "--workers",
37
+ type=int,
38
+ help="Number of worker processes (overrides the default or config.ini setting)"
39
+ )
40
+ parser.add_argument(
41
+ "--timeout",
42
+ type=int,
43
+ help="Worker timeout in seconds (default: 120)"
44
+ )
45
+ parser.add_argument(
46
+ "--log-level",
47
+ choices=["debug", "info", "warning", "error", "critical"],
48
+ help="Gunicorn log level"
49
+ )
50
+
51
+ # Parse Gunicorn-specific arguments
52
+ gunicorn_args, remaining_args = parser.parse_known_args()
53
+
54
+ # Pass remaining arguments to LightRAG's parse_args
55
+ sys.argv = [sys.argv[0]] + remaining_args
56
+ args = parse_args()
57
+
58
+ # If workers specified, override args value
59
+ if gunicorn_args.workers:
60
+ args.workers = gunicorn_args.workers
61
+ os.environ["WORKERS"] = str(gunicorn_args.workers)
62
+
63
+ # If timeout specified, set environment variable
64
+ if gunicorn_args.timeout:
65
+ os.environ["TIMEOUT"] = str(gunicorn_args.timeout)
66
+
67
+ # If log-level specified, set environment variable
68
+ if gunicorn_args.log_level:
69
+ os.environ["LOG_LEVEL"] = gunicorn_args.log_level
70
+
71
+ # Save all LightRAG args to environment variable for worker processes
72
+ # This is the key step for passing arguments to lightrag_server.py
73
+ os.environ["LIGHTRAG_ARGS"] = json.dumps(vars(args))
74
+
75
+ # Display startup information
76
+ display_splash_screen(args)
77
+
78
+ print("🚀 Starting LightRAG with Gunicorn")
79
+ print(f"🔄 Worker management: Gunicorn (workers={args.workers})")
80
+ print("🔍 Preloading app: Enabled")
81
+ print("📝 Note: Using Gunicorn's preload feature for shared data initialization")
82
+ print("\n\n" + "="*80)
83
+ print("MAIN PROCESS INITIALIZATION")
84
+ print(f"Process ID: {os.getpid()}")
85
+ print(f"Workers setting: {args.workers}")
86
+ print("="*80 + "\n")
87
+
88
+ # Start application with Gunicorn using direct Python API
89
+ # Ensure WORKERS environment variable is set before importing gunicorn_config
90
+ if args.workers > 1:
91
+ os.environ["WORKERS"] = str(args.workers)
92
+
93
+ # Import Gunicorn's StandaloneApplication
94
+ from gunicorn.app.base import BaseApplication
95
+
96
+ # Define a custom application class that loads our config
97
+ class GunicornApp(BaseApplication):
98
+ def __init__(self, app, options=None):
99
+ self.options = options or {}
100
+ self.application = app
101
+ super().__init__()
102
+
103
+ def load_config(self):
104
+ # Define valid Gunicorn configuration options
105
+ valid_options = {
106
+ 'bind', 'workers', 'worker_class', 'timeout', 'keepalive',
107
+ 'preload_app', 'errorlog', 'accesslog', 'loglevel',
108
+ 'certfile', 'keyfile', 'limit_request_line', 'limit_request_fields',
109
+ 'limit_request_field_size', 'graceful_timeout', 'max_requests',
110
+ 'max_requests_jitter'
111
+ }
112
+
113
+ # Special hooks that need to be set separately
114
+ special_hooks = {
115
+ 'on_starting', 'on_reload', 'on_exit', 'pre_fork', 'post_fork',
116
+ 'pre_exec', 'pre_request', 'post_request', 'worker_init',
117
+ 'worker_exit', 'nworkers_changed', 'child_exit'
118
+ }
119
+
120
+ # Import the gunicorn_config module directly
121
+ import importlib.util
122
+ spec = importlib.util.spec_from_file_location("gunicorn_config", "gunicorn_config.py")
123
+ self.config_module = importlib.util.module_from_spec(spec)
124
+ spec.loader.exec_module(self.config_module)
125
+
126
+ # Set configuration options
127
+ for key in dir(self.config_module):
128
+ if key in valid_options:
129
+ value = getattr(self.config_module, key)
130
+ # Skip functions like on_starting
131
+ if not callable(value):
132
+ self.cfg.set(key, value)
133
+ # Set special hooks
134
+ elif key in special_hooks:
135
+ value = getattr(self.config_module, key)
136
+ if callable(value):
137
+ self.cfg.set(key, value)
138
+
139
+ # Override with command line arguments if provided
140
+ if gunicorn_args.workers:
141
+ self.cfg.set("workers", gunicorn_args.workers)
142
+ if gunicorn_args.timeout:
143
+ self.cfg.set("timeout", gunicorn_args.timeout)
144
+ if gunicorn_args.log_level:
145
+ self.cfg.set("loglevel", gunicorn_args.log_level)
146
+
147
+ def load(self):
148
+ # Import the application
149
+ from lightrag.api.lightrag_server import get_application
150
+ return get_application()
151
+
152
+ # Create the application
153
+ app = GunicornApp("")
154
+
155
+ # Directly call initialize_share_data with the correct workers value
156
+ from lightrag.kg.shared_storage import initialize_share_data
157
+
158
+ # Force workers to be an integer and greater than 1 for multi-process mode
159
+ workers_count = int(args.workers)
160
+ if workers_count > 1:
161
+ # Set a flag to indicate we're in the main process
162
+ os.environ["LIGHTRAG_MAIN_PROCESS"] = "1"
163
+ initialize_share_data(workers_count)
164
+ else:
165
+ initialize_share_data(1)
166
+
167
+ # Run the application
168
+ print("\nStarting Gunicorn with direct Python API...")
169
+ app.run()
170
+
171
+ if __name__ == "__main__":
172
+ main()