Merge pull request #1619 from earayu/add_doc_for_parall
Add doc explaining LightRAG's multi-document concurrent processing mechanism
docs/LightRAG_concurrent_explain.md
ADDED
@@ -0,0 +1,281 @@
# LightRAG Multi-Document Processing: Concurrent Control Strategy Analysis

LightRAG employs a multi-layered concurrent control strategy when processing multiple documents. This article analyzes the concurrent control mechanisms at the document, chunk, and LLM request levels, helping you understand why specific concurrent behaviors occur.

## Overview

LightRAG's concurrent control is divided into three layers:

1. **Document-level concurrency**: controls the number of documents processed simultaneously
2. **Chunk-level concurrency**: controls the number of chunks processed simultaneously within a single document
3. **LLM request-level concurrency**: controls the global number of concurrent LLM requests

## 1. Document-Level Concurrent Control

**Control Parameter**: `max_parallel_insert`

Document-level concurrency is controlled by the `max_parallel_insert` parameter, which defaults to 2.

```python
# lightrag/lightrag.py
max_parallel_insert: int = field(default=int(os.getenv("MAX_PARALLEL_INSERT", 2)))
```

### Implementation Mechanism

In the `apipeline_process_enqueue_documents` method, a semaphore limits how many documents are processed at once:

```python
# lightrag/lightrag.py - apipeline_process_enqueue_documents method
async def process_document(
    doc_id: str,
    status_doc: DocProcessingStatus,
    split_by_character: str | None,
    split_by_character_only: bool,
    pipeline_status: dict,
    pipeline_status_lock: asyncio.Lock,
    semaphore: asyncio.Semaphore,  # Document-level semaphore
) -> None:
    """Process a single document."""
    async with semaphore:  # 🔥 Document-level concurrent control
        ...  # Process all chunks of the document

# Create the document-level semaphore
semaphore = asyncio.Semaphore(self.max_parallel_insert)  # Default 2

# Create a processing task for each document
doc_tasks = []
for doc_id, status_doc in to_process_docs.items():
    doc_tasks.append(
        process_document(
            doc_id, status_doc, split_by_character, split_by_character_only,
            pipeline_status, pipeline_status_lock, semaphore
        )
    )

# Wait for all documents to finish processing
await asyncio.gather(*doc_tasks)
```

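The semaphore pattern above can be reduced to a minimal runnable sketch (the names here are illustrative, not LightRAG's actual code): with `asyncio.Semaphore(2)` standing in for `max_parallel_insert`, no more than two documents are ever in flight at once.

```python
import asyncio

async def main() -> int:
    semaphore = asyncio.Semaphore(2)   # stands in for max_parallel_insert
    active = 0
    peak = 0

    async def process_document(doc_id: int) -> None:
        nonlocal active, peak
        async with semaphore:          # document-level concurrent control
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stands in for chunking + extraction
            active -= 1

    # Five documents compete for two slots
    await asyncio.gather(*(process_document(i) for i in range(5)))
    return peak

peak_docs = asyncio.run(main())
print(peak_docs)  # peak number of documents processed at once
```

Running this prints `2`: the other three documents queue on the semaphore until a slot frees up.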
## 2. Chunk-Level Concurrent Control

**Control Parameter**: `llm_model_max_async`

**Key Point**: each document independently creates its own chunk semaphore!

```python
# lightrag/lightrag.py
llm_model_max_async: int = field(default=int(os.getenv("MAX_ASYNC", 4)))
```

### Implementation Mechanism

In the `extract_entities` function, **each document independently creates** its own chunk semaphore:

```python
# lightrag/operate.py - extract_entities function
async def extract_entities(chunks: dict[str, TextChunkSchema], global_config: dict[str, str], ...):
    # 🔥 Key: each document independently creates this semaphore!
    llm_model_max_async = global_config.get("llm_model_max_async", 4)
    semaphore = asyncio.Semaphore(llm_model_max_async)  # Chunk semaphore for this document

    async def _process_with_semaphore(chunk):
        async with semaphore:  # 🔥 Chunk-level concurrent control within the document
            return await _process_single_content(chunk)

    # Create a task for each chunk
    tasks = []
    for c in ordered_chunks:
        task = asyncio.create_task(_process_with_semaphore(c))
        tasks.append(task)

    # Wait for all chunks to finish processing
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    chunk_results = [task.result() for task in tasks]
    return chunk_results
```

### Important Inference: Overall System Chunk Concurrency

Since each document creates its own chunk semaphore, the theoretical chunk concurrency of the system is:

**Theoretical chunk concurrency = max_parallel_insert × llm_model_max_async**

For example:

- `max_parallel_insert = 2` (process 2 documents simultaneously)
- `llm_model_max_async = 4` (at most 4 concurrent chunks per document)
- **Theoretical result**: at most 2 × 4 = 8 chunks simultaneously in the "processing" state

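A small self-contained simulation (illustrative names, not LightRAG code) makes the multiplication concrete: 3 documents compete for a document semaphore of 2, and each active document creates its own chunk semaphore of 4, so the number of in-flight chunks peaks at 2 × 4 = 8.

```python
import asyncio

async def main() -> int:
    doc_semaphore = asyncio.Semaphore(2)       # max_parallel_insert
    in_flight = 0
    peak = 0

    async def process_chunk() -> None:
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)              # stands in for entity extraction
        in_flight -= 1

    async def process_document() -> None:
        async with doc_semaphore:
            # Each document creates its OWN chunk semaphore, as extract_entities does
            chunk_semaphore = asyncio.Semaphore(4)   # llm_model_max_async

            async def guarded() -> None:
                async with chunk_semaphore:
                    await process_chunk()

            await asyncio.gather(*(guarded() for _ in range(10)))

    await asyncio.gather(*(process_document() for _ in range(3)))
    return peak

peak_chunks = asyncio.run(main())
print(peak_chunks)  # peak number of chunks in flight
```

This prints `8`: the per-document semaphores are independent, so their limits add up across the documents admitted by the document-level semaphore.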
## 3. LLM Request-Level Concurrent Control (The Real Bottleneck)

**Control Parameter**: `llm_model_max_async` (globally shared)

**Key**: although up to 8 chunks may be "in processing", all LLM requests share the same global priority queue!

```python
# lightrag/lightrag.py - __post_init__ method
self.llm_model_func = priority_limit_async_func_call(self.llm_model_max_async)(
    partial(
        self.llm_model_func,
        hashing_kv=hashing_kv,
        **self.llm_model_kwargs,
    )
)
# 🔥 Global LLM worker count = llm_model_max_async = 4
```

### Priority Queue Implementation

```python
# lightrag/utils.py - priority_limit_async_func_call function
def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
    def final_decro(func):
        queue = asyncio.PriorityQueue(maxsize=max_queue_size)
        tasks = set()

        async def worker():
            """Worker that processes tasks from the priority queue"""
            while not shutdown_event.is_set():
                try:
                    priority, count, future, args, kwargs = await asyncio.wait_for(
                        queue.get(), timeout=1.0
                    )
                    result = await func(*args, **kwargs)  # 🔥 The actual LLM call
                    if not future.done():
                        future.set_result(result)
                except Exception as e:
                    ...  # Error handling
                finally:
                    queue.task_done()

        # 🔥 A fixed number of workers (max_size) is created; this is the real concurrency limit
        for _ in range(max_size):
            task = asyncio.create_task(worker())
            tasks.add(task)
```

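The same pattern can be condensed into a minimal sketch. This is not LightRAG's actual implementation: `priority_limited` and the `_priority` keyword are invented here for illustration. A fixed pool of workers drains an `asyncio.PriorityQueue`, so queued calls execute in priority order (lower value first) no matter when they were submitted.

```python
import asyncio
import itertools

def priority_limited(workers: int):
    """Decorator sketch: route calls through a fixed worker pool fed by a priority queue."""
    def decorate(func):
        queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        counter = itertools.count()   # tie-breaker so equal priorities stay FIFO
        started = False

        async def wrapper(*args, _priority: int = 10, **kwargs):
            nonlocal started
            if not started:           # lazily start the worker pool on first call
                started = True
                for _ in range(workers):
                    asyncio.ensure_future(worker())
            future = asyncio.get_running_loop().create_future()
            queue.put_nowait((_priority, next(counter), future, args, kwargs))
            return await future       # wait until a worker serves this request

        async def worker():
            while True:
                _, _, future, args, kwargs = await queue.get()
                try:
                    future.set_result(await func(*args, **kwargs))
                except Exception as exc:
                    future.set_exception(exc)
                finally:
                    queue.task_done()

        return wrapper
    return decorate

order: list[str] = []

@priority_limited(workers=1)   # a single worker makes the ordering easy to observe
async def llm_call(name: str) -> str:
    order.append(name)
    return name

async def main() -> None:
    await asyncio.gather(
        llm_call("low", _priority=9),
        llm_call("high", _priority=0),
        llm_call("mid", _priority=5),
    )

asyncio.run(main())
print(order)  # execution order follows priority, not submission order
```

With one worker and all three requests queued before any is served, the calls execute as `["high", "mid", "low"]`.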
## 4. Chunk Internal Processing Mechanism (Serial)

### Why Serial?

The processing inside each chunk strictly follows this serial execution order:

```python
# lightrag/operate.py - _process_single_content function
async def _process_single_content(chunk_key_dp: tuple[str, TextChunkSchema]):
    # Step 1: initial entity extraction
    hint_prompt = entity_extract_prompt.format(**{**context_base, "input_text": content})
    final_result = await use_llm_func_with_cache(hint_prompt, use_llm_func, ...)

    # Process the initial extraction results
    maybe_nodes, maybe_edges = await _process_extraction_result(final_result, chunk_key, file_path)

    # Step 2: gleaning phase
    for now_glean_index in range(entity_extract_max_gleaning):
        # 🔥 Serially wait for the gleaning result
        glean_result = await use_llm_func_with_cache(
            continue_prompt, use_llm_func,
            llm_response_cache=llm_response_cache,
            history_messages=history, cache_type="extract"
        )

        # Process the gleaning results
        glean_nodes, glean_edges = await _process_extraction_result(glean_result, chunk_key, file_path)

        # Merge results...

        # Step 3: decide whether to continue the loop
        if now_glean_index == entity_extract_max_gleaning - 1:
            break

        # 🔥 Serially wait for the loop decision
        if_loop_result = await use_llm_func_with_cache(
            if_loop_prompt, use_llm_func,
            llm_response_cache=llm_response_cache,
            history_messages=history, cache_type="extract"
        )

        if if_loop_result.strip().strip('"').strip("'").lower() != "yes":
            break

    return maybe_nodes, maybe_edges
```

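The serial chaining is simply a consequence of `await`: each LLM call cannot be issued until the previous one has returned. A tiny simulation with a fake LLM (all names here are illustrative) records the call order inside one chunk:

```python
import asyncio

log: list[str] = []

async def fake_llm(tag: str) -> str:
    log.append(tag)
    await asyncio.sleep(0)        # yield to the event loop, like a real request would
    return "yes"

async def process_chunk(max_gleaning: int = 2) -> None:
    # Step 1: initial extraction must finish before gleaning can start
    await fake_llm("extract")
    for i in range(max_gleaning):
        await fake_llm(f"glean_{i}")    # each gleaning round waits on the previous call
        if i == max_gleaning - 1:
            break
        await fake_llm(f"decide_{i}")   # the loop decision also waits in line

asyncio.run(process_chunk())
print(log)
```

The log comes out strictly ordered, `['extract', 'glean_0', 'decide_0', 'glean_1']`: even with an idle event loop, nothing inside a chunk overlaps.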
202 |
+
## 5. Complete Concurrent Hierarchy Diagram
|
203 |
+

|
204 |
+
|
205 |
+
### Chunk Internal Processing (Serial)
|
206 |
+
```
|
207 |
+
Initial Extraction → Gleaning → Loop Decision → Complete
|
208 |
+
```
|
209 |
+
|
210 |
+
## 6. Real-World Scenario Analysis

### Scenario 1: Single Document with Multiple Chunks

Assume 1 document with 6 chunks:

- **Document level**: only 1 document, so `max_parallel_insert` is not a constraint
- **Chunk level**: at most 4 chunks processed simultaneously (limited by `llm_model_max_async=4`)
- **LLM level**: at most 4 concurrent LLM requests globally

**Expected behavior**: 4 chunks are processed concurrently while the remaining 2 wait.

### Scenario 2: Multiple Documents with Multiple Chunks

Assume 3 documents, each with 10 chunks:

- **Document level**: at most 2 documents processed simultaneously
- **Chunk level**: at most 4 chunks per document processed simultaneously
- **Theoretical chunk concurrency**: 2 × 4 = 8 chunks processed simultaneously
- **Actual LLM concurrency**: only 4 LLM requests actually execute

**Actual state distribution**:

```
# A possible system state:
Document 1: 4 chunks "processing" (2 executing LLM requests, 2 waiting in the LLM queue)
Document 2: 4 chunks "processing" (2 executing LLM requests, 2 waiting in the LLM queue)
Document 3: waiting for the document-level semaphore

Total:
- 8 chunks in the "processing" state
- 4 LLM requests actually executing
- 4 chunks waiting in the LLM queue
```

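Scenario 2 can be simulated end to end. This is a sketch with invented names, not LightRAG code: 3 documents of 10 chunks each, a document semaphore of 2, a per-document chunk semaphore of 4, and a shared pool of 4 workers standing in for the global LLM queue. The measured peaks match the state distribution above: 8 chunks in flight, but only 4 LLM calls executing.

```python
import asyncio

async def main() -> tuple[int, int]:
    doc_sem = asyncio.Semaphore(2)             # max_parallel_insert
    llm_queue: asyncio.Queue = asyncio.Queue() # shared global LLM request queue
    in_flight_chunks = peak_in_flight = 0
    running_llm = peak_running = 0

    async def llm_worker() -> None:
        nonlocal running_llm, peak_running
        while True:
            future = await llm_queue.get()
            running_llm += 1
            peak_running = max(peak_running, running_llm)
            await asyncio.sleep(0.01)          # stands in for the model call
            running_llm -= 1
            future.set_result("ok")

    workers = [asyncio.ensure_future(llm_worker()) for _ in range(4)]  # llm_model_max_async

    async def process_chunk() -> None:
        nonlocal in_flight_chunks, peak_in_flight
        in_flight_chunks += 1
        peak_in_flight = max(peak_in_flight, in_flight_chunks)
        future = asyncio.get_running_loop().create_future()
        await llm_queue.put(future)            # join the shared global queue
        await future                           # wait for a worker to serve the request
        in_flight_chunks -= 1

    async def process_document() -> None:
        async with doc_sem:
            chunk_sem = asyncio.Semaphore(4)   # per-document chunk semaphore

            async def guarded() -> None:
                async with chunk_sem:
                    await process_chunk()

            await asyncio.gather(*(guarded() for _ in range(10)))

    await asyncio.gather(*(process_document() for _ in range(3)))
    for w in workers:
        w.cancel()
    return peak_in_flight, peak_running

peak_chunks, peak_llm = asyncio.run(main())
print(peak_chunks, peak_llm)
```

This prints `8 4`: the chunk-level semaphores admit 8 chunks, but the fixed worker pool caps actual execution at 4.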
## 7. Performance Optimization Recommendations

### Understanding the Bottleneck

**The real bottleneck is the global LLM queue, not the chunk semaphores!**

### Adjustment Strategies

**Strategy 1: Increase LLM Concurrent Capacity**

```bash
# Environment variable configuration
export MAX_PARALLEL_INSERT=2   # Keep document concurrency as-is
export MAX_ASYNC=8             # 🔥 Increase LLM request concurrency
```

**Strategy 2: Balance Document and LLM Concurrency**

```python
rag = LightRAG(
    max_parallel_insert=3,          # Moderately increase document concurrency
    llm_model_max_async=12,         # Significantly increase LLM concurrency
    entity_extract_max_gleaning=0,  # Reduce serial steps within each chunk
)
```

## 8. Summary

Key characteristics of LightRAG's multi-document concurrent processing mechanism:

### Concurrent Layers

1. **Inter-document competition**: controlled by `max_parallel_insert`; by default 2 documents are processed concurrently
2. **Theoretical chunk concurrency**: each document creates its own semaphore, so the total is `max_parallel_insert × llm_model_max_async`
3. **Actual LLM concurrency**: all chunks share the global LLM queue, controlled by `llm_model_max_async`
4. **Intra-chunk serial execution**: the multiple LLM requests within each chunk execute strictly one after another

### Key Insights

- **Theoretical vs. actual**: many chunks may be "in processing", but only a few are actually executing LLM requests
- **Real bottleneck**: the global LLM request queue is the performance bottleneck, not the chunk semaphores
- **Optimization focus**: increasing `llm_model_max_async` is more effective than increasing `max_parallel_insert`
docs/assets/lightrag_indexing.png
ADDED
docs/zh/LightRAG_concurrent_explain_zh.md
ADDED
@@ -0,0 +1,277 @@