yangdx committed
Commit 8d8d813 · 1 Parent(s): b3894b8

Fix linting

gunicorn_config.py CHANGED
@@ -8,7 +8,7 @@ from lightrag.api.utils_api import parse_args
 args = parse_args()
 
 # Determine worker count - from environment variable or command line arguments
-workers = int(os.getenv('WORKERS', args.workers))
+workers = int(os.getenv("WORKERS", args.workers))
 
 # If not specified, use CPU count * 2 + 1 (Gunicorn recommended configuration)
 if workers <= 1:
@@ -24,7 +24,7 @@ preload_app = True
 worker_class = "uvicorn.workers.UvicornWorker"
 
 # Other Gunicorn configurations
-timeout = int(os.getenv('TIMEOUT', 120))
+timeout = int(os.getenv("TIMEOUT", 120))
 keepalive = 5
 
 # Optional SSL configuration
@@ -33,9 +33,10 @@ if args.ssl:
     keyfile = args.ssl_keyfile
 
 # Logging configuration
-errorlog = os.getenv('ERROR_LOG', '-')  # '-' means stderr
-accesslog = os.getenv('ACCESS_LOG', '-')  # '-' means stderr
-loglevel = os.getenv('LOG_LEVEL', 'info')
+errorlog = os.getenv("ERROR_LOG", "-")  # '-' means stderr
+accesslog = os.getenv("ACCESS_LOG", "-")  # '-' means stderr
+loglevel = os.getenv("LOG_LEVEL", "info")
+
 
 def on_starting(server):
     """
@@ -46,21 +47,25 @@ def on_starting(server):
     print(f"GUNICORN MASTER PROCESS: on_starting jobs for all {workers} workers")
     print(f"Process ID: {os.getpid()}")
     print("=" * 80)
-
+
     # Memory usage monitoring
     try:
         import psutil
+
         process = psutil.Process(os.getpid())
         memory_info = process.memory_info()
-        msg = f"Memory usage after initialization: {memory_info.rss / 1024 / 1024:.2f} MB"
+        msg = (
+            f"Memory usage after initialization: {memory_info.rss / 1024 / 1024:.2f} MB"
+        )
         print(msg)
     except ImportError:
         print("psutil not installed, skipping memory usage reporting")
-
+
     print("=" * 80)
     print("Gunicorn initialization complete, forking workers...")
     print("=" * 80)
 
+
 def on_exit(server):
     """
     Executed when Gunicorn is shutting down.
@@ -70,10 +75,10 @@ def on_exit(server):
     print("GUNICORN MASTER PROCESS: Shutting down")
     print(f"Process ID: {os.getpid()}")
     print("=" * 80)
-
+
     # Release shared resources
     finalize_share_data()
-
+
     print("=" * 80)
     print("Gunicorn shutdown complete")
     print("=" * 80)
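Note on the pattern above: os.getenv("WORKERS", args.workers) returns the environment value (always a string) when WORKERS is exported and otherwise falls through to the parsed-argument default, so the surrounding int() normalizes both cases. A minimal sketch of the same idea; the DummyArgs stand-in for the object returned by parse_args() is illustrative, not from the repository:

import os
from dataclasses import dataclass


@dataclass
class DummyArgs:
    workers: int = 1  # stand-in default; the real value comes from parse_args()


args = DummyArgs()

# Environment variable wins when set; otherwise fall back to the CLI/config value.
workers = int(os.getenv("WORKERS", args.workers))

if workers <= 1:
    # Same fallback the config describes: Gunicorn's recommended CPU count * 2 + 1.
    workers = (os.cpu_count() or 1) * 2 + 1

print(f"effective worker count: {workers}")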
lightrag/api/lightrag_server.py CHANGED
@@ -471,12 +471,13 @@ def configure_logging():
 
 def main():
     # Check if running under Gunicorn
-    if 'GUNICORN_CMD_ARGS' in os.environ:
+    if "GUNICORN_CMD_ARGS" in os.environ:
         # If started with Gunicorn, return directly as Gunicorn will call get_application
         print("Running under Gunicorn - worker management handled by Gunicorn")
         return
 
     from multiprocessing import freeze_support
+
     freeze_support()
 
     args = parse_args()
@@ -487,10 +488,10 @@ def main():
     configure_logging()
 
     display_splash_screen(args)
-
+
     # Create application instance directly instead of using factory function
     app = create_app(args)
-
+
     # Start Uvicorn in single process mode
     uvicorn_config = {
         "app": app,  # Pass application instance directly instead of string path
@@ -498,7 +499,7 @@ def main():
         "port": args.port,
         "log_config": None,  # Disable default config
     }
-
+
     if args.ssl:
         uvicorn_config.update(
             {
@@ -506,7 +507,7 @@ def main():
                 "ssl_keyfile": args.ssl_keyfile,
             }
         )
-
+
     print(f"Starting Uvicorn server in single-process mode on {args.host}:{args.port}")
     uvicorn.run(**uvicorn_config)
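For the single-process path, main() assembles the Uvicorn options as a plain dict and unpacks it into uvicorn.run(), which keeps the SSL keys optional. A rough, self-contained sketch of that pattern; the tiny ASGI app, host, port, and certificate paths are placeholders rather than LightRAG code:

import uvicorn


async def app(scope, receive, send):
    # Minimal ASGI placeholder standing in for the FastAPI app built by create_app(args).
    if scope["type"] == "http":
        await send({"type": "http.response.start", "status": 200, "headers": []})
        await send({"type": "http.response.body", "body": b"ok"})


uvicorn_config = {
    "app": app,  # an application instance, not an import string
    "host": "0.0.0.0",
    "port": 8000,
    "log_config": None,  # disable Uvicorn's default logging config
}

use_ssl = False  # set True and point at real files to exercise the SSL branch
if use_ssl:
    uvicorn_config.update({"ssl_certfile": "cert.pem", "ssl_keyfile": "key.pem"})

if __name__ == "__main__":
    uvicorn.run(**uvicorn_config)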
lightrag/kg/json_kv_impl.py CHANGED
@@ -10,7 +10,11 @@ from lightrag.utils import (
     logger,
     write_json,
 )
-from .shared_storage import get_namespace_data, get_storage_lock, try_initialize_namespace
+from .shared_storage import (
+    get_namespace_data,
+    get_storage_lock,
+    try_initialize_namespace,
+)
 
 
 @final
@@ -20,7 +24,7 @@ class JsonKVStorage(BaseKVStorage):
         working_dir = self.global_config["working_dir"]
         self._file_name = os.path.join(working_dir, f"kv_store_{self.namespace}.json")
         self._storage_lock = get_storage_lock()
-
+
         # check need_init must before get_namespace_data
         need_init = try_initialize_namespace(self.namespace)
         self._data = get_namespace_data(self.namespace)
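The "check need_init must before get_namespace_data" comment encodes the shared_storage contract: try_initialize_namespace() returns True only for the first caller of a namespace, and get_namespace_data() then creates or returns the shared dict, so the winner loads data from disk while later callers simply attach to it. A hedged sketch of that call order; the namespace name and seed data are illustrative, not JsonKVStorage's actual loading code:

from lightrag.kg.shared_storage import (
    get_namespace_data,
    get_storage_lock,
    initialize_share_data,
    try_initialize_namespace,
)

initialize_share_data(1)  # single-process mode for this sketch

# Ask for initialization rights *before* touching the namespace data.
need_init = try_initialize_namespace("kv_store_demo")
data = get_namespace_data("kv_store_demo")

if need_init:
    # Only the first caller populates the shared dict, e.g. from a JSON file on disk.
    data.update({"doc-1": {"content": "hello"}})

with get_storage_lock():
    print(len(data), "records visible in this process")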
lightrag/kg/nano_vector_db_impl.py CHANGED
@@ -11,7 +11,12 @@ from lightrag.utils import (
 )
 import pipmaster as pm
 from lightrag.base import BaseVectorStorage
-from .shared_storage import get_storage_lock, get_namespace_object, is_multiprocess, try_initialize_namespace
+from .shared_storage import (
+    get_storage_lock,
+    get_namespace_object,
+    is_multiprocess,
+    try_initialize_namespace,
+)
 
 if not pm.is_installed("nano-vectordb"):
     pm.install("nano-vectordb")
lightrag/kg/networkx_impl.py CHANGED
@@ -6,7 +6,12 @@ import numpy as np
 from lightrag.types import KnowledgeGraph, KnowledgeGraphNode, KnowledgeGraphEdge
 from lightrag.utils import logger
 from lightrag.base import BaseGraphStorage
-from .shared_storage import get_storage_lock, get_namespace_object, is_multiprocess, try_initialize_namespace
+from .shared_storage import (
+    get_storage_lock,
+    get_namespace_object,
+    is_multiprocess,
+    try_initialize_namespace,
+)
 
 import pipmaster as pm
 
@@ -74,16 +79,14 @@ class NetworkXStorage(BaseGraphStorage):
             self.global_config["working_dir"], f"graph_{self.namespace}.graphml"
         )
         self._storage_lock = get_storage_lock()
-
+
         # check need_init must before get_namespace_object
        need_init = try_initialize_namespace(self.namespace)
         self._graph = get_namespace_object(self.namespace)
-
+
         if need_init:
             if is_multiprocess:
-                preloaded_graph = NetworkXStorage.load_nx_graph(
-                    self._graphml_xml_file
-                )
+                preloaded_graph = NetworkXStorage.load_nx_graph(self._graphml_xml_file)
                 self._graph.value = preloaded_graph or nx.Graph()
                 if preloaded_graph:
                     logger.info(
@@ -92,9 +95,7 @@ class NetworkXStorage(BaseGraphStorage):
                 else:
                     logger.info("Created new empty graph")
             else:
-                preloaded_graph = NetworkXStorage.load_nx_graph(
-                    self._graphml_xml_file
-                )
+                preloaded_graph = NetworkXStorage.load_nx_graph(self._graphml_xml_file)
                 self._graph = preloaded_graph or nx.Graph()
                 if preloaded_graph:
                     logger.info(
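The two branches above exist because get_namespace_object() hands back different things per mode: with multiple workers it is a Manager().Value("O", None) proxy, so the loaded graph has to be stored in its .value; with a single worker it is plain None and the graph is bound directly. A toy illustration of that branching outside LightRAG; the flag and file name are stand-ins:

import os
from multiprocessing import Manager

import networkx as nx

is_multiprocess = False  # stand-in for shared_storage.is_multiprocess
graphml_file = "graph_demo.graphml"  # illustrative path

# Shared slot: a Manager Value proxy in multi-process mode, a plain reference otherwise.
graph_slot = Manager().Value("O", None) if is_multiprocess else None

preloaded = nx.read_graphml(graphml_file) if os.path.exists(graphml_file) else None

if is_multiprocess:
    graph_slot.value = preloaded or nx.Graph()  # mutate the shared proxy in place
else:
    graph_slot = preloaded or nx.Graph()  # rebind the local reference

graph = graph_slot.value if is_multiprocess else graph_slot
print(f"graph has {graph.number_of_nodes()} nodes")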
lightrag/kg/shared_storage.py CHANGED
@@ -4,16 +4,17 @@ from multiprocessing.synchronize import Lock as ProcessLock
 from threading import Lock as ThreadLock
 from multiprocessing import Manager
 from typing import Any, Dict, Optional, Union
-from lightrag.utils import logger
+
 
 # Define a direct print function for critical logs that must be visible in all processes
 def direct_log(message, level="INFO"):
     """
     Log a message directly to stderr to ensure visibility in all processes,
     including the Gunicorn master process.
-    """
+    """
     print(f"{level}: {message}", file=sys.stderr, flush=True)
 
+
 LockType = Union[ProcessLock, ThreadLock]
 
 _manager = None
@@ -31,39 +32,53 @@ _global_lock: Optional[LockType] = None
 def initialize_share_data(workers: int = 1):
     """
     Initialize shared storage data for single or multi-process mode.
-
+
     When used with Gunicorn's preload feature, this function is called once in the
     master process before forking worker processes, allowing all workers to share
     the same initialized data.
-
+
     In single-process mode, this function is called during LightRAG object initialization.
-
+
     The function determines whether to use cross-process shared variables for data storage
     based on the number of workers. If workers=1, it uses thread locks and local dictionaries.
     If workers>1, it uses process locks and shared dictionaries managed by multiprocessing.Manager.
-
+
     Args:
         workers (int): Number of worker processes. If 1, single-process mode is used.
             If > 1, multi-process mode with shared memory is used.
     """
-    global _manager, is_multiprocess, is_multiprocess, _global_lock, _shared_dicts, _share_objects, _init_flags, _initialized
-
+    global \
+        _manager, \
+        is_multiprocess, \
+        is_multiprocess, \
+        _global_lock, \
+        _shared_dicts, \
+        _share_objects, \
+        _init_flags, \
+        _initialized
+
     # Check if already initialized
     if _initialized:
-        direct_log(f"Process {os.getpid()} Shared-Data already initialized (multiprocess={is_multiprocess})")
+        direct_log(
+            f"Process {os.getpid()} Shared-Data already initialized (multiprocess={is_multiprocess})"
+        )
         return
-
+
     _manager = Manager()
 
     # Force multi-process mode if workers > 1
     if workers > 1:
         is_multiprocess = True
-        _global_lock = _manager.Lock()
+        _global_lock = _manager.Lock()
         # Create shared dictionaries with manager
         _shared_dicts = _manager.dict()
         _share_objects = _manager.dict()
-        _init_flags = _manager.dict()  # Use shared dictionary to store initialization flags
-        direct_log(f"Process {os.getpid()} Shared-Data created for Multiple Process (workers={workers})")
+        _init_flags = (
+            _manager.dict()
+        )  # Use shared dictionary to store initialization flags
+        direct_log(
+            f"Process {os.getpid()} Shared-Data created for Multiple Process (workers={workers})"
+        )
     else:
         is_multiprocess = False
         _global_lock = ThreadLock()
@@ -75,6 +90,7 @@ def initialize_share_data(workers: int = 1):
     # Mark as initialized
     _initialized = True
 
+
 def try_initialize_namespace(namespace: str) -> bool:
     """
     Try to initialize a namespace. Returns True if the current process gets initialization permission.
@@ -83,8 +99,11 @@ def try_initialize_namespace(namespace: str) -> bool:
     global _init_flags, _manager
 
     if _init_flags is None:
-        direct_log(f"Error: try to create nanmespace before Shared-Data is initialized, pid={os.getpid()}", level="ERROR")
-        raise ValueError("Shared dictionaries not initialized")
+        direct_log(
+            f"Error: try to create nanmespace before Shared-Data is initialized, pid={os.getpid()}",
+            level="ERROR",
+        )
+        raise ValueError("Shared dictionaries not initialized")
 
     if namespace not in _init_flags:
         _init_flags[namespace] = True
@@ -112,7 +131,10 @@ def get_namespace_object(namespace: str) -> Any:
     """Get an object for specific namespace"""
 
     if _share_objects is None:
-        direct_log(f"Error: try to getnanmespace before Shared-Data is initialized, pid={os.getpid()}", level="ERROR")
+        direct_log(
+            f"Error: try to getnanmespace before Shared-Data is initialized, pid={os.getpid()}",
+            level="ERROR",
+        )
         raise ValueError("Shared dictionaries not initialized")
 
     lock = _get_global_lock()
@@ -123,14 +145,20 @@ def get_namespace_object(namespace: str) -> Any:
                 _share_objects[namespace] = _manager.Value("O", None)
             else:
                 _share_objects[namespace] = None
-            direct_log(f"Created namespace({namespace}): type={type(_share_objects[namespace])}, pid={os.getpid()}")
+            direct_log(
+                f"Created namespace({namespace}): type={type(_share_objects[namespace])}, pid={os.getpid()}"
+            )
 
     return _share_objects[namespace]
 
+
 def get_namespace_data(namespace: str) -> Dict[str, Any]:
     """get storage space for specific storage type(namespace)"""
     if _shared_dicts is None:
-        direct_log(f"Error: try to getnanmespace before Shared-Data is initialized, pid={os.getpid()}", level="ERROR")
+        direct_log(
+            f"Error: try to getnanmespace before Shared-Data is initialized, pid={os.getpid()}",
+            level="ERROR",
+        )
         raise ValueError("Shared dictionaries not initialized")
 
     lock = _get_global_lock()
@@ -140,8 +168,10 @@ def get_namespace_data(namespace: str) -> Dict[str, Any]:
             _shared_dicts[namespace] = _manager.dict()
         else:
             _shared_dicts[namespace] = {}
-            direct_log(f"Created namespace({namespace}): type={type(_shared_dicts[namespace])}, pid={os.getpid()}")
-
+            direct_log(
+                f"Created namespace({namespace}): type={type(_shared_dicts[namespace])}, pid={os.getpid()}"
+            )
+
     return _shared_dicts[namespace]
 
 
@@ -153,22 +183,33 @@ def get_scan_progress() -> Dict[str, Any]:
 def finalize_share_data():
     """
     Release shared resources and clean up.
-
+
     This function should be called when the application is shutting down
     to properly release shared resources and avoid memory leaks.
-
+
     In multi-process mode, it shuts down the Manager and releases all shared objects.
     In single-process mode, it simply resets the global variables.
     """
-    global _manager, is_multiprocess, _global_lock, _shared_dicts, _share_objects, _init_flags, _initialized
-
+    global \
+        _manager, \
+        is_multiprocess, \
+        _global_lock, \
+        _shared_dicts, \
+        _share_objects, \
+        _init_flags, \
+        _initialized
+
     # Check if already initialized
     if not _initialized:
-        direct_log(f"Process {os.getpid()} storage data not initialized, nothing to finalize")
+        direct_log(
+            f"Process {os.getpid()} storage data not initialized, nothing to finalize"
+        )
         return
-
-    direct_log(f"Process {os.getpid()} finalizing storage data (multiprocess={is_multiprocess})")
-
+
+    direct_log(
+        f"Process {os.getpid()} finalizing storage data (multiprocess={is_multiprocess})"
+    )
+
     # In multi-process mode, shut down the Manager
     if is_multiprocess and _manager is not None:
         try:
@@ -179,13 +220,15 @@ def finalize_share_data():
                 _share_objects.clear()
             if _init_flags is not None:
                 _init_flags.clear()
-
+
             # Shut down the Manager
             _manager.shutdown()
             direct_log(f"Process {os.getpid()} Manager shutdown complete")
         except Exception as e:
-            direct_log(f"Process {os.getpid()} Error shutting down Manager: {e}", level="ERROR")
-
+            direct_log(
+                f"Process {os.getpid()} Error shutting down Manager: {e}", level="ERROR"
+            )
+
     # Reset global variables
     _manager = None
     _initialized = None
@@ -194,5 +237,5 @@ def finalize_share_data():
     _share_objects = None
     _init_flags = None
     _global_lock = None
-
+
     direct_log(f"Process {os.getpid()} storage data finalization complete")
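Taken together, the lifecycle these diffs suggest is: initialize_share_data(workers) runs once per process tree (run_with_gunicorn.py calls it before app.run(), and LightRAG's __post_init__ calls it in single-process mode), namespaces are claimed with try_initialize_namespace(), read and written through get_namespace_data(), and released with finalize_share_data() on shutdown. A short usage sketch under those assumptions; the namespace keys mirror the scan_progress fields shown in the lightrag.py diff below:

import os

from lightrag.kg.shared_storage import (
    finalize_share_data,
    get_namespace_data,
    initialize_share_data,
    try_initialize_namespace,
)

# 1) workers=1 -> thread lock and local dicts; workers>1 -> Manager-backed shared dicts.
initialize_share_data(workers=1)

# 2) The first caller for a namespace gets initialization rights...
if try_initialize_namespace("scan_progress"):
    get_namespace_data("scan_progress").update({"is_scanning": False, "progress": 0})

# 3) ...and every later caller sees the same mapping.
progress = get_namespace_data("scan_progress")
print(f"pid={os.getpid()} progress={dict(progress)}")

# 4) Release Manager resources on shutdown (gunicorn_config.on_exit and the
#    run_with_gunicorn signal handler both call this).
finalize_share_data()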
lightrag/lightrag.py CHANGED
@@ -271,12 +271,17 @@ class LightRAG:
         set_logger(self.log_file_path, self.log_level)
         logger.info(f"Logger initialized for working directory: {self.working_dir}")
 
-        from lightrag.kg.shared_storage import initialize_share_data, try_initialize_namespace, get_namespace_data
+        from lightrag.kg.shared_storage import (
+            initialize_share_data,
+            try_initialize_namespace,
+            get_namespace_data,
+        )
+
         initialize_share_data()
 
-        need_init = try_initialize_namespace("scan_progress")
+        need_init = try_initialize_namespace("scan_progress")
         scan_progress = get_namespace_data("scan_progress")
-        logger.info(f"scan_progress type after init: {type(scan_progress)}")
+        logger.info(f"scan_progress type after init: {type(scan_progress)}")
         scan_progress.update(
             {
                 "is_scanning": False,
@@ -286,9 +291,6 @@ class LightRAG:
                 "progress": 0,
             }
         )
-        scan_progress = get_namespace_data("scan_progress")
-        logger.info(f"scan_progress type after update: {type(scan_progress)}")
-        logger.info(f"Scan_progres value after update: {scan_progress}")
 
         if not os.path.exists(self.working_dir):
             logger.info(f"Creating working directory {self.working_dir}")
run_with_gunicorn.py CHANGED
@@ -2,127 +2,149 @@
 """
 Start LightRAG server with Gunicorn
 """
+
 import os
 import sys
 import json
 import signal
 import argparse
-import subprocess
 from lightrag.api.utils_api import parse_args, display_splash_screen
 from lightrag.kg.shared_storage import initialize_share_data, finalize_share_data
 
+
 # Signal handler for graceful shutdown
 def signal_handler(sig, frame):
-    print("\n\n" + "="*80)
+    print("\n\n" + "=" * 80)
     print("RECEIVED TERMINATION SIGNAL")
     print(f"Process ID: {os.getpid()}")
-    print("="*80 + "\n")
-
+    print("=" * 80 + "\n")
+
     # Release shared resources
     finalize_share_data()
-
+
     # Exit with success status
     sys.exit(0)
 
+
 def main():
     # Register signal handlers for graceful shutdown
     signal.signal(signal.SIGINT, signal_handler)  # Ctrl+C
-    signal.signal(signal.SIGTERM, signal_handler)  # kill command
+    signal.signal(signal.SIGTERM, signal_handler)  # kill command
     # Create a parser to handle Gunicorn-specific parameters
-    parser = argparse.ArgumentParser(
-        description="Start LightRAG server with Gunicorn"
-    )
+    parser = argparse.ArgumentParser(description="Start LightRAG server with Gunicorn")
     parser.add_argument(
         "--workers",
         type=int,
-        help="Number of worker processes (overrides the default or config.ini setting)"
+        help="Number of worker processes (overrides the default or config.ini setting)",
     )
     parser.add_argument(
-        "--timeout",
-        type=int,
-        help="Worker timeout in seconds (default: 120)"
+        "--timeout", type=int, help="Worker timeout in seconds (default: 120)"
     )
     parser.add_argument(
         "--log-level",
         choices=["debug", "info", "warning", "error", "critical"],
-        help="Gunicorn log level"
+        help="Gunicorn log level",
     )
-
+
     # Parse Gunicorn-specific arguments
     gunicorn_args, remaining_args = parser.parse_known_args()
-
+
     # Pass remaining arguments to LightRAG's parse_args
     sys.argv = [sys.argv[0]] + remaining_args
     args = parse_args()
-
+
     # If workers specified, override args value
     if gunicorn_args.workers:
         args.workers = gunicorn_args.workers
         os.environ["WORKERS"] = str(gunicorn_args.workers)
-
+
     # If timeout specified, set environment variable
     if gunicorn_args.timeout:
        os.environ["TIMEOUT"] = str(gunicorn_args.timeout)
-
+
     # If log-level specified, set environment variable
     if gunicorn_args.log_level:
         os.environ["LOG_LEVEL"] = gunicorn_args.log_level
-
+
     # Save all LightRAG args to environment variable for worker processes
     # This is the key step for passing arguments to lightrag_server.py
     os.environ["LIGHTRAG_ARGS"] = json.dumps(vars(args))
-
+
     # Display startup information
     display_splash_screen(args)
-
+
     print("🚀 Starting LightRAG with Gunicorn")
     print(f"🔄 Worker management: Gunicorn (workers={args.workers})")
     print("🔍 Preloading app: Enabled")
     print("📝 Note: Using Gunicorn's preload feature for shared data initialization")
-    print("\n\n" + "="*80)
+    print("\n\n" + "=" * 80)
     print("MAIN PROCESS INITIALIZATION")
     print(f"Process ID: {os.getpid()}")
     print(f"Workers setting: {args.workers}")
-    print("="*80 + "\n")
-
+    print("=" * 80 + "\n")
+
     # Start application with Gunicorn using direct Python API
     # Ensure WORKERS environment variable is set before importing gunicorn_config
     if args.workers > 1:
         os.environ["WORKERS"] = str(args.workers)
-
+
     # Import Gunicorn's StandaloneApplication
     from gunicorn.app.base import BaseApplication
-
+
     # Define a custom application class that loads our config
     class GunicornApp(BaseApplication):
         def __init__(self, app, options=None):
             self.options = options or {}
             self.application = app
             super().__init__()
-
+
         def load_config(self):
             # Define valid Gunicorn configuration options
             valid_options = {
-                'bind', 'workers', 'worker_class', 'timeout', 'keepalive',
-                'preload_app', 'errorlog', 'accesslog', 'loglevel',
-                'certfile', 'keyfile', 'limit_request_line', 'limit_request_fields',
-                'limit_request_field_size', 'graceful_timeout', 'max_requests',
-                'max_requests_jitter'
+                "bind",
+                "workers",
+                "worker_class",
+                "timeout",
+                "keepalive",
+                "preload_app",
+                "errorlog",
+                "accesslog",
+                "loglevel",
+                "certfile",
+                "keyfile",
+                "limit_request_line",
+                "limit_request_fields",
+                "limit_request_field_size",
+                "graceful_timeout",
+                "max_requests",
+                "max_requests_jitter",
             }
-
+
             # Special hooks that need to be set separately
             special_hooks = {
-                'on_starting', 'on_reload', 'on_exit', 'pre_fork', 'post_fork',
-                'pre_exec', 'pre_request', 'post_request', 'worker_init',
-                'worker_exit', 'nworkers_changed', 'child_exit'
+                "on_starting",
+                "on_reload",
+                "on_exit",
+                "pre_fork",
+                "post_fork",
+                "pre_exec",
+                "pre_request",
+                "post_request",
+                "worker_init",
+                "worker_exit",
+                "nworkers_changed",
+                "child_exit",
             }
-
+
             # Import the gunicorn_config module directly
             import importlib.util
-            spec = importlib.util.spec_from_file_location("gunicorn_config", "gunicorn_config.py")
+
+            spec = importlib.util.spec_from_file_location(
+                "gunicorn_config", "gunicorn_config.py"
+            )
             self.config_module = importlib.util.module_from_spec(spec)
             spec.loader.exec_module(self.config_module)
-
+
             # Set configuration options
             for key in dir(self.config_module):
                 if key in valid_options:
@@ -135,7 +157,7 @@ def main():
                     value = getattr(self.config_module, key)
                     if callable(value):
                         self.cfg.set(key, value)
-
+
             # Override with command line arguments if provided
             if gunicorn_args.workers:
                 self.cfg.set("workers", gunicorn_args.workers)
@@ -143,18 +165,18 @@ def main():
                 self.cfg.set("timeout", gunicorn_args.timeout)
             if gunicorn_args.log_level:
                 self.cfg.set("loglevel", gunicorn_args.log_level)
-
+
         def load(self):
             # Import the application
             from lightrag.api.lightrag_server import get_application
+
             return get_application()
-
+
     # Create the application
     app = GunicornApp("")
-
+
     # Directly call initialize_share_data with the correct workers value
-    from lightrag.kg.shared_storage import initialize_share_data
-
+
     # Force workers to be an integer and greater than 1 for multi-process mode
     workers_count = int(args.workers)
     if workers_count > 1:
@@ -163,10 +185,11 @@ def main():
         initialize_share_data(workers_count)
     else:
         initialize_share_data(1)
-
+
     # Run the application
     print("\nStarting Gunicorn with direct Python API...")
     app.run()
 
+
 if __name__ == "__main__":
     main()
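The launcher drives Gunicorn through its documented custom-application hook points: subclass gunicorn.app.base.BaseApplication, push settings into self.cfg inside load_config(), and return the application object from load(). A stripped-down sketch of that pattern with a placeholder WSGI app; the real script instead loads gunicorn_config.py and returns LightRAG's ASGI app via get_application(), running it under uvicorn.workers.UvicornWorker:

from gunicorn.app.base import BaseApplication


def wsgi_app(environ, start_response):
    # Placeholder app so the sketch is self-contained.
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"ok"]


class MiniGunicornApp(BaseApplication):
    def __init__(self, app, options=None):
        self.application = app
        self.options = options or {}
        super().__init__()

    def load_config(self):
        # Only forward keys Gunicorn recognizes, mirroring valid_options above.
        for key, value in self.options.items():
            if key in self.cfg.settings and value is not None:
                self.cfg.set(key, value)

    def load(self):
        return self.application


if __name__ == "__main__":
    MiniGunicornApp(wsgi_app, {"bind": "127.0.0.1:8000", "workers": 2}).run()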