Daniel.y committed on
Commit
1473fec
·
unverified ·
2 Parent(s): 54deb16 dd1704b

Merge pull request #1145 from danielaskdd/optimize-doc-scan

Browse files

Optimize parallel file processing in folder scan and fix deadlock problem in Gunicorn multi-worker mode

lightrag/api/auth.py CHANGED
@@ -3,6 +3,9 @@ from datetime import datetime, timedelta
3
  import jwt
4
  from fastapi import HTTPException, status
5
  from pydantic import BaseModel
 
 
 
6
 
7
 
8
  class TokenPayload(BaseModel):
 
3
  import jwt
4
  from fastapi import HTTPException, status
5
  from pydantic import BaseModel
6
+ from dotenv import load_dotenv
7
+
8
+ load_dotenv()
9
 
10
 
11
  class TokenPayload(BaseModel):
lightrag/api/gunicorn_config.py CHANGED
@@ -29,7 +29,9 @@ preload_app = True
29
  worker_class = "uvicorn.workers.UvicornWorker"
30
 
31
  # Other Gunicorn configurations
32
- timeout = int(os.getenv("TIMEOUT", 150)) # Default 150s to match run_with_gunicorn.py
 
 
33
  keepalive = int(os.getenv("KEEPALIVE", 5)) # Default 5s
34
 
35
  # Logging configuration
 
29
  worker_class = "uvicorn.workers.UvicornWorker"
30
 
31
  # Other Gunicorn configurations
32
+ timeout = int(
33
+ os.getenv("TIMEOUT", 150 * 2)
34
+ ) # Default 150s *2 to match run_with_gunicorn.py
35
  keepalive = int(os.getenv("KEEPALIVE", 5)) # Default 5s
36
 
37
  # Logging configuration
lightrag/api/lightrag_server.py CHANGED
@@ -49,7 +49,7 @@ from .auth import auth_handler
49
  # Load environment variables
50
  # Updated to use the .env that is inside the current folder
51
  # This update allows the user to put a different .env file for each lightrag folder
52
- load_dotenv(".env", override=True)
53
 
54
  # Initialize config parser
55
  config = configparser.ConfigParser()
 
49
  # Load environment variables
50
  # Updated to use the .env that is inside the current folder
51
  # This update allows the user to put a different .env file for each lightrag folder
52
+ load_dotenv()
53
 
54
  # Initialize config parser
55
  config = configparser.ConfigParser()
lightrag/api/routers/document_routes.py CHANGED
@@ -405,7 +405,7 @@ async def pipeline_index_file(rag: LightRAG, file_path: Path):
405
 
406
 
407
  async def pipeline_index_files(rag: LightRAG, file_paths: List[Path]):
408
- """Index multiple files concurrently
409
 
410
  Args:
411
  rag: LightRAG instance
@@ -416,12 +416,12 @@ async def pipeline_index_files(rag: LightRAG, file_paths: List[Path]):
416
  try:
417
  enqueued = False
418
 
419
- if len(file_paths) == 1:
420
- enqueued = await pipeline_enqueue_file(rag, file_paths[0])
421
- else:
422
- tasks = [pipeline_enqueue_file(rag, path) for path in file_paths]
423
- enqueued = any(await asyncio.gather(*tasks))
424
 
 
425
  if enqueued:
426
  await rag.apipeline_process_enqueue_documents()
427
  except Exception as e:
@@ -472,14 +472,34 @@ async def run_scanning_process(rag: LightRAG, doc_manager: DocumentManager):
472
  total_files = len(new_files)
473
  logger.info(f"Found {total_files} new files to index.")
474
 
475
- for idx, file_path in enumerate(new_files):
476
- try:
477
- await pipeline_index_file(rag, file_path)
478
- except Exception as e:
479
- logger.error(f"Error indexing file {file_path}: {str(e)}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
480
 
481
  except Exception as e:
482
  logger.error(f"Error during scanning process: {str(e)}")
 
483
 
484
 
485
  def create_document_routes(
 
405
 
406
 
407
  async def pipeline_index_files(rag: LightRAG, file_paths: List[Path]):
408
+ """Index multiple files sequentially to avoid high CPU load
409
 
410
  Args:
411
  rag: LightRAG instance
 
416
  try:
417
  enqueued = False
418
 
419
+ # Process files sequentially
420
+ for file_path in file_paths:
421
+ if await pipeline_enqueue_file(rag, file_path):
422
+ enqueued = True
 
423
 
424
+ # Process the queue only if at least one file was successfully enqueued
425
  if enqueued:
426
  await rag.apipeline_process_enqueue_documents()
427
  except Exception as e:
 
472
  total_files = len(new_files)
473
  logger.info(f"Found {total_files} new files to index.")
474
 
475
+ if not new_files:
476
+ return
477
+
478
+ # Get MAX_PARALLEL_INSERT from global_args
479
+ max_parallel = global_args["max_parallel_insert"]
480
+ # Calculate batch size as 2 * MAX_PARALLEL_INSERT
481
+ batch_size = 2 * max_parallel
482
+
483
+ # Process files in batches
484
+ for i in range(0, total_files, batch_size):
485
+ batch_files = new_files[i : i + batch_size]
486
+ batch_num = i // batch_size + 1
487
+ total_batches = (total_files + batch_size - 1) // batch_size
488
+
489
+ logger.info(
490
+ f"Processing batch {batch_num}/{total_batches} with {len(batch_files)} files"
491
+ )
492
+ await pipeline_index_files(rag, batch_files)
493
+
494
+ # Log progress
495
+ processed = min(i + batch_size, total_files)
496
+ logger.info(
497
+ f"Processed {processed}/{total_files} files ({processed/total_files*100:.1f}%)"
498
+ )
499
 
500
  except Exception as e:
501
  logger.error(f"Error during scanning process: {str(e)}")
502
+ logger.error(traceback.format_exc())
503
 
504
 
505
  def create_document_routes(
lightrag/api/run_with_gunicorn.py CHANGED
@@ -13,7 +13,7 @@ from dotenv import load_dotenv
13
 
14
  # Updated to use the .env that is inside the current folder
15
  # This update allows the user to put a different .env file for each lightrag folder
16
- load_dotenv(".env")
17
 
18
 
19
  def check_and_install_dependencies():
@@ -140,7 +140,7 @@ def main():
140
 
141
  # Timeout configuration prioritizes command line arguments
142
  gunicorn_config.timeout = (
143
- args.timeout if args.timeout else int(os.getenv("TIMEOUT", 150))
144
  )
145
 
146
  # Keepalive configuration
 
13
 
14
  # Updated to use the .env that is inside the current folder
15
  # This update allows the user to put a different .env file for each lightrag folder
16
+ load_dotenv()
17
 
18
 
19
  def check_and_install_dependencies():
 
140
 
141
  # Timeout configuration prioritizes command line arguments
142
  gunicorn_config.timeout = (
143
+ args.timeout if args.timeout * 2 else int(os.getenv("TIMEOUT", 150 * 2))
144
  )
145
 
146
  # Keepalive configuration
lightrag/api/utils_api.py CHANGED
@@ -16,7 +16,7 @@ from starlette.status import HTTP_403_FORBIDDEN
16
  from .auth import auth_handler
17
 
18
  # Load environment variables
19
- load_dotenv(override=True)
20
 
21
  global_args = {"main_args": None}
22
 
@@ -365,6 +365,9 @@ def parse_args(is_uvicorn_mode: bool = False) -> argparse.Namespace:
365
  "LIGHTRAG_VECTOR_STORAGE", DefaultRAGStorageConfig.VECTOR_STORAGE
366
  )
367
 
 
 
 
368
  # Handle openai-ollama special case
369
  if args.llm_binding == "openai-ollama":
370
  args.llm_binding = "openai"
@@ -441,8 +444,8 @@ def display_splash_screen(args: argparse.Namespace) -> None:
441
  ASCIIColors.yellow(f"{args.log_level}")
442
  ASCIIColors.white(" ├─ Verbose Debug: ", end="")
443
  ASCIIColors.yellow(f"{args.verbose}")
444
- ASCIIColors.white(" ├─ Timeout: ", end="")
445
- ASCIIColors.yellow(f"{args.timeout if args.timeout else 'None (infinite)'}")
446
  ASCIIColors.white(" └─ API Key: ", end="")
447
  ASCIIColors.yellow("Set" if args.key else "Not Set")
448
 
@@ -459,8 +462,10 @@ def display_splash_screen(args: argparse.Namespace) -> None:
459
  ASCIIColors.yellow(f"{args.llm_binding}")
460
  ASCIIColors.white(" ├─ Host: ", end="")
461
  ASCIIColors.yellow(f"{args.llm_binding_host}")
462
- ASCIIColors.white(" └─ Model: ", end="")
463
  ASCIIColors.yellow(f"{args.llm_model}")
 
 
464
 
465
  # Embedding Configuration
466
  ASCIIColors.magenta("\n📊 Embedding Configuration:")
@@ -475,8 +480,10 @@ def display_splash_screen(args: argparse.Namespace) -> None:
475
 
476
  # RAG Configuration
477
  ASCIIColors.magenta("\n⚙️ RAG Configuration:")
478
- ASCIIColors.white(" ├─ Max Async Operations: ", end="")
479
  ASCIIColors.yellow(f"{args.max_async}")
 
 
480
  ASCIIColors.white(" ├─ Max Tokens: ", end="")
481
  ASCIIColors.yellow(f"{args.max_tokens}")
482
  ASCIIColors.white(" ├─ Max Embed Tokens: ", end="")
@@ -485,8 +492,6 @@ def display_splash_screen(args: argparse.Namespace) -> None:
485
  ASCIIColors.yellow(f"{args.chunk_size}")
486
  ASCIIColors.white(" ├─ Chunk Overlap Size: ", end="")
487
  ASCIIColors.yellow(f"{args.chunk_overlap_size}")
488
- ASCIIColors.white(" ├─ History Turns: ", end="")
489
- ASCIIColors.yellow(f"{args.history_turns}")
490
  ASCIIColors.white(" ├─ Cosine Threshold: ", end="")
491
  ASCIIColors.yellow(f"{args.cosine_threshold}")
492
  ASCIIColors.white(" ├─ Top-K: ", end="")
 
16
  from .auth import auth_handler
17
 
18
  # Load environment variables
19
+ load_dotenv()
20
 
21
  global_args = {"main_args": None}
22
 
 
365
  "LIGHTRAG_VECTOR_STORAGE", DefaultRAGStorageConfig.VECTOR_STORAGE
366
  )
367
 
368
+ # Get MAX_PARALLEL_INSERT from environment
369
+ global_args["max_parallel_insert"] = get_env_value("MAX_PARALLEL_INSERT", 2, int)
370
+
371
  # Handle openai-ollama special case
372
  if args.llm_binding == "openai-ollama":
373
  args.llm_binding = "openai"
 
444
  ASCIIColors.yellow(f"{args.log_level}")
445
  ASCIIColors.white(" ├─ Verbose Debug: ", end="")
446
  ASCIIColors.yellow(f"{args.verbose}")
447
+ ASCIIColors.white(" ├─ History Turns: ", end="")
448
+ ASCIIColors.yellow(f"{args.history_turns}")
449
  ASCIIColors.white(" └─ API Key: ", end="")
450
  ASCIIColors.yellow("Set" if args.key else "Not Set")
451
 
 
462
  ASCIIColors.yellow(f"{args.llm_binding}")
463
  ASCIIColors.white(" ├─ Host: ", end="")
464
  ASCIIColors.yellow(f"{args.llm_binding_host}")
465
+ ASCIIColors.white(" ├─ Model: ", end="")
466
  ASCIIColors.yellow(f"{args.llm_model}")
467
+ ASCIIColors.white(" └─ Timeout: ", end="")
468
+ ASCIIColors.yellow(f"{args.timeout if args.timeout else 'None (infinite)'}")
469
 
470
  # Embedding Configuration
471
  ASCIIColors.magenta("\n📊 Embedding Configuration:")
 
480
 
481
  # RAG Configuration
482
  ASCIIColors.magenta("\n⚙️ RAG Configuration:")
483
+ ASCIIColors.white(" ├─ Max Async for LLM: ", end="")
484
  ASCIIColors.yellow(f"{args.max_async}")
485
+ ASCIIColors.white(" ├─ Max Parallel Insert: ", end="")
486
+ ASCIIColors.yellow(f"{global_args['max_parallel_insert']}")
487
  ASCIIColors.white(" ├─ Max Tokens: ", end="")
488
  ASCIIColors.yellow(f"{args.max_tokens}")
489
  ASCIIColors.white(" ├─ Max Embed Tokens: ", end="")
 
492
  ASCIIColors.yellow(f"{args.chunk_size}")
493
  ASCIIColors.white(" ├─ Chunk Overlap Size: ", end="")
494
  ASCIIColors.yellow(f"{args.chunk_overlap_size}")
 
 
495
  ASCIIColors.white(" ├─ Cosine Threshold: ", end="")
496
  ASCIIColors.yellow(f"{args.cosine_threshold}")
497
  ASCIIColors.white(" ├─ Top-K: ", end="")
lightrag/kg/shared_storage.py CHANGED
@@ -41,6 +41,9 @@ _pipeline_status_lock: Optional[LockType] = None
41
  _graph_db_lock: Optional[LockType] = None
42
  _data_init_lock: Optional[LockType] = None
43
 
 
 
 
44
 
45
  class UnifiedLock(Generic[T]):
46
  """Provide a unified lock interface type for asyncio.Lock and multiprocessing.Lock"""
@@ -51,12 +54,14 @@ class UnifiedLock(Generic[T]):
51
  is_async: bool,
52
  name: str = "unnamed",
53
  enable_logging: bool = True,
 
54
  ):
55
  self._lock = lock
56
  self._is_async = is_async
57
  self._pid = os.getpid() # for debug only
58
  self._name = name # for debug only
59
  self._enable_logging = enable_logging # for debug only
 
60
 
61
  async def __aenter__(self) -> "UnifiedLock[T]":
62
  try:
@@ -64,16 +69,39 @@ class UnifiedLock(Generic[T]):
64
  f"== Lock == Process {self._pid}: Acquiring lock '{self._name}' (async={self._is_async})",
65
  enable_output=self._enable_logging,
66
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
67
  if self._is_async:
68
  await self._lock.acquire()
69
  else:
70
  self._lock.acquire()
 
71
  direct_log(
72
  f"== Lock == Process {self._pid}: Lock '{self._name}' acquired (async={self._is_async})",
73
  enable_output=self._enable_logging,
74
  )
75
  return self
76
  except Exception as e:
 
 
 
 
 
 
 
 
77
  direct_log(
78
  f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}': {e}",
79
  level="ERROR",
@@ -82,15 +110,29 @@ class UnifiedLock(Generic[T]):
82
  raise
83
 
84
  async def __aexit__(self, exc_type, exc_val, exc_tb):
 
85
  try:
86
  direct_log(
87
  f"== Lock == Process {self._pid}: Releasing lock '{self._name}' (async={self._is_async})",
88
  enable_output=self._enable_logging,
89
  )
 
 
90
  if self._is_async:
91
  self._lock.release()
92
  else:
93
  self._lock.release()
 
 
 
 
 
 
 
 
 
 
 
94
  direct_log(
95
  f"== Lock == Process {self._pid}: Lock '{self._name}' released (async={self._is_async})",
96
  enable_output=self._enable_logging,
@@ -101,6 +143,31 @@ class UnifiedLock(Generic[T]):
101
  level="ERROR",
102
  enable_output=self._enable_logging,
103
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
104
  raise
105
 
106
  def __enter__(self) -> "UnifiedLock[T]":
@@ -151,51 +218,61 @@ class UnifiedLock(Generic[T]):
151
 
152
  def get_internal_lock(enable_logging: bool = False) -> UnifiedLock:
153
  """return unified storage lock for data consistency"""
 
154
  return UnifiedLock(
155
  lock=_internal_lock,
156
  is_async=not is_multiprocess,
157
  name="internal_lock",
158
  enable_logging=enable_logging,
 
159
  )
160
 
161
 
162
  def get_storage_lock(enable_logging: bool = False) -> UnifiedLock:
163
  """return unified storage lock for data consistency"""
 
164
  return UnifiedLock(
165
  lock=_storage_lock,
166
  is_async=not is_multiprocess,
167
  name="storage_lock",
168
  enable_logging=enable_logging,
 
169
  )
170
 
171
 
172
  def get_pipeline_status_lock(enable_logging: bool = False) -> UnifiedLock:
173
  """return unified storage lock for data consistency"""
 
174
  return UnifiedLock(
175
  lock=_pipeline_status_lock,
176
  is_async=not is_multiprocess,
177
  name="pipeline_status_lock",
178
  enable_logging=enable_logging,
 
179
  )
180
 
181
 
182
  def get_graph_db_lock(enable_logging: bool = False) -> UnifiedLock:
183
  """return unified graph database lock for ensuring atomic operations"""
 
184
  return UnifiedLock(
185
  lock=_graph_db_lock,
186
  is_async=not is_multiprocess,
187
  name="graph_db_lock",
188
  enable_logging=enable_logging,
 
189
  )
190
 
191
 
192
  def get_data_init_lock(enable_logging: bool = False) -> UnifiedLock:
193
  """return unified data initialization lock for ensuring atomic data initialization"""
 
194
  return UnifiedLock(
195
  lock=_data_init_lock,
196
  is_async=not is_multiprocess,
197
  name="data_init_lock",
198
  enable_logging=enable_logging,
 
199
  )
200
 
201
 
@@ -229,7 +306,8 @@ def initialize_share_data(workers: int = 1):
229
  _shared_dicts, \
230
  _init_flags, \
231
  _initialized, \
232
- _update_flags
 
233
 
234
  # Check if already initialized
235
  if _initialized:
@@ -251,6 +329,16 @@ def initialize_share_data(workers: int = 1):
251
  _shared_dicts = _manager.dict()
252
  _init_flags = _manager.dict()
253
  _update_flags = _manager.dict()
 
 
 
 
 
 
 
 
 
 
254
  direct_log(
255
  f"Process {os.getpid()} Shared-Data created for Multiple Process (workers={workers})"
256
  )
@@ -264,6 +352,7 @@ def initialize_share_data(workers: int = 1):
264
  _shared_dicts = {}
265
  _init_flags = {}
266
  _update_flags = {}
 
267
  direct_log(f"Process {os.getpid()} Shared-Data created for Single Process")
268
 
269
  # Mark as initialized
@@ -458,7 +547,8 @@ def finalize_share_data():
458
  _shared_dicts, \
459
  _init_flags, \
460
  _initialized, \
461
- _update_flags
 
462
 
463
  # Check if already initialized
464
  if not _initialized:
@@ -523,5 +613,6 @@ def finalize_share_data():
523
  _graph_db_lock = None
524
  _data_init_lock = None
525
  _update_flags = None
 
526
 
527
  direct_log(f"Process {os.getpid()} storage data finalization complete")
 
41
  _graph_db_lock: Optional[LockType] = None
42
  _data_init_lock: Optional[LockType] = None
43
 
44
+ # async locks for coroutine synchronization in multiprocess mode
45
+ _async_locks: Optional[Dict[str, asyncio.Lock]] = None
46
+
47
 
48
  class UnifiedLock(Generic[T]):
49
  """Provide a unified lock interface type for asyncio.Lock and multiprocessing.Lock"""
 
54
  is_async: bool,
55
  name: str = "unnamed",
56
  enable_logging: bool = True,
57
+ async_lock: Optional[asyncio.Lock] = None,
58
  ):
59
  self._lock = lock
60
  self._is_async = is_async
61
  self._pid = os.getpid() # for debug only
62
  self._name = name # for debug only
63
  self._enable_logging = enable_logging # for debug only
64
+ self._async_lock = async_lock # auxiliary lock for coroutine synchronization
65
 
66
  async def __aenter__(self) -> "UnifiedLock[T]":
67
  try:
 
69
  f"== Lock == Process {self._pid}: Acquiring lock '{self._name}' (async={self._is_async})",
70
  enable_output=self._enable_logging,
71
  )
72
+
73
+ # If in multiprocess mode and async lock exists, acquire it first
74
+ if not self._is_async and self._async_lock is not None:
75
+ direct_log(
76
+ f"== Lock == Process {self._pid}: Acquiring async lock for '{self._name}'",
77
+ enable_output=self._enable_logging,
78
+ )
79
+ await self._async_lock.acquire()
80
+ direct_log(
81
+ f"== Lock == Process {self._pid}: Async lock for '{self._name}' acquired",
82
+ enable_output=self._enable_logging,
83
+ )
84
+
85
+ # Then acquire the main lock
86
  if self._is_async:
87
  await self._lock.acquire()
88
  else:
89
  self._lock.acquire()
90
+
91
  direct_log(
92
  f"== Lock == Process {self._pid}: Lock '{self._name}' acquired (async={self._is_async})",
93
  enable_output=self._enable_logging,
94
  )
95
  return self
96
  except Exception as e:
97
+ # If main lock acquisition fails, release the async lock if it was acquired
98
+ if (
99
+ not self._is_async
100
+ and self._async_lock is not None
101
+ and self._async_lock.locked()
102
+ ):
103
+ self._async_lock.release()
104
+
105
  direct_log(
106
  f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}': {e}",
107
  level="ERROR",
 
110
  raise
111
 
112
  async def __aexit__(self, exc_type, exc_val, exc_tb):
113
+ main_lock_released = False
114
  try:
115
  direct_log(
116
  f"== Lock == Process {self._pid}: Releasing lock '{self._name}' (async={self._is_async})",
117
  enable_output=self._enable_logging,
118
  )
119
+
120
+ # Release main lock first
121
  if self._is_async:
122
  self._lock.release()
123
  else:
124
  self._lock.release()
125
+
126
+ main_lock_released = True
127
+
128
+ # Then release async lock if in multiprocess mode
129
+ if not self._is_async and self._async_lock is not None:
130
+ direct_log(
131
+ f"== Lock == Process {self._pid}: Releasing async lock for '{self._name}'",
132
+ enable_output=self._enable_logging,
133
+ )
134
+ self._async_lock.release()
135
+
136
  direct_log(
137
  f"== Lock == Process {self._pid}: Lock '{self._name}' released (async={self._is_async})",
138
  enable_output=self._enable_logging,
 
143
  level="ERROR",
144
  enable_output=self._enable_logging,
145
  )
146
+
147
+ # If main lock release failed but async lock hasn't been released, try to release it
148
+ if (
149
+ not main_lock_released
150
+ and not self._is_async
151
+ and self._async_lock is not None
152
+ ):
153
+ try:
154
+ direct_log(
155
+ f"== Lock == Process {self._pid}: Attempting to release async lock after main lock failure",
156
+ level="WARNING",
157
+ enable_output=self._enable_logging,
158
+ )
159
+ self._async_lock.release()
160
+ direct_log(
161
+ f"== Lock == Process {self._pid}: Successfully released async lock after main lock failure",
162
+ enable_output=self._enable_logging,
163
+ )
164
+ except Exception as inner_e:
165
+ direct_log(
166
+ f"== Lock == Process {self._pid}: Failed to release async lock after main lock failure: {inner_e}",
167
+ level="ERROR",
168
+ enable_output=self._enable_logging,
169
+ )
170
+
171
  raise
172
 
173
  def __enter__(self) -> "UnifiedLock[T]":
 
218
 
219
  def get_internal_lock(enable_logging: bool = False) -> UnifiedLock:
220
  """return unified storage lock for data consistency"""
221
+ async_lock = _async_locks.get("internal_lock") if is_multiprocess else None
222
  return UnifiedLock(
223
  lock=_internal_lock,
224
  is_async=not is_multiprocess,
225
  name="internal_lock",
226
  enable_logging=enable_logging,
227
+ async_lock=async_lock,
228
  )
229
 
230
 
231
  def get_storage_lock(enable_logging: bool = False) -> UnifiedLock:
232
  """return unified storage lock for data consistency"""
233
+ async_lock = _async_locks.get("storage_lock") if is_multiprocess else None
234
  return UnifiedLock(
235
  lock=_storage_lock,
236
  is_async=not is_multiprocess,
237
  name="storage_lock",
238
  enable_logging=enable_logging,
239
+ async_lock=async_lock,
240
  )
241
 
242
 
243
  def get_pipeline_status_lock(enable_logging: bool = False) -> UnifiedLock:
244
  """return unified storage lock for data consistency"""
245
+ async_lock = _async_locks.get("pipeline_status_lock") if is_multiprocess else None
246
  return UnifiedLock(
247
  lock=_pipeline_status_lock,
248
  is_async=not is_multiprocess,
249
  name="pipeline_status_lock",
250
  enable_logging=enable_logging,
251
+ async_lock=async_lock,
252
  )
253
 
254
 
255
  def get_graph_db_lock(enable_logging: bool = False) -> UnifiedLock:
256
  """return unified graph database lock for ensuring atomic operations"""
257
+ async_lock = _async_locks.get("graph_db_lock") if is_multiprocess else None
258
  return UnifiedLock(
259
  lock=_graph_db_lock,
260
  is_async=not is_multiprocess,
261
  name="graph_db_lock",
262
  enable_logging=enable_logging,
263
+ async_lock=async_lock,
264
  )
265
 
266
 
267
  def get_data_init_lock(enable_logging: bool = False) -> UnifiedLock:
268
  """return unified data initialization lock for ensuring atomic data initialization"""
269
+ async_lock = _async_locks.get("data_init_lock") if is_multiprocess else None
270
  return UnifiedLock(
271
  lock=_data_init_lock,
272
  is_async=not is_multiprocess,
273
  name="data_init_lock",
274
  enable_logging=enable_logging,
275
+ async_lock=async_lock,
276
  )
277
 
278
 
 
306
  _shared_dicts, \
307
  _init_flags, \
308
  _initialized, \
309
+ _update_flags, \
310
+ _async_locks
311
 
312
  # Check if already initialized
313
  if _initialized:
 
329
  _shared_dicts = _manager.dict()
330
  _init_flags = _manager.dict()
331
  _update_flags = _manager.dict()
332
+
333
+ # Initialize async locks for multiprocess mode
334
+ _async_locks = {
335
+ "internal_lock": asyncio.Lock(),
336
+ "storage_lock": asyncio.Lock(),
337
+ "pipeline_status_lock": asyncio.Lock(),
338
+ "graph_db_lock": asyncio.Lock(),
339
+ "data_init_lock": asyncio.Lock(),
340
+ }
341
+
342
  direct_log(
343
  f"Process {os.getpid()} Shared-Data created for Multiple Process (workers={workers})"
344
  )
 
352
  _shared_dicts = {}
353
  _init_flags = {}
354
  _update_flags = {}
355
+ _async_locks = None # No need for async locks in single process mode
356
  direct_log(f"Process {os.getpid()} Shared-Data created for Single Process")
357
 
358
  # Mark as initialized
 
547
  _shared_dicts, \
548
  _init_flags, \
549
  _initialized, \
550
+ _update_flags, \
551
+ _async_locks
552
 
553
  # Check if already initialized
554
  if not _initialized:
 
613
  _graph_db_lock = None
614
  _data_init_lock = None
615
  _update_flags = None
616
+ _async_locks = None
617
 
618
  direct_log(f"Process {os.getpid()} storage data finalization complete")