gzdaniel committed on
Commit
ab0e6bc
·
1 Parent(s): 6bcca36

Feat: Add keyed lock cleanup and status monitoring

Browse files
lightrag/api/lightrag_server.py CHANGED
@@ -52,6 +52,7 @@ from lightrag.kg.shared_storage import (
52
  get_namespace_data,
53
  get_pipeline_status_lock,
54
  initialize_pipeline_status,
 
55
  )
56
  from fastapi.security import OAuth2PasswordRequestForm
57
  from lightrag.api.auth import auth_handler
@@ -486,6 +487,9 @@ def create_app(args):
486
  else:
487
  auth_mode = "enabled"
488
 
 
 
 
489
  return {
490
  "status": "healthy",
491
  "working_directory": str(args.working_dir),
@@ -517,6 +521,7 @@ def create_app(args):
517
  },
518
  "auth_mode": auth_mode,
519
  "pipeline_busy": pipeline_status.get("busy", False),
 
520
  "core_version": core_version,
521
  "api_version": __api_version__,
522
  "webui_title": webui_title,
 
52
  get_namespace_data,
53
  get_pipeline_status_lock,
54
  initialize_pipeline_status,
55
+ cleanup_keyed_lock,
56
  )
57
  from fastapi.security import OAuth2PasswordRequestForm
58
  from lightrag.api.auth import auth_handler
 
487
  else:
488
  auth_mode = "enabled"
489
 
490
+ # Cleanup expired keyed locks and get status
491
+ keyed_lock_info = cleanup_keyed_lock()
492
+
493
  return {
494
  "status": "healthy",
495
  "working_directory": str(args.working_dir),
 
521
  },
522
  "auth_mode": auth_mode,
523
  "pipeline_busy": pipeline_status.get("busy", False),
524
+ "keyed_locks": keyed_lock_info,
525
  "core_version": core_version,
526
  "api_version": __api_version__,
527
  "webui_title": webui_title,
lightrag/kg/shared_storage.py CHANGED
@@ -280,6 +280,125 @@ def _get_combined_key(factory_name: str, key: str) -> str:
280
  return f"{factory_name}:{key}"
281
 
282
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
283
  def _get_or_create_shared_raw_mp_lock(
284
  factory_name: str, key: str
285
  ) -> Optional[mp.synchronize.Lock]:
@@ -346,86 +465,22 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str):
346
  ):
347
  _earliest_mp_cleanup_time = current_time
348
 
349
- # Efficient cleanup triggering with minimum interval control
350
- total_cleanup_len = len(_lock_cleanup_data)
351
- if total_cleanup_len >= CLEANUP_THRESHOLD:
352
- # Time rollback detection
353
- if (
354
- _last_mp_cleanup_time is not None
355
- and current_time < _last_mp_cleanup_time
356
- ):
357
- direct_log(
358
- "== mp Lock == Time rollback detected, resetting cleanup time",
359
- level="WARNING",
360
- enable_output=False,
361
- )
362
- _last_mp_cleanup_time = None
363
-
364
- # Check cleanup conditions
365
- has_expired_locks = (
366
- _earliest_mp_cleanup_time is not None
367
- and current_time - _earliest_mp_cleanup_time
368
- > CLEANUP_KEYED_LOCKS_AFTER_SECONDS
369
- )
370
-
371
- interval_satisfied = (
372
- _last_mp_cleanup_time is None
373
- or current_time - _last_mp_cleanup_time > MIN_CLEANUP_INTERVAL_SECONDS
374
- )
375
-
376
- if has_expired_locks and interval_satisfied:
377
- try:
378
- cleaned_count = 0
379
- new_earliest_time = None
380
-
381
- # Perform cleanup while maintaining the new earliest time
382
- # Clean expired locks from all namespaces
383
- for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()):
384
- if (
385
- current_time - cleanup_time
386
- > CLEANUP_KEYED_LOCKS_AFTER_SECONDS
387
- ):
388
- _lock_registry.pop(cleanup_key, None)
389
- _lock_registry_count.pop(cleanup_key, None)
390
- _lock_cleanup_data.pop(cleanup_key, None)
391
- cleaned_count += 1
392
- else:
393
- # Track the earliest time among remaining locks
394
- if (
395
- new_earliest_time is None
396
- or cleanup_time < new_earliest_time
397
- ):
398
- new_earliest_time = cleanup_time
399
-
400
- # Update state only after successful cleanup
401
- _earliest_mp_cleanup_time = new_earliest_time
402
- _last_mp_cleanup_time = current_time
403
-
404
- if cleaned_count > 0:
405
- next_cleanup_in = max(
406
- (
407
- new_earliest_time
408
- + CLEANUP_KEYED_LOCKS_AFTER_SECONDS
409
- - current_time
410
- )
411
- if new_earliest_time
412
- else float("inf"),
413
- MIN_CLEANUP_INTERVAL_SECONDS,
414
- )
415
- direct_log(
416
- f"== mp Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired locks, "
417
- f"next cleanup in {next_cleanup_in:.1f}s",
418
- enable_output=False,
419
- level="INFO",
420
- )
421
 
422
- except Exception as e:
423
- direct_log(
424
- f"== mp Lock == Cleanup failed: {e}",
425
- level="ERROR",
426
- enable_output=False,
427
- )
428
- # Don't update _last_mp_cleanup_time to allow retry
429
 
430
 
431
  class KeyedUnifiedLock:
@@ -504,89 +559,22 @@ class KeyedUnifiedLock:
504
  self._earliest_async_cleanup_time = current_time
505
  self._async_lock_count[combined_key] = count
506
 
507
- # Efficient cleanup triggering with minimum interval control
508
- total_cleanup_len = len(self._async_lock_cleanup_data)
509
- if total_cleanup_len >= CLEANUP_THRESHOLD:
510
- # Time rollback detection
511
- if (
512
- self._last_async_cleanup_time is not None
513
- and current_time < self._last_async_cleanup_time
514
- ):
515
- direct_log(
516
- "== async Lock == Time rollback detected, resetting cleanup time",
517
- level="WARNING",
518
- enable_output=False,
519
- )
520
- self._last_async_cleanup_time = None
521
-
522
- # Check cleanup conditions
523
- has_expired_locks = (
524
- self._earliest_async_cleanup_time is not None
525
- and current_time - self._earliest_async_cleanup_time
526
- > CLEANUP_KEYED_LOCKS_AFTER_SECONDS
527
- )
528
-
529
- interval_satisfied = (
530
- self._last_async_cleanup_time is None
531
- or current_time - self._last_async_cleanup_time
532
- > MIN_CLEANUP_INTERVAL_SECONDS
533
- )
534
-
535
- if has_expired_locks and interval_satisfied:
536
- try:
537
- cleaned_count = 0
538
- new_earliest_time = None
539
-
540
- # Perform cleanup while maintaining the new earliest time
541
- # Clean expired async locks from all namespaces
542
- for cleanup_key, cleanup_time in list(
543
- self._async_lock_cleanup_data.items()
544
- ):
545
- if (
546
- current_time - cleanup_time
547
- > CLEANUP_KEYED_LOCKS_AFTER_SECONDS
548
- ):
549
- self._async_lock.pop(cleanup_key)
550
- self._async_lock_count.pop(cleanup_key)
551
- self._async_lock_cleanup_data.pop(cleanup_key)
552
- cleaned_count += 1
553
- else:
554
- # Track the earliest time among remaining locks
555
- if (
556
- new_earliest_time is None
557
- or cleanup_time < new_earliest_time
558
- ):
559
- new_earliest_time = cleanup_time
560
-
561
- # Update state only after successful cleanup
562
- self._earliest_async_cleanup_time = new_earliest_time
563
- self._last_async_cleanup_time = current_time
564
-
565
- if cleaned_count > 0:
566
- next_cleanup_in = max(
567
- (
568
- new_earliest_time
569
- + CLEANUP_KEYED_LOCKS_AFTER_SECONDS
570
- - current_time
571
- )
572
- if new_earliest_time
573
- else float("inf"),
574
- MIN_CLEANUP_INTERVAL_SECONDS,
575
- )
576
- direct_log(
577
- f"== async Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired async locks, "
578
- f"next cleanup in {next_cleanup_in:.1f}s",
579
- enable_output=False,
580
- level="INFO",
581
- )
582
 
583
- except Exception as e:
584
- direct_log(
585
- f"== async Lock == Cleanup failed: {e}",
586
- level="ERROR",
587
- enable_output=False,
588
- )
589
- # Don't update _last_async_cleanup_time to allow retry
590
 
591
  def _get_lock_for_key(
592
  self, namespace: str, key: str, enable_logging: bool = False
@@ -627,6 +615,171 @@ class KeyedUnifiedLock:
627
  self._release_async_lock(combined_key)
628
  _release_shared_raw_mp_lock(namespace, key)
629
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
630
 
631
  class _KeyedLockContext:
632
  def __init__(
@@ -747,6 +900,61 @@ def get_data_init_lock(enable_logging: bool = False) -> UnifiedLock:
747
  )
748
 
749
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
750
  def initialize_share_data(workers: int = 1):
751
  """
752
  Initialize shared storage data for single or multi-process mode.
 
280
  return f"{factory_name}:{key}"
281
 
282
 
283
def _perform_lock_cleanup(
    lock_type: str,
    cleanup_data: Dict[str, float],
    lock_registry: Optional[Dict[str, Any]],
    lock_count: Optional[Dict[str, int]],
    earliest_cleanup_time: Optional[float],
    last_cleanup_time: Optional[float],
    current_time: float,
    threshold_check: bool = True,
) -> tuple[int, Optional[float], Optional[float]]:
    """
    Generic lock cleanup function to unify cleanup logic for both multiprocess and async locks.

    Args:
        lock_type: Lock type identifier ("mp" or "async")
        cleanup_data: Mapping of combined lock key -> time the lock became idle
        lock_registry: Lock registry dictionary (can be None for async locks)
        lock_count: Lock reference-count dictionary (can be None for async locks)
        earliest_cleanup_time: Idle timestamp of the oldest pending lock, or None
        last_cleanup_time: Timestamp of the previous cleanup run, or None
        current_time: Current time (passed in so all checks share one clock read)
        threshold_check: Whether to require CLEANUP_THRESHOLD pending entries
            before cleaning (set to False in cleanup_expired_locks to force a run)

    Returns:
        tuple: (cleaned_count, new_earliest_time, new_last_cleanup_time)
    """
    if not cleanup_data:
        return 0, earliest_cleanup_time, last_cleanup_time

    # If threshold check is enabled and threshold not reached, return directly
    if threshold_check and len(cleanup_data) < CLEANUP_THRESHOLD:
        return 0, earliest_cleanup_time, last_cleanup_time

    # Time rollback detection (e.g. system clock adjusted backwards)
    if last_cleanup_time is not None and current_time < last_cleanup_time:
        direct_log(
            f"== {lock_type} Lock == Time rollback detected, resetting cleanup time",
            level="WARNING",
            enable_output=False,
        )
        last_cleanup_time = None

    # Only clean when at least one lock has been idle long enough...
    has_expired_locks = (
        earliest_cleanup_time is not None
        and current_time - earliest_cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS
    )

    # ...and the minimum interval since the previous cleanup has elapsed.
    interval_satisfied = (
        last_cleanup_time is None
        or current_time - last_cleanup_time > MIN_CLEANUP_INTERVAL_SECONDS
    )

    if not (has_expired_locks and interval_satisfied):
        return 0, earliest_cleanup_time, last_cleanup_time

    # FIX: capture the total BEFORE removing entries so the log reports
    # "cleaned/total"; measuring after the pops under-reported the total.
    total_cleanup_len = len(cleanup_data)

    try:
        cleaned_count = 0
        new_earliest_time = None

        # Remove expired entries; track the earliest idle time among survivors
        for cleanup_key, cleanup_time in list(cleanup_data.items()):
            if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
                # Remove from cleanup data
                cleanup_data.pop(cleanup_key, None)

                # Remove from lock registry / ref-count if present (mp locks)
                if lock_registry is not None:
                    lock_registry.pop(cleanup_key, None)
                if lock_count is not None:
                    lock_count.pop(cleanup_key, None)

                cleaned_count += 1
            else:
                if new_earliest_time is None or cleanup_time < new_earliest_time:
                    new_earliest_time = cleanup_time

        if cleaned_count == 0:
            return 0, earliest_cleanup_time, last_cleanup_time

        # Update state only after successful cleanup
        new_last_cleanup_time = current_time

        # Predict when the next cleanup could fire, for the log message
        next_cleanup_in = max(
            (new_earliest_time + CLEANUP_KEYED_LOCKS_AFTER_SECONDS - current_time)
            if new_earliest_time
            else float("inf"),
            MIN_CLEANUP_INTERVAL_SECONDS,
        )

        # Single log call; only the noun phrase differs between lock types
        # (message text kept byte-identical to the previous two branches).
        lock_desc = (
            f"expired {lock_type} locks" if lock_type == "async" else "expired locks"
        )
        direct_log(
            f"== {lock_type} Lock == Cleaned up {cleaned_count}/{total_cleanup_len} {lock_desc}, "
            f"next cleanup in {next_cleanup_in:.1f}s",
            enable_output=False,
            level="INFO",
        )

        return cleaned_count, new_earliest_time, new_last_cleanup_time

    except Exception as e:
        # Leave last_cleanup_time untouched so the next call can retry
        direct_log(
            f"== {lock_type} Lock == Cleanup failed: {e}",
            level="ERROR",
            enable_output=False,
        )
        return 0, earliest_cleanup_time, last_cleanup_time
400
+
401
+
402
  def _get_or_create_shared_raw_mp_lock(
403
  factory_name: str, key: str
404
  ) -> Optional[mp.synchronize.Lock]:
 
465
  ):
466
  _earliest_mp_cleanup_time = current_time
467
 
468
+ # Use generic cleanup function
469
+ cleaned_count, new_earliest_time, new_last_cleanup_time = _perform_lock_cleanup(
470
+ lock_type="mp",
471
+ cleanup_data=_lock_cleanup_data,
472
+ lock_registry=_lock_registry,
473
+ lock_count=_lock_registry_count,
474
+ earliest_cleanup_time=_earliest_mp_cleanup_time,
475
+ last_cleanup_time=_last_mp_cleanup_time,
476
+ current_time=current_time,
477
+ threshold_check=True,
478
+ )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
479
 
480
+ # Update global state if cleanup was performed
481
+ if cleaned_count > 0:
482
+ _earliest_mp_cleanup_time = new_earliest_time
483
+ _last_mp_cleanup_time = new_last_cleanup_time
 
 
 
484
 
485
 
486
  class KeyedUnifiedLock:
 
559
  self._earliest_async_cleanup_time = current_time
560
  self._async_lock_count[combined_key] = count
561
 
562
+ # Use generic cleanup function
563
+ cleaned_count, new_earliest_time, new_last_cleanup_time = _perform_lock_cleanup(
564
+ lock_type="async",
565
+ cleanup_data=self._async_lock_cleanup_data,
566
+ lock_registry=self._async_lock,
567
+ lock_count=self._async_lock_count,
568
+ earliest_cleanup_time=self._earliest_async_cleanup_time,
569
+ last_cleanup_time=self._last_async_cleanup_time,
570
+ current_time=current_time,
571
+ threshold_check=True,
572
+ )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
573
 
574
+ # Update instance state if cleanup was performed
575
+ if cleaned_count > 0:
576
+ self._earliest_async_cleanup_time = new_earliest_time
577
+ self._last_async_cleanup_time = new_last_cleanup_time
 
 
 
578
 
579
  def _get_lock_for_key(
580
  self, namespace: str, key: str, enable_logging: bool = False
 
615
  self._release_async_lock(combined_key)
616
  _release_shared_raw_mp_lock(namespace, key)
617
 
618
    def cleanup_expired_locks(self) -> Dict[str, Any]:
        """
        Cleanup expired locks for both async and multiprocess locks following the same
        conditions as _release_shared_raw_mp_lock and _release_async_lock functions.

        Only performs cleanup when both has_expired_locks and interval_satisfied conditions are met
        to avoid too frequent cleanup operations.

        Since async and multiprocess locks work together, this method cleans up
        both types of expired locks and returns comprehensive statistics.

        Returns:
            Dict containing cleanup statistics and current status:
            {
                "process_id": 12345,
                "cleanup_performed": {
                    "mp_cleaned": 5,
                    "async_cleaned": 3
                },
                "current_status": {
                    "total_mp_locks": 10,
                    "pending_mp_cleanup": 2,
                    "total_async_locks": 8,
                    "pending_async_cleanup": 1
                }
            }
        """
        # This method assigns _earliest_mp_cleanup_time / _last_mp_cleanup_time,
        # so the module-level state must be declared global here.
        global _lock_registry, _lock_registry_count, _lock_cleanup_data
        global _registry_guard, _earliest_mp_cleanup_time, _last_mp_cleanup_time

        cleanup_stats = {"mp_cleaned": 0, "async_cleaned": 0}

        # Single clock read shared by both cleanup passes below.
        current_time = time.time()

        # 1. Cleanup multiprocess locks using generic function
        if (
            _is_multiprocess
            and _lock_registry is not None
            and _registry_guard is not None
        ):
            try:
                # Hold the cross-process registry guard while mutating the
                # shared registry/count/cleanup dictionaries.
                with _registry_guard:
                    if _lock_cleanup_data is not None:
                        # Use generic cleanup function without threshold check
                        # (threshold_check=False skips only the CLEANUP_THRESHOLD
                        # size gate; expiry and min-interval checks still apply).
                        cleaned_count, new_earliest_time, new_last_cleanup_time = (
                            _perform_lock_cleanup(
                                lock_type="mp",
                                cleanup_data=_lock_cleanup_data,
                                lock_registry=_lock_registry,
                                lock_count=_lock_registry_count,
                                earliest_cleanup_time=_earliest_mp_cleanup_time,
                                last_cleanup_time=_last_mp_cleanup_time,
                                current_time=current_time,
                                threshold_check=False,  # Force cleanup in cleanup_expired_locks
                            )
                        )

                        # Update global state if cleanup was performed
                        if cleaned_count > 0:
                            _earliest_mp_cleanup_time = new_earliest_time
                            _last_mp_cleanup_time = new_last_cleanup_time
                            cleanup_stats["mp_cleaned"] = cleaned_count

            except Exception as e:
                # Best-effort: an mp-cleanup failure must not prevent the
                # async cleanup pass or the status report below.
                direct_log(
                    f"Error during multiprocess lock cleanup: {e}",
                    level="ERROR",
                    enable_output=False,
                )

        # 2. Cleanup async locks using generic function
        try:
            # Use generic cleanup function without threshold check
            cleaned_count, new_earliest_time, new_last_cleanup_time = (
                _perform_lock_cleanup(
                    lock_type="async",
                    cleanup_data=self._async_lock_cleanup_data,
                    lock_registry=self._async_lock,
                    lock_count=self._async_lock_count,
                    earliest_cleanup_time=self._earliest_async_cleanup_time,
                    last_cleanup_time=self._last_async_cleanup_time,
                    current_time=current_time,
                    threshold_check=False,  # Force cleanup in cleanup_expired_locks
                )
            )

            # Update instance state if cleanup was performed
            if cleaned_count > 0:
                self._earliest_async_cleanup_time = new_earliest_time
                self._last_async_cleanup_time = new_last_cleanup_time
                cleanup_stats["async_cleaned"] = cleaned_count

        except Exception as e:
            direct_log(
                f"Error during async lock cleanup: {e}",
                level="ERROR",
                enable_output=False,
            )

        # Log cleanup results if any locks were cleaned
        total_cleaned = cleanup_stats["mp_cleaned"] + cleanup_stats["async_cleaned"]
        if total_cleaned > 0:
            direct_log(
                f"Keyed lock cleanup completed: {total_cleaned} locks cleaned "
                f"(MP: {cleanup_stats['mp_cleaned']}, Async: {cleanup_stats['async_cleaned']})",
                level="INFO",
                enable_output=False,
            )

        # 3. Get current status after cleanup
        current_status = self.get_lock_status()

        return {
            "process_id": os.getpid(),
            "cleanup_performed": cleanup_stats,
            "current_status": current_status,
        }
735
+
736
+ def get_lock_status(self) -> Dict[str, int]:
737
+ """
738
+ Get current status of both async and multiprocess locks.
739
+
740
+ Returns comprehensive lock counts for both types of locks since
741
+ they work together in the keyed lock system.
742
+
743
+ Returns:
744
+ Dict containing lock counts:
745
+ {
746
+ "total_mp_locks": 10,
747
+ "pending_mp_cleanup": 2,
748
+ "total_async_locks": 8,
749
+ "pending_async_cleanup": 1
750
+ }
751
+ """
752
+ global _lock_registry_count, _lock_cleanup_data, _registry_guard
753
+
754
+ status = {
755
+ "total_mp_locks": 0,
756
+ "pending_mp_cleanup": 0,
757
+ "total_async_locks": 0,
758
+ "pending_async_cleanup": 0,
759
+ }
760
+
761
+ try:
762
+ # Count multiprocess locks
763
+ if _is_multiprocess and _lock_registry_count is not None:
764
+ if _registry_guard is not None:
765
+ with _registry_guard:
766
+ status["total_mp_locks"] = len(_lock_registry_count)
767
+ if _lock_cleanup_data is not None:
768
+ status["pending_mp_cleanup"] = len(_lock_cleanup_data)
769
+
770
+ # Count async locks
771
+ status["total_async_locks"] = len(self._async_lock_count)
772
+ status["pending_async_cleanup"] = len(self._async_lock_cleanup_data)
773
+
774
+ except Exception as e:
775
+ direct_log(
776
+ f"Error getting keyed lock status: {e}",
777
+ level="ERROR",
778
+ enable_output=False,
779
+ )
780
+
781
+ return status
782
+
783
 
784
  class _KeyedLockContext:
785
  def __init__(
 
900
  )
901
 
902
 
903
def cleanup_keyed_lock() -> Dict[str, Any]:
    """
    Force cleanup of expired keyed locks and return comprehensive status information.

    Actively cleans up expired locks for both async and multiprocess locks,
    then returns detailed statistics about the cleanup operation and the
    current lock status.

    Returns:
        Same as cleanup_expired_locks in KeyedUnifiedLock
    """
    global _storage_keyed_lock

    # Delegate to the shared keyed-lock instance when storage is ready.
    if _initialized and _storage_keyed_lock is not None:
        return _storage_keyed_lock.cleanup_expired_locks()

    # Shared storage not initialized yet: report a zeroed result.
    return {
        "process_id": os.getpid(),
        "cleanup_performed": {"mp_cleaned": 0, "async_cleaned": 0},
        "current_status": {
            "total_mp_locks": 0,
            "pending_mp_cleanup": 0,
            "total_async_locks": 0,
            "pending_async_cleanup": 0,
        },
    }
929
+
930
+
931
def get_keyed_lock_status() -> Dict[str, Any]:
    """
    Get current status of keyed locks without performing cleanup.

    Provides a read-only view of current lock counts for both multiprocess
    and async locks, including pending-cleanup counts.

    Returns:
        Same as get_lock_status in KeyedUnifiedLock
    """
    global _storage_keyed_lock

    # Delegate to the shared keyed-lock instance when storage is ready.
    if _initialized and _storage_keyed_lock is not None:
        snapshot = _storage_keyed_lock.get_lock_status()
        snapshot["process_id"] = os.getpid()
        return snapshot

    # Shared storage not initialized yet: report a zeroed snapshot.
    return {
        "process_id": os.getpid(),
        "total_mp_locks": 0,
        "pending_mp_cleanup": 0,
        "total_async_locks": 0,
        "pending_async_cleanup": 0,
    }
+ return status
956
+
957
+
958
  def initialize_share_data(workers: int = 1):
959
  """
960
  Initialize shared storage data for single or multi-process mode.