yangdx commited on
Commit
b3894b8
·
1 Parent(s): 6c4bab1

Fix multiprocess dict creation logic, add process safety locks for namespace creation.

Browse files
gunicorn_config.py CHANGED
@@ -61,7 +61,6 @@ def on_starting(server):
61
  print("Gunicorn initialization complete, forking workers...")
62
  print("=" * 80)
63
 
64
-
65
  def on_exit(server):
66
  """
67
  Executed when Gunicorn is shutting down.
 
61
  print("Gunicorn initialization complete, forking workers...")
62
  print("=" * 80)
63
 
 
64
  def on_exit(server):
65
  """
66
  Executed when Gunicorn is shutting down.
lightrag/kg/shared_storage.py CHANGED
@@ -50,7 +50,7 @@ def initialize_share_data(workers: int = 1):
50
 
51
  # Check if already initialized
52
  if _initialized:
53
- direct_log(f"Process {os.getpid()} storage data already initialized (multiprocess={is_multiprocess})")
54
  return
55
 
56
  _manager = Manager()
@@ -63,14 +63,14 @@ def initialize_share_data(workers: int = 1):
63
  _shared_dicts = _manager.dict()
64
  _share_objects = _manager.dict()
65
  _init_flags = _manager.dict() # Use shared dictionary to store initialization flags
66
- direct_log(f"Process {os.getpid()} storage data created for Multiple Process (workers={workers})")
67
  else:
68
  is_multiprocess = False
69
  _global_lock = ThreadLock()
70
  _shared_dicts = {}
71
  _share_objects = {}
72
  _init_flags = {}
73
- direct_log(f"Process {os.getpid()} storage data created for Single Process")
74
 
75
  # Mark as initialized
76
  _initialized = True
@@ -82,28 +82,16 @@ def try_initialize_namespace(namespace: str) -> bool:
82
  """
83
  global _init_flags, _manager
84
 
85
- if is_multiprocess:
86
- if _init_flags is None:
87
- raise RuntimeError(
88
- "Shared storage not initialized. Call initialize_share_data() first."
89
- )
90
- else:
91
- if _init_flags is None:
92
- _init_flags = {}
93
-
94
- logger.info(f"Process {os.getpid()} trying to initialize namespace {namespace}")
95
 
96
- with _global_lock:
97
- if namespace not in _init_flags:
98
- _init_flags[namespace] = True
99
- logger.info(
100
- f"Process {os.getpid()} ready to initialize namespace {namespace}"
101
- )
102
- return True
103
- logger.info(
104
- f"Process {os.getpid()} found namespace {namespace} already initialized"
105
- )
106
- return False
107
 
108
 
109
  def _get_global_lock() -> LockType:
@@ -123,26 +111,37 @@ def get_scan_lock() -> LockType:
123
  def get_namespace_object(namespace: str) -> Any:
124
  """Get an object for specific namespace"""
125
 
126
- if namespace not in _share_objects:
127
- lock = _get_global_lock()
128
- with lock:
 
 
 
 
129
  if namespace not in _share_objects:
130
  if is_multiprocess:
131
  _share_objects[namespace] = _manager.Value("O", None)
132
  else:
133
  _share_objects[namespace] = None
 
134
 
135
  return _share_objects[namespace]
136
 
137
  def get_namespace_data(namespace: str) -> Dict[str, Any]:
138
  """get storage space for specific storage type(namespace)"""
139
-
140
- if namespace not in _shared_dicts:
141
- lock = _get_global_lock()
142
- with lock:
143
- if namespace not in _shared_dicts:
 
 
 
 
 
144
  _shared_dicts[namespace] = {}
145
-
 
146
  return _shared_dicts[namespace]
147
 
148
 
 
50
 
51
  # Check if already initialized
52
  if _initialized:
53
+ direct_log(f"Process {os.getpid()} Shared-Data already initialized (multiprocess={is_multiprocess})")
54
  return
55
 
56
  _manager = Manager()
 
63
  _shared_dicts = _manager.dict()
64
  _share_objects = _manager.dict()
65
  _init_flags = _manager.dict() # Use shared dictionary to store initialization flags
66
+ direct_log(f"Process {os.getpid()} Shared-Data created for Multiple Process (workers={workers})")
67
  else:
68
  is_multiprocess = False
69
  _global_lock = ThreadLock()
70
  _shared_dicts = {}
71
  _share_objects = {}
72
  _init_flags = {}
73
+ direct_log(f"Process {os.getpid()} Shared-Data created for Single Process")
74
 
75
  # Mark as initialized
76
  _initialized = True
 
82
  """
83
  global _init_flags, _manager
84
 
85
+ if _init_flags is None:
86
+ direct_log(f"Error: try to create nanmespace before Shared-Data is initialized, pid={os.getpid()}", level="ERROR")
87
+ raise ValueError("Shared dictionaries not initialized")
 
 
 
 
 
 
 
88
 
89
+ if namespace not in _init_flags:
90
+ _init_flags[namespace] = True
91
+ direct_log(f"Process {os.getpid()} ready to initialize namespace {namespace}")
92
+ return True
93
+ direct_log(f"Process {os.getpid()} namespace {namespace} already to initialized")
94
+ return False
 
 
 
 
 
95
 
96
 
97
  def _get_global_lock() -> LockType:
 
111
  def get_namespace_object(namespace: str) -> Any:
112
  """Get an object for specific namespace"""
113
 
114
+ if _share_objects is None:
115
+ direct_log(f"Error: try to getnanmespace before Shared-Data is initialized, pid={os.getpid()}", level="ERROR")
116
+ raise ValueError("Shared dictionaries not initialized")
117
+
118
+ lock = _get_global_lock()
119
+ with lock:
120
+ if namespace not in _share_objects:
121
  if namespace not in _share_objects:
122
  if is_multiprocess:
123
  _share_objects[namespace] = _manager.Value("O", None)
124
  else:
125
  _share_objects[namespace] = None
126
+ direct_log(f"Created namespace({namespace}): type={type(_share_objects[namespace])}, pid={os.getpid()}")
127
 
128
  return _share_objects[namespace]
129
 
130
  def get_namespace_data(namespace: str) -> Dict[str, Any]:
131
  """get storage space for specific storage type(namespace)"""
132
+ if _shared_dicts is None:
133
+ direct_log(f"Error: try to getnanmespace before Shared-Data is initialized, pid={os.getpid()}", level="ERROR")
134
+ raise ValueError("Shared dictionaries not initialized")
135
+
136
+ lock = _get_global_lock()
137
+ with lock:
138
+ if namespace not in _shared_dicts:
139
+ if is_multiprocess and _manager is not None:
140
+ _shared_dicts[namespace] = _manager.dict()
141
+ else:
142
  _shared_dicts[namespace] = {}
143
+ direct_log(f"Created namespace({namespace}): type={type(_shared_dicts[namespace])}, pid={os.getpid()}")
144
+
145
  return _shared_dicts[namespace]
146
 
147
 
lightrag/lightrag.py CHANGED
@@ -267,25 +267,29 @@ class LightRAG:
267
  _storages_status: StoragesStatus = field(default=StoragesStatus.NOT_CREATED)
268
 
269
  def __post_init__(self):
270
- from lightrag.kg.shared_storage import initialize_share_data, try_initialize_namespace, get_namespace_data
271
- initialize_share_data()
272
- need_init = try_initialize_namespace("scan_progress")
273
- scan_progress = get_namespace_data("scan_progress")
274
- if need_init:
275
- scan_progress.update(
276
- {
277
- "is_scanning": False,
278
- "current_file": "",
279
- "indexed_count": 0,
280
- "total_files": 0,
281
- "progress": 0,
282
- }
283
- )
284
-
285
  os.makedirs(os.path.dirname(self.log_file_path), exist_ok=True)
286
  set_logger(self.log_file_path, self.log_level)
287
  logger.info(f"Logger initialized for working directory: {self.working_dir}")
288
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
289
  if not os.path.exists(self.working_dir):
290
  logger.info(f"Creating working directory {self.working_dir}")
291
  os.makedirs(self.working_dir)
 
267
  _storages_status: StoragesStatus = field(default=StoragesStatus.NOT_CREATED)
268
 
269
  def __post_init__(self):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
270
  os.makedirs(os.path.dirname(self.log_file_path), exist_ok=True)
271
  set_logger(self.log_file_path, self.log_level)
272
  logger.info(f"Logger initialized for working directory: {self.working_dir}")
273
 
274
+ from lightrag.kg.shared_storage import initialize_share_data, try_initialize_namespace, get_namespace_data
275
+ initialize_share_data()
276
+
277
+ need_init = try_initialize_namespace("scan_progress")
278
+ scan_progress = get_namespace_data("scan_progress")
279
+ logger.info(f"scan_progress type after init: {type(scan_progress)}")
280
+ scan_progress.update(
281
+ {
282
+ "is_scanning": False,
283
+ "current_file": "",
284
+ "indexed_count": 0,
285
+ "total_files": 0,
286
+ "progress": 0,
287
+ }
288
+ )
289
+ scan_progress = get_namespace_data("scan_progress")
290
+ logger.info(f"scan_progress type after update: {type(scan_progress)}")
291
+ logger.info(f"Scan_progres value after update: {scan_progress}")
292
+
293
  if not os.path.exists(self.working_dir):
294
  logger.info(f"Creating working directory {self.working_dir}")
295
  os.makedirs(self.working_dir)