yangdx
committed on
Commit
·
1372c05
1
Parent(s):
66aece8
Fix linting
Browse files
- lightrag/kg/faiss_impl.py +35 -30
- lightrag/kg/nano_vector_db_impl.py +11 -6
- lightrag/kg/networkx_impl.py +19 -11
- lightrag/kg/postgres_impl.py +2 -2
- lightrag/kg/shared_storage.py +36 -35
- lightrag/lightrag.py +4 -1
- run_with_gunicorn.py +31 -10
lightrag/kg/faiss_impl.py
CHANGED
@@ -50,7 +50,7 @@ class FaissVectorDBStorage(BaseVectorStorage):
|
|
50 |
self._max_batch_size = self.global_config["embedding_batch_num"]
|
51 |
# Embedding dimension (e.g. 768) must match your embedding function
|
52 |
self._dim = self.embedding_func.embedding_dim
|
53 |
-
|
54 |
# Create an empty Faiss index for inner product (useful for normalized vectors = cosine similarity).
|
55 |
# If you have a large number of vectors, you might want IVF or other indexes.
|
56 |
# For demonstration, we use a simple IndexFlatIP.
|
@@ -73,9 +73,12 @@ class FaissVectorDBStorage(BaseVectorStorage):
|
|
73 |
# Acquire lock to prevent concurrent read and write
|
74 |
with self._storage_lock:
|
75 |
# Check if storage was updated by another process
|
76 |
-
if (is_multiprocess and self.storage_updated.value) or
|
77 |
-
|
78 |
-
|
|
|
|
|
|
|
79 |
# Reload data
|
80 |
self._index = faiss.IndexFlatIP(self._dim)
|
81 |
self._id_to_meta = {}
|
@@ -86,7 +89,6 @@ class FaissVectorDBStorage(BaseVectorStorage):
|
|
86 |
self.storage_updated = False
|
87 |
return self._index
|
88 |
|
89 |
-
|
90 |
async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
|
91 |
"""
|
92 |
Insert or update vectors in the Faiss index.
|
@@ -337,32 +339,35 @@ class FaissVectorDBStorage(BaseVectorStorage):
|
|
337 |
self._index = faiss.IndexFlatIP(self._dim)
|
338 |
self._id_to_meta = {}
|
339 |
|
|
|
340 |
async def index_done_callback(self) -> None:
|
341 |
-
|
342 |
-
|
343 |
-
|
344 |
-
|
345 |
-
|
346 |
-
|
347 |
-
|
348 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
349 |
self.storage_updated.value = False
|
|
|
|
|
|
|
|
|
350 |
return False # Return error
|
351 |
|
352 |
-
|
353 |
-
async with self._storage_lock:
|
354 |
-
try:
|
355 |
-
# Save data to disk
|
356 |
-
self._save_faiss_index()
|
357 |
-
# Set all update flags to False
|
358 |
-
await set_all_update_flags(self.namespace)
|
359 |
-
# Reset own update flag to avoid self-reloading
|
360 |
-
if is_multiprocess:
|
361 |
-
self.storage_updated.value = False
|
362 |
-
else:
|
363 |
-
self.storage_updated = False
|
364 |
-
except Exception as e:
|
365 |
-
logger.error(f"Error saving FAISS index for {self.namespace}: {e}")
|
366 |
-
return False # Return error
|
367 |
-
|
368 |
-
return True # Return success
|
|
|
50 |
self._max_batch_size = self.global_config["embedding_batch_num"]
|
51 |
# Embedding dimension (e.g. 768) must match your embedding function
|
52 |
self._dim = self.embedding_func.embedding_dim
|
53 |
+
|
54 |
# Create an empty Faiss index for inner product (useful for normalized vectors = cosine similarity).
|
55 |
# If you have a large number of vectors, you might want IVF or other indexes.
|
56 |
# For demonstration, we use a simple IndexFlatIP.
|
|
|
73 |
# Acquire lock to prevent concurrent read and write
|
74 |
with self._storage_lock:
|
75 |
# Check if storage was updated by another process
|
76 |
+
if (is_multiprocess and self.storage_updated.value) or (
|
77 |
+
not is_multiprocess and self.storage_updated
|
78 |
+
):
|
79 |
+
logger.info(
|
80 |
+
f"Process {os.getpid()} FAISS reloading {self.namespace} due to update by another process"
|
81 |
+
)
|
82 |
# Reload data
|
83 |
self._index = faiss.IndexFlatIP(self._dim)
|
84 |
self._id_to_meta = {}
|
|
|
89 |
self.storage_updated = False
|
90 |
return self._index
|
91 |
|
|
|
92 |
async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
|
93 |
"""
|
94 |
Insert or update vectors in the Faiss index.
|
|
|
339 |
self._index = faiss.IndexFlatIP(self._dim)
|
340 |
self._id_to_meta = {}
|
341 |
|
342 |
+
|
343 |
async def index_done_callback(self) -> None:
|
344 |
+
# Check if storage was updated by another process
|
345 |
+
if is_multiprocess and self.storage_updated.value:
|
346 |
+
# Storage was updated by another process, reload data instead of saving
|
347 |
+
logger.warning(
|
348 |
+
f"Storage for FAISS {self.namespace} was updated by another process, reloading..."
|
349 |
+
)
|
350 |
+
with self._storage_lock:
|
351 |
+
self._index = faiss.IndexFlatIP(self._dim)
|
352 |
+
self._id_to_meta = {}
|
353 |
+
self._load_faiss_index()
|
354 |
+
self.storage_updated.value = False
|
355 |
+
return False # Return error
|
356 |
+
|
357 |
+
# Acquire lock and perform persistence
|
358 |
+
async with self._storage_lock:
|
359 |
+
try:
|
360 |
+
# Save data to disk
|
361 |
+
self._save_faiss_index()
|
362 |
+
# Set all update flags to False
|
363 |
+
await set_all_update_flags(self.namespace)
|
364 |
+
# Reset own update flag to avoid self-reloading
|
365 |
+
if is_multiprocess:
|
366 |
self.storage_updated.value = False
|
367 |
+
else:
|
368 |
+
self.storage_updated = False
|
369 |
+
except Exception as e:
|
370 |
+
logger.error(f"Error saving FAISS index for {self.namespace}: {e}")
|
371 |
return False # Return error
|
372 |
|
373 |
+
return True # Return success
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
lightrag/kg/nano_vector_db_impl.py
CHANGED
@@ -64,9 +64,12 @@ class NanoVectorDBStorage(BaseVectorStorage):
|
|
64 |
# Acquire lock to prevent concurrent read and write
|
65 |
async with self._storage_lock:
|
66 |
# Check if data needs to be reloaded
|
67 |
-
if (is_multiprocess and self.storage_updated.value) or
|
68 |
-
|
69 |
-
|
|
|
|
|
|
|
70 |
# Reload data
|
71 |
self._client = NanoVectorDB(
|
72 |
self.embedding_func.embedding_dim,
|
@@ -77,7 +80,7 @@ class NanoVectorDBStorage(BaseVectorStorage):
|
|
77 |
self.storage_updated.value = False
|
78 |
else:
|
79 |
self.storage_updated = False
|
80 |
-
|
81 |
return self._client
|
82 |
|
83 |
async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
|
@@ -204,7 +207,9 @@ class NanoVectorDBStorage(BaseVectorStorage):
|
|
204 |
# Check if storage was updated by another process
|
205 |
if is_multiprocess and self.storage_updated.value:
|
206 |
# Storage was updated by another process, reload data instead of saving
|
207 |
-
logger.warning(
|
|
|
|
|
208 |
self._client = NanoVectorDB(
|
209 |
self.embedding_func.embedding_dim,
|
210 |
storage_file=self._client_file_name,
|
@@ -212,7 +217,7 @@ class NanoVectorDBStorage(BaseVectorStorage):
|
|
212 |
# Reset update flag
|
213 |
self.storage_updated.value = False
|
214 |
return False # Return error
|
215 |
-
|
216 |
# Acquire lock and perform persistence
|
217 |
async with self._storage_lock:
|
218 |
try:
|
|
|
64 |
# Acquire lock to prevent concurrent read and write
|
65 |
async with self._storage_lock:
|
66 |
# Check if data needs to be reloaded
|
67 |
+
if (is_multiprocess and self.storage_updated.value) or (
|
68 |
+
not is_multiprocess and self.storage_updated
|
69 |
+
):
|
70 |
+
logger.info(
|
71 |
+
f"Process {os.getpid()} reloading {self.namespace} due to update by another process"
|
72 |
+
)
|
73 |
# Reload data
|
74 |
self._client = NanoVectorDB(
|
75 |
self.embedding_func.embedding_dim,
|
|
|
80 |
self.storage_updated.value = False
|
81 |
else:
|
82 |
self.storage_updated = False
|
83 |
+
|
84 |
return self._client
|
85 |
|
86 |
async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
|
|
|
207 |
# Check if storage was updated by another process
|
208 |
if is_multiprocess and self.storage_updated.value:
|
209 |
# Storage was updated by another process, reload data instead of saving
|
210 |
+
logger.warning(
|
211 |
+
f"Storage for {self.namespace} was updated by another process, reloading..."
|
212 |
+
)
|
213 |
self._client = NanoVectorDB(
|
214 |
self.embedding_func.embedding_dim,
|
215 |
storage_file=self._client_file_name,
|
|
|
217 |
# Reset update flag
|
218 |
self.storage_updated.value = False
|
219 |
return False # Return error
|
220 |
+
|
221 |
# Acquire lock and perform persistence
|
222 |
async with self._storage_lock:
|
223 |
try:
|
lightrag/kg/networkx_impl.py
CHANGED
@@ -91,7 +91,7 @@ class NetworkXStorage(BaseGraphStorage):
|
|
91 |
else:
|
92 |
logger.info("Created new empty graph")
|
93 |
self._graph = preloaded_graph or nx.Graph()
|
94 |
-
|
95 |
self._node_embed_algorithms = {
|
96 |
"node2vec": self._node2vec_embed,
|
97 |
}
|
@@ -108,19 +108,23 @@ class NetworkXStorage(BaseGraphStorage):
|
|
108 |
# Acquire lock to prevent concurrent read and write
|
109 |
async with self._storage_lock:
|
110 |
# Check if data needs to be reloaded
|
111 |
-
if (is_multiprocess and self.storage_updated.value) or
|
112 |
-
|
113 |
-
|
|
|
|
|
|
|
114 |
# Reload data
|
115 |
-
self._graph =
|
|
|
|
|
116 |
# Reset update flag
|
117 |
if is_multiprocess:
|
118 |
self.storage_updated.value = False
|
119 |
else:
|
120 |
self.storage_updated = False
|
121 |
-
|
122 |
-
return self._graph
|
123 |
|
|
|
124 |
|
125 |
async def has_node(self, node_id: str) -> bool:
|
126 |
graph = await self._get_graph()
|
@@ -334,12 +338,16 @@ class NetworkXStorage(BaseGraphStorage):
|
|
334 |
# Check if storage was updated by another process
|
335 |
if is_multiprocess and self.storage_updated.value:
|
336 |
# Storage was updated by another process, reload data instead of saving
|
337 |
-
logger.warning(
|
338 |
-
|
|
|
|
|
|
|
|
|
339 |
# Reset update flag
|
340 |
self.storage_updated.value = False
|
341 |
return False # Return error
|
342 |
-
|
343 |
# Acquire lock and perform persistence
|
344 |
async with self._storage_lock:
|
345 |
try:
|
@@ -356,5 +364,5 @@ class NetworkXStorage(BaseGraphStorage):
|
|
356 |
except Exception as e:
|
357 |
logger.error(f"Error saving graph for {self.namespace}: {e}")
|
358 |
return False # Return error
|
359 |
-
|
360 |
return True
|
|
|
91 |
else:
|
92 |
logger.info("Created new empty graph")
|
93 |
self._graph = preloaded_graph or nx.Graph()
|
94 |
+
|
95 |
self._node_embed_algorithms = {
|
96 |
"node2vec": self._node2vec_embed,
|
97 |
}
|
|
|
108 |
# Acquire lock to prevent concurrent read and write
|
109 |
async with self._storage_lock:
|
110 |
# Check if data needs to be reloaded
|
111 |
+
if (is_multiprocess and self.storage_updated.value) or (
|
112 |
+
not is_multiprocess and self.storage_updated
|
113 |
+
):
|
114 |
+
logger.info(
|
115 |
+
f"Process {os.getpid()} reloading graph {self.namespace} due to update by another process"
|
116 |
+
)
|
117 |
# Reload data
|
118 |
+
self._graph = (
|
119 |
+
NetworkXStorage.load_nx_graph(self._graphml_xml_file) or nx.Graph()
|
120 |
+
)
|
121 |
# Reset update flag
|
122 |
if is_multiprocess:
|
123 |
self.storage_updated.value = False
|
124 |
else:
|
125 |
self.storage_updated = False
|
|
|
|
|
126 |
|
127 |
+
return self._graph
|
128 |
|
129 |
async def has_node(self, node_id: str) -> bool:
|
130 |
graph = await self._get_graph()
|
|
|
338 |
# Check if storage was updated by another process
|
339 |
if is_multiprocess and self.storage_updated.value:
|
340 |
# Storage was updated by another process, reload data instead of saving
|
341 |
+
logger.warning(
|
342 |
+
f"Graph for {self.namespace} was updated by another process, reloading..."
|
343 |
+
)
|
344 |
+
self._graph = (
|
345 |
+
NetworkXStorage.load_nx_graph(self._graphml_xml_file) or nx.Graph()
|
346 |
+
)
|
347 |
# Reset update flag
|
348 |
self.storage_updated.value = False
|
349 |
return False # Return error
|
350 |
+
|
351 |
# Acquire lock and perform persistence
|
352 |
async with self._storage_lock:
|
353 |
try:
|
|
|
364 |
except Exception as e:
|
365 |
logger.error(f"Error saving graph for {self.namespace}: {e}")
|
366 |
return False # Return error
|
367 |
+
|
368 |
return True
|
lightrag/kg/postgres_impl.py
CHANGED
@@ -38,8 +38,8 @@ import pipmaster as pm
|
|
38 |
if not pm.is_installed("asyncpg"):
|
39 |
pm.install("asyncpg")
|
40 |
|
41 |
-
import asyncpg
|
42 |
-
from asyncpg import Pool
|
43 |
|
44 |
|
45 |
class PostgreSQLDB:
|
|
|
38 |
if not pm.is_installed("asyncpg"):
|
39 |
pm.install("asyncpg")
|
40 |
|
41 |
+
import asyncpg # type: ignore
|
42 |
+
from asyncpg import Pool # type: ignore
|
43 |
|
44 |
|
45 |
class PostgreSQLDB:
|
lightrag/kg/shared_storage.py
CHANGED
@@ -15,7 +15,7 @@ def direct_log(message, level="INFO"):
|
|
15 |
print(f"{level}: {message}", file=sys.stderr, flush=True)
|
16 |
|
17 |
|
18 |
-
T = TypeVar(
|
19 |
LockType = Union[ProcessLock, asyncio.Lock]
|
20 |
|
21 |
is_multiprocess = None
|
@@ -26,20 +26,22 @@ _initialized = None
|
|
26 |
# shared data for storage across processes
|
27 |
_shared_dicts: Optional[Dict[str, Any]] = None
|
28 |
_init_flags: Optional[Dict[str, bool]] = None # namespace -> initialized
|
29 |
-
_update_flags: Optional[Dict[str, bool]] = None
|
30 |
|
31 |
# locks for mutex access
|
32 |
_storage_lock: Optional[LockType] = None
|
33 |
_internal_lock: Optional[LockType] = None
|
34 |
_pipeline_status_lock: Optional[LockType] = None
|
35 |
|
|
|
36 |
class UnifiedLock(Generic[T]):
|
37 |
"""Provide a unified lock interface type for asyncio.Lock and multiprocessing.Lock"""
|
|
|
38 |
def __init__(self, lock: Union[ProcessLock, asyncio.Lock], is_async: bool):
|
39 |
self._lock = lock
|
40 |
self._is_async = is_async
|
41 |
|
42 |
-
async def __aenter__(self) ->
|
43 |
if self._is_async:
|
44 |
await self._lock.acquire()
|
45 |
else:
|
@@ -52,7 +54,7 @@ class UnifiedLock(Generic[T]):
|
|
52 |
else:
|
53 |
self._lock.release()
|
54 |
|
55 |
-
def __enter__(self) ->
|
56 |
"""For backward compatibility"""
|
57 |
if self._is_async:
|
58 |
raise RuntimeError("Use 'async with' for shared_storage lock")
|
@@ -68,24 +70,18 @@ class UnifiedLock(Generic[T]):
|
|
68 |
|
69 |
def get_internal_lock() -> UnifiedLock:
|
70 |
"""return unified storage lock for data consistency"""
|
71 |
-
return UnifiedLock(
|
72 |
-
|
73 |
-
is_async=not is_multiprocess
|
74 |
-
)
|
75 |
|
76 |
def get_storage_lock() -> UnifiedLock:
|
77 |
"""return unified storage lock for data consistency"""
|
78 |
-
return UnifiedLock(
|
79 |
-
|
80 |
-
is_async=not is_multiprocess
|
81 |
-
)
|
82 |
|
83 |
def get_pipeline_status_lock() -> UnifiedLock:
|
84 |
"""return unified storage lock for data consistency"""
|
85 |
-
return UnifiedLock(
|
86 |
-
|
87 |
-
is_async=not is_multiprocess
|
88 |
-
)
|
89 |
|
90 |
def initialize_share_data(workers: int = 1):
|
91 |
"""
|
@@ -166,17 +162,19 @@ async def initialize_pipeline_status():
|
|
166 |
|
167 |
# Create a shared list object for history_messages
|
168 |
history_messages = _manager.list() if is_multiprocess else []
|
169 |
-
pipeline_namespace.update(
|
170 |
-
|
171 |
-
|
172 |
-
|
173 |
-
|
174 |
-
|
175 |
-
|
176 |
-
|
177 |
-
|
178 |
-
|
179 |
-
|
|
|
|
|
180 |
direct_log(f"Process {os.getpid()} Pipeline namespace initialized")
|
181 |
|
182 |
|
@@ -195,22 +193,25 @@ async def get_update_flag(namespace: str):
|
|
195 |
_update_flags[namespace] = _manager.list()
|
196 |
else:
|
197 |
_update_flags[namespace] = []
|
198 |
-
direct_log(
|
199 |
-
|
|
|
|
|
200 |
if is_multiprocess and _manager is not None:
|
201 |
-
new_update_flag = _manager.Value(
|
202 |
else:
|
203 |
new_update_flag = False
|
204 |
-
|
205 |
_update_flags[namespace].append(new_update_flag)
|
206 |
return new_update_flag
|
207 |
|
|
|
208 |
async def set_all_update_flags(namespace: str):
|
209 |
"""Set all update flag of namespace indicating all workers need to reload data from files"""
|
210 |
global _update_flags
|
211 |
if _update_flags is None:
|
212 |
raise ValueError("Try to create namespace before Shared-Data is initialized")
|
213 |
-
|
214 |
async with get_internal_lock():
|
215 |
if namespace not in _update_flags:
|
216 |
raise ValueError(f"Namespace {namespace} not found in update flags")
|
@@ -225,13 +226,13 @@ async def set_all_update_flags(namespace: str):
|
|
225 |
async def get_all_update_flags_status() -> Dict[str, list]:
|
226 |
"""
|
227 |
Get update flags status for all namespaces.
|
228 |
-
|
229 |
Returns:
|
230 |
Dict[str, list]: A dictionary mapping namespace names to lists of update flag statuses
|
231 |
"""
|
232 |
if _update_flags is None:
|
233 |
return {}
|
234 |
-
|
235 |
result = {}
|
236 |
async with get_internal_lock():
|
237 |
for namespace, flags in _update_flags.items():
|
@@ -242,7 +243,7 @@ async def get_all_update_flags_status() -> Dict[str, list]:
|
|
242 |
else:
|
243 |
worker_statuses.append(flag)
|
244 |
result[namespace] = worker_statuses
|
245 |
-
|
246 |
return result
|
247 |
|
248 |
|
|
|
15 |
print(f"{level}: {message}", file=sys.stderr, flush=True)
|
16 |
|
17 |
|
18 |
+
T = TypeVar("T")
|
19 |
LockType = Union[ProcessLock, asyncio.Lock]
|
20 |
|
21 |
is_multiprocess = None
|
|
|
26 |
# shared data for storage across processes
|
27 |
_shared_dicts: Optional[Dict[str, Any]] = None
|
28 |
_init_flags: Optional[Dict[str, bool]] = None # namespace -> initialized
|
29 |
+
_update_flags: Optional[Dict[str, bool]] = None # namespace -> updated
|
30 |
|
31 |
# locks for mutex access
|
32 |
_storage_lock: Optional[LockType] = None
|
33 |
_internal_lock: Optional[LockType] = None
|
34 |
_pipeline_status_lock: Optional[LockType] = None
|
35 |
|
36 |
+
|
37 |
class UnifiedLock(Generic[T]):
|
38 |
"""Provide a unified lock interface type for asyncio.Lock and multiprocessing.Lock"""
|
39 |
+
|
40 |
def __init__(self, lock: Union[ProcessLock, asyncio.Lock], is_async: bool):
|
41 |
self._lock = lock
|
42 |
self._is_async = is_async
|
43 |
|
44 |
+
async def __aenter__(self) -> "UnifiedLock[T]":
|
45 |
if self._is_async:
|
46 |
await self._lock.acquire()
|
47 |
else:
|
|
|
54 |
else:
|
55 |
self._lock.release()
|
56 |
|
57 |
+
def __enter__(self) -> "UnifiedLock[T]":
|
58 |
"""For backward compatibility"""
|
59 |
if self._is_async:
|
60 |
raise RuntimeError("Use 'async with' for shared_storage lock")
|
|
|
70 |
|
71 |
def get_internal_lock() -> UnifiedLock:
|
72 |
"""return unified storage lock for data consistency"""
|
73 |
+
return UnifiedLock(lock=_internal_lock, is_async=not is_multiprocess)
|
74 |
+
|
|
|
|
|
75 |
|
76 |
def get_storage_lock() -> UnifiedLock:
|
77 |
"""return unified storage lock for data consistency"""
|
78 |
+
return UnifiedLock(lock=_storage_lock, is_async=not is_multiprocess)
|
79 |
+
|
|
|
|
|
80 |
|
81 |
def get_pipeline_status_lock() -> UnifiedLock:
|
82 |
"""return unified storage lock for data consistency"""
|
83 |
+
return UnifiedLock(lock=_pipeline_status_lock, is_async=not is_multiprocess)
|
84 |
+
|
|
|
|
|
85 |
|
86 |
def initialize_share_data(workers: int = 1):
|
87 |
"""
|
|
|
162 |
|
163 |
# Create a shared list object for history_messages
|
164 |
history_messages = _manager.list() if is_multiprocess else []
|
165 |
+
pipeline_namespace.update(
|
166 |
+
{
|
167 |
+
"busy": False, # Control concurrent processes
|
168 |
+
"job_name": "Default Job", # Current job name (indexing files/indexing texts)
|
169 |
+
"job_start": None, # Job start time
|
170 |
+
"docs": 0, # Total number of documents to be indexed
|
171 |
+
"batchs": 0, # Number of batches for processing documents
|
172 |
+
"cur_batch": 0, # Current processing batch
|
173 |
+
"request_pending": False, # Flag for pending request for processing
|
174 |
+
"latest_message": "", # Latest message from pipeline processing
|
175 |
+
"history_messages": history_messages, # 使用共享列表对象
|
176 |
+
}
|
177 |
+
)
|
178 |
direct_log(f"Process {os.getpid()} Pipeline namespace initialized")
|
179 |
|
180 |
|
|
|
193 |
_update_flags[namespace] = _manager.list()
|
194 |
else:
|
195 |
_update_flags[namespace] = []
|
196 |
+
direct_log(
|
197 |
+
f"Process {os.getpid()} initialized updated flags for namespace: [{namespace}]"
|
198 |
+
)
|
199 |
+
|
200 |
if is_multiprocess and _manager is not None:
|
201 |
+
new_update_flag = _manager.Value("b", False)
|
202 |
else:
|
203 |
new_update_flag = False
|
204 |
+
|
205 |
_update_flags[namespace].append(new_update_flag)
|
206 |
return new_update_flag
|
207 |
|
208 |
+
|
209 |
async def set_all_update_flags(namespace: str):
|
210 |
"""Set all update flag of namespace indicating all workers need to reload data from files"""
|
211 |
global _update_flags
|
212 |
if _update_flags is None:
|
213 |
raise ValueError("Try to create namespace before Shared-Data is initialized")
|
214 |
+
|
215 |
async with get_internal_lock():
|
216 |
if namespace not in _update_flags:
|
217 |
raise ValueError(f"Namespace {namespace} not found in update flags")
|
|
|
226 |
async def get_all_update_flags_status() -> Dict[str, list]:
|
227 |
"""
|
228 |
Get update flags status for all namespaces.
|
229 |
+
|
230 |
Returns:
|
231 |
Dict[str, list]: A dictionary mapping namespace names to lists of update flag statuses
|
232 |
"""
|
233 |
if _update_flags is None:
|
234 |
return {}
|
235 |
+
|
236 |
result = {}
|
237 |
async with get_internal_lock():
|
238 |
for namespace, flags in _update_flags.items():
|
|
|
243 |
else:
|
244 |
worker_statuses.append(flag)
|
245 |
result[namespace] = worker_statuses
|
246 |
+
|
247 |
return result
|
248 |
|
249 |
|
lightrag/lightrag.py
CHANGED
@@ -696,7 +696,10 @@ class LightRAG:
|
|
696 |
3. Process each chunk for entity and relation extraction
|
697 |
4. Update the document status
|
698 |
"""
|
699 |
-
from lightrag.kg.shared_storage import
|
|
|
|
|
|
|
700 |
|
701 |
# Get pipeline status shared data and lock
|
702 |
pipeline_status = await get_namespace_data("pipeline_status")
|
|
|
696 |
3. Process each chunk for entity and relation extraction
|
697 |
4. Update the document status
|
698 |
"""
|
699 |
+
from lightrag.kg.shared_storage import (
|
700 |
+
get_namespace_data,
|
701 |
+
get_pipeline_status_lock,
|
702 |
+
)
|
703 |
|
704 |
# Get pipeline status shared data and lock
|
705 |
pipeline_status = await get_namespace_data("pipeline_status")
|
run_with_gunicorn.py
CHANGED
@@ -47,10 +47,11 @@ def main():
|
|
47 |
|
48 |
# Check and install gunicorn if not present
|
49 |
import pipmaster as pm
|
|
|
50 |
if not pm.is_installed("gunicorn"):
|
51 |
print("Installing gunicorn...")
|
52 |
pm.install("gunicorn")
|
53 |
-
|
54 |
# Import Gunicorn's StandaloneApplication
|
55 |
from gunicorn.app.base import BaseApplication
|
56 |
|
@@ -103,26 +104,46 @@ def main():
|
|
103 |
import gunicorn_config
|
104 |
|
105 |
# Set configuration variables in gunicorn_config, prioritizing command line arguments
|
106 |
-
gunicorn_config.workers =
|
107 |
-
|
|
|
|
|
108 |
# Bind configuration prioritizes command line arguments
|
109 |
host = args.host if args.host != "0.0.0.0" else os.getenv("HOST", "0.0.0.0")
|
110 |
port = args.port if args.port != 9621 else int(os.getenv("PORT", 9621))
|
111 |
gunicorn_config.bind = f"{host}:{port}"
|
112 |
-
|
113 |
# Log level configuration prioritizes command line arguments
|
114 |
-
gunicorn_config.loglevel =
|
|
|
|
|
|
|
|
|
115 |
|
116 |
# Timeout configuration prioritizes command line arguments
|
117 |
-
gunicorn_config.timeout =
|
118 |
-
|
|
|
|
|
119 |
# Keepalive configuration
|
120 |
gunicorn_config.keepalive = int(os.getenv("KEEPALIVE", 5))
|
121 |
|
122 |
# SSL configuration prioritizes command line arguments
|
123 |
-
if args.ssl or os.getenv("SSL", "").lower() in (
|
124 |
-
|
125 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
126 |
|
127 |
# Set configuration options from the module
|
128 |
for key in dir(gunicorn_config):
|
|
|
47 |
|
48 |
# Check and install gunicorn if not present
|
49 |
import pipmaster as pm
|
50 |
+
|
51 |
if not pm.is_installed("gunicorn"):
|
52 |
print("Installing gunicorn...")
|
53 |
pm.install("gunicorn")
|
54 |
+
|
55 |
# Import Gunicorn's StandaloneApplication
|
56 |
from gunicorn.app.base import BaseApplication
|
57 |
|
|
|
104 |
import gunicorn_config
|
105 |
|
106 |
# Set configuration variables in gunicorn_config, prioritizing command line arguments
|
107 |
+
gunicorn_config.workers = (
|
108 |
+
args.workers if args.workers else int(os.getenv("WORKERS", 1))
|
109 |
+
)
|
110 |
+
|
111 |
# Bind configuration prioritizes command line arguments
|
112 |
host = args.host if args.host != "0.0.0.0" else os.getenv("HOST", "0.0.0.0")
|
113 |
port = args.port if args.port != 9621 else int(os.getenv("PORT", 9621))
|
114 |
gunicorn_config.bind = f"{host}:{port}"
|
115 |
+
|
116 |
# Log level configuration prioritizes command line arguments
|
117 |
+
gunicorn_config.loglevel = (
|
118 |
+
args.log_level.lower()
|
119 |
+
if args.log_level
|
120 |
+
else os.getenv("LOG_LEVEL", "info")
|
121 |
+
)
|
122 |
|
123 |
# Timeout configuration prioritizes command line arguments
|
124 |
+
gunicorn_config.timeout = (
|
125 |
+
args.timeout if args.timeout else int(os.getenv("TIMEOUT", 150))
|
126 |
+
)
|
127 |
+
|
128 |
# Keepalive configuration
|
129 |
gunicorn_config.keepalive = int(os.getenv("KEEPALIVE", 5))
|
130 |
|
131 |
# SSL configuration prioritizes command line arguments
|
132 |
+
if args.ssl or os.getenv("SSL", "").lower() in (
|
133 |
+
"true",
|
134 |
+
"1",
|
135 |
+
"yes",
|
136 |
+
"t",
|
137 |
+
"on",
|
138 |
+
):
|
139 |
+
gunicorn_config.certfile = (
|
140 |
+
args.ssl_certfile
|
141 |
+
if args.ssl_certfile
|
142 |
+
else os.getenv("SSL_CERTFILE")
|
143 |
+
)
|
144 |
+
gunicorn_config.keyfile = (
|
145 |
+
args.ssl_keyfile if args.ssl_keyfile else os.getenv("SSL_KEYFILE")
|
146 |
+
)
|
147 |
|
148 |
# Set configuration options from the module
|
149 |
for key in dir(gunicorn_config):
|