yangdx committed · Commit 2655164 · 1 Parent(s): b70a510

Refactor shared storage locks into separate pipeline, storage, and internal locks to prevent deadlocks

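Why separate locks prevent deadlocks: the previous design funneled every shared-storage operation through one `_global_lock`, and neither `asyncio.Lock` nor a multiprocessing manager lock is reentrant, so any code path that acquired the lock and then called a helper that acquires it again would hang. A minimal sketch of the hazard (illustrative names only, not code from this repository):

    import asyncio

    _global_lock = asyncio.Lock()  # one lock shared by unrelated concerns

    async def get_data(namespace: str) -> dict:
        async with _global_lock:  # inner acquisition
            return {}

    async def update_storage() -> None:
        async with _global_lock:  # outer acquisition
            # Deadlock: the same non-reentrant lock is requested again,
            # so this coroutine waits on itself forever.
            await get_data("pipeline_status")

After this commit, pipeline status, storage data, and internal namespace bookkeeping each take a dedicated lock (`_pipeline_status_lock`, `_storage_lock`, `_internal_lock`), so holding one no longer blocks helpers that only need another.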
lightrag/api/lightrag_server.py CHANGED

@@ -138,17 +138,17 @@ def create_app(args):
         # Import necessary functions from shared_storage
         from lightrag.kg.shared_storage import (
             get_namespace_data,
-            get_storage_lock,
-            initialize_pipeline_namespace,
+            get_pipeline_status_lock,
+            initialize_pipeline_status,
         )
-        await initialize_pipeline_namespace()
+        await initialize_pipeline_status()
 
         # Auto scan documents if enabled
         if args.auto_scan_at_startup:
             # Check if a task is already running (with lock protection)
             pipeline_status = await get_namespace_data("pipeline_status")
             should_start_task = False
-            async with get_storage_lock():
+            async with get_pipeline_status_lock():
                 if not pipeline_status.get("busy", False):
                     should_start_task = True
             # Only start the task if no other task is running
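Note the check-then-act shape of this startup path: `should_start_task` is decided while holding `get_pipeline_status_lock()`, but the scan task is launched only after the lock is released, so the lock never spans long-running work. A condensed sketch of that pattern (the task starter is a stand-in name; the diff cuts off before it):

    pipeline_status = await get_namespace_data("pipeline_status")
    should_start_task = False
    async with get_pipeline_status_lock():
        if not pipeline_status.get("busy", False):
            should_start_task = True  # decide under the lock
    if should_start_task:
        # Launch outside the lock; run_scanning_process is hypothetical here.
        asyncio.create_task(run_scanning_process())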
lightrag/kg/shared_storage.py CHANGED

@@ -16,6 +16,22 @@ def direct_log(message, level="INFO"):
 
 
 T = TypeVar('T')
+LockType = Union[ProcessLock, asyncio.Lock]
+
+is_multiprocess = None
+_workers = None
+_manager = None
+_initialized = None
+
+# shared data for storage across processes
+_shared_dicts: Optional[Dict[str, Any]] = None
+_init_flags: Optional[Dict[str, bool]] = None  # namespace -> initialized
+_update_flags: Optional[Dict[str, bool]] = None  # namespace -> updated
+
+# locks for mutex access
+_storage_lock: Optional[LockType] = None
+_internal_lock: Optional[LockType] = None
+_pipeline_status_lock: Optional[LockType] = None
 
 class UnifiedLock(Generic[T]):
     """Provide a unified lock interface type for asyncio.Lock and multiprocessing.Lock"""

@@ -39,30 +55,37 @@ class UnifiedLock(Generic[T]):
     def __enter__(self) -> 'UnifiedLock[T]':
         """For backward compatibility"""
         if self._is_async:
-            raise RuntimeError("Use 'async with' for asyncio.Lock")
+            raise RuntimeError("Use 'async with' for shared_storage lock")
         self._lock.acquire()
         return self
 
     def __exit__(self, exc_type, exc_val, exc_tb):
         """For backward compatibility"""
         if self._is_async:
-            raise RuntimeError("Use 'async with' for asyncio.Lock")
+            raise RuntimeError("Use 'async with' for shared_storage lock")
         self._lock.release()
 
 
-LockType = Union[ProcessLock, asyncio.Lock]
-
-is_multiprocess = None
-_workers = None
-_manager = None
-_initialized = None
-_global_lock: Optional[LockType] = None
+def get_internal_lock() -> UnifiedLock:
+    """return unified storage lock for data consistency"""
+    return UnifiedLock(
+        lock=_internal_lock,
+        is_async=not is_multiprocess
+    )
 
-# shared data for storage across processes
-_shared_dicts: Optional[Dict[str, Any]] = None
-_init_flags: Optional[Dict[str, bool]] = None  # namespace -> initialized
-_update_flags: Optional[Dict[str, bool]] = None  # namespace -> updated
+def get_storage_lock() -> UnifiedLock:
+    """return unified storage lock for data consistency"""
+    return UnifiedLock(
+        lock=_storage_lock,
+        is_async=not is_multiprocess
+    )
 
+def get_pipeline_status_lock() -> UnifiedLock:
+    """return unified storage lock for data consistency"""
+    return UnifiedLock(
+        lock=_pipeline_status_lock,
+        is_async=not is_multiprocess
+    )
 
 def initialize_share_data(workers: int = 1):
     """

@@ -87,7 +110,9 @@ def initialize_share_data(workers: int = 1):
         _workers, \
         is_multiprocess, \
         is_multiprocess, \
-        _global_lock, \
+        _storage_lock, \
+        _internal_lock, \
+        _pipeline_status_lock, \
         _shared_dicts, \
         _init_flags, \
         _initialized, \

@@ -105,7 +130,9 @@ def initialize_share_data(workers: int = 1):
 
     if workers > 1:
         is_multiprocess = True
-        _global_lock = _manager.Lock()
+        _internal_lock = _manager.Lock()
+        _storage_lock = _manager.Lock()
+        _pipeline_status_lock = _manager.Lock()
         _shared_dicts = _manager.dict()
         _init_flags = _manager.dict()
         _update_flags = _manager.dict()

@@ -114,7 +141,9 @@ def initialize_share_data(workers: int = 1):
         )
     else:
         is_multiprocess = False
-        _global_lock = asyncio.Lock()
+        _internal_lock = asyncio.Lock()
+        _storage_lock = asyncio.Lock()
+        _pipeline_status_lock = asyncio.Lock()
        _shared_dicts = {}
         _init_flags = {}
         _update_flags = {}

@@ -124,13 +153,13 @@ def initialize_share_data(workers: int = 1):
     _initialized = True
 
 
-async def initialize_pipeline_namespace():
+async def initialize_pipeline_status():
     """
     Initialize pipeline namespace with default values.
     """
     pipeline_namespace = await get_namespace_data("pipeline_status")
 
-    async with get_storage_lock():
+    async with get_internal_lock():
         # Check if already initialized by checking for required fields
         if "busy" in pipeline_namespace:
             return

@@ -160,7 +189,7 @@ async def get_update_flag(namespace: str):
     if _update_flags is None:
         raise ValueError("Try to create namespace before Shared-Data is initialized")
 
-    async with get_storage_lock():
+    async with get_internal_lock():
         if namespace not in _update_flags:
             if is_multiprocess and _manager is not None:
                 _update_flags[namespace] = _manager.list()

@@ -182,7 +211,7 @@ async def set_all_update_flags(namespace: str):
     if _update_flags is None:
         raise ValueError("Try to create namespace before Shared-Data is initialized")
 
-    async with get_storage_lock():
+    async with get_internal_lock():
         if namespace not in _update_flags:
             raise ValueError(f"Namespace {namespace} not found in update flags")
         # Update flags for both modes

@@ -215,14 +244,6 @@ def try_initialize_namespace(namespace: str) -> bool:
     return False
 
 
-def get_storage_lock() -> UnifiedLock:
-    """return unified storage lock for data consistency"""
-    return UnifiedLock(
-        lock=_global_lock,
-        is_async=not is_multiprocess
-    )
-
-
 async def get_namespace_data(namespace: str) -> Dict[str, Any]:
     """get storage space for specific storage type(namespace)"""
     if _shared_dicts is None:

@@ -232,7 +253,7 @@ async def get_namespace_data(namespace: str) -> Dict[str, Any]:
         )
         raise ValueError("Shared dictionaries not initialized")
 
-    async with get_storage_lock():
+    async with get_internal_lock():
         if namespace not in _shared_dicts:
             if is_multiprocess and _manager is not None:
                 _shared_dicts[namespace] = _manager.dict()
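Taken together, the module now exposes three lock getters, each wrapping its module-level lock in a `UnifiedLock` that is asyncio-backed in single-worker mode and manager-backed when `workers > 1`. A hedged usage sketch, assuming `initialize_share_data()` has already run (otherwise the getters wrap `None`):

    from lightrag.kg.shared_storage import (
        initialize_share_data,
        get_internal_lock,          # namespace/update-flag bookkeeping
        get_storage_lock,           # storage data access
        get_pipeline_status_lock,   # the pipeline_status namespace
    )

    async def example() -> None:
        initialize_share_data(workers=1)  # single process: asyncio.Lock backing
        async with get_storage_lock():
            ...  # touch storage data only; hold one lock at a time
        async with get_pipeline_status_lock():
            ...  # touch pipeline status only

Holding at most one of these locks at a time keeps the lock graph flat, which is what makes the split deadlock-safe.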
lightrag/lightrag.py CHANGED

@@ -669,15 +669,15 @@ class LightRAG:
         3. Process each chunk for entity and relation extraction
         4. Update the document status
         """
-        from lightrag.kg.shared_storage import get_namespace_data, get_storage_lock
+        from lightrag.kg.shared_storage import get_namespace_data, get_pipeline_status_lock
 
         # Get pipeline status shared data and lock
         pipeline_status = await get_namespace_data("pipeline_status")
-        storage_lock = get_storage_lock()
+        pipeline_status_lock = get_pipeline_status_lock()
 
         # Check if another process is already processing the queue
         process_documents = False
-        async with storage_lock:
+        async with pipeline_status_lock:
             # Ensure only one worker is processing documents
             if not pipeline_status.get("busy", False):
                 # Cleaning history_messages without breaking it as a shared list object

@@ -851,7 +851,7 @@ class LightRAG:
 
             # Check if there's a pending request to process more documents (with lock)
             has_pending_request = False
-            async with storage_lock:
+            async with pipeline_status_lock:
                 has_pending_request = pipeline_status.get("request_pending", False)
                 if has_pending_request:
                     # Clear the request flag before checking for more documents

@@ -869,7 +869,7 @@ class LightRAG:
             log_message = "Document processing pipeline completed"
             logger.info(log_message)
             # Always reset busy status when done or if an exception occurs (with lock)
-            async with storage_lock:
+            async with pipeline_status_lock:
                 pipeline_status["busy"] = False
                 pipeline_status["latest_message"] = log_message
                 pipeline_status["history_messages"].append(log_message)