commit 4539bd8 (parent: 367afab)
author: earayu

    feat: add doc
docs/LightRAG Multi-Document Processing: Concurrent Control Strategy Analysis.md ADDED
@@ -0,0 +1,286 @@
# LightRAG Multi-Document Processing: Concurrent Control Strategy Analysis

LightRAG applies a multi-layered concurrency control strategy when processing multiple documents. This article analyzes the control mechanisms at the document, chunk, and LLM-request levels, explaining why the system exhibits the concurrent behavior it does.

## Overview

LightRAG's concurrency control has three layers:

1. **Document-level concurrency**: limits how many documents are processed simultaneously
2. **Chunk-level concurrency**: limits how many chunks are processed simultaneously within a single document
3. **LLM request-level concurrency**: limits the global number of concurrent LLM requests

## 1. Document-Level Concurrent Control

**Control Parameter**: `max_parallel_insert`

Document-level concurrency is governed by the `max_parallel_insert` parameter, which defaults to 2.

```python
# lightrag/lightrag.py
max_parallel_insert: int = field(default=int(os.getenv("MAX_PARALLEL_INSERT", 2)))
```

### Implementation Mechanism

In the `apipeline_process_enqueue_documents` method, a semaphore caps document concurrency:

```python
# lightrag/lightrag.py - apipeline_process_enqueue_documents method
async def process_document(
    doc_id: str,
    status_doc: DocProcessingStatus,
    split_by_character: str | None,
    split_by_character_only: bool,
    pipeline_status: dict,
    pipeline_status_lock: asyncio.Lock,
    semaphore: asyncio.Semaphore,  # Document-level semaphore
) -> None:
    """Process a single document"""
    async with semaphore:  # 🔥 Document-level concurrency control
        ...  # Process all chunks of this document

# Create the document-level semaphore
semaphore = asyncio.Semaphore(self.max_parallel_insert)  # Default 2

# Create one processing task per document
doc_tasks = []
for doc_id, status_doc in to_process_docs.items():
    doc_tasks.append(
        process_document(
            doc_id, status_doc, split_by_character, split_by_character_only,
            pipeline_status, pipeline_status_lock, semaphore
        )
    )

# Wait for all documents to finish processing
await asyncio.gather(*doc_tasks)
```

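The pattern above is easy to verify in isolation. A minimal toy sketch (not LightRAG code): five fake "documents" contend for a `Semaphore(2)`, and we record the peak number running at once.

```python
import asyncio

async def main(num_docs: int = 5, max_parallel_insert: int = 2) -> int:
    semaphore = asyncio.Semaphore(max_parallel_insert)
    active = 0
    peak = 0

    async def process_document(doc_id: int) -> None:
        nonlocal active, peak
        async with semaphore:  # document-level gate, as in LightRAG
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stand-in for chunking + extraction
            active -= 1

    await asyncio.gather(*(process_document(i) for i in range(num_docs)))
    return peak

peak = asyncio.run(main())
print(peak)  # 2 — never more than max_parallel_insert documents at once
```

However many document tasks `asyncio.gather` schedules, only two ever hold the semaphore simultaneously; the rest suspend inside `async with semaphore`.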
## 2. Chunk-Level Concurrent Control

**Control Parameter**: `llm_model_max_async`

**Key Point**: Each document creates its own, independent chunk semaphore!

```python
# lightrag/lightrag.py
llm_model_max_async: int = field(default=int(os.getenv("MAX_ASYNC", 4)))
```

### Implementation Mechanism

In the `extract_entities` function, **each document independently creates** its own chunk semaphore:

```python
# lightrag/operate.py - extract_entities function
async def extract_entities(chunks: dict[str, TextChunkSchema], global_config: dict[str, str], ...):
    # 🔥 Key: every document creates this semaphore independently!
    llm_model_max_async = global_config.get("llm_model_max_async", 4)
    semaphore = asyncio.Semaphore(llm_model_max_async)  # Chunk semaphore for this document

    async def _process_with_semaphore(chunk):
        async with semaphore:  # 🔥 Chunk-level concurrency control within the document
            return await _process_single_content(chunk)

    # Create one task per chunk
    tasks = []
    for c in ordered_chunks:
        task = asyncio.create_task(_process_with_semaphore(c))
        tasks.append(task)

    # Wait for all chunks to finish processing
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    chunk_results = [task.result() for task in tasks]
    return chunk_results
```

### Important Inference: System-Wide Chunk Concurrency

Because each document creates its own chunk semaphore, the system's theoretical chunk concurrency is:

**Theoretical chunk concurrency = max_parallel_insert × llm_model_max_async**

For example:
- `max_parallel_insert = 2` (2 documents processed simultaneously)
- `llm_model_max_async = 4` (at most 4 concurrent chunks per document)
- **Theoretical result**: at most 2 × 4 = 8 chunks in the "processing" state at once

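The multiplication can be checked with a toy model (illustrative code, not LightRAG's): each simulated document builds its own `Semaphore(4)` behind a document-level `Semaphore(2)`, and we track the peak number of chunks in flight.

```python
import asyncio

async def main(max_parallel_insert: int = 2, llm_model_max_async: int = 4,
               chunks_per_doc: int = 10, num_docs: int = 3) -> int:
    doc_sem = asyncio.Semaphore(max_parallel_insert)
    active_chunks = 0
    peak = 0

    async def process_document() -> None:
        nonlocal active_chunks, peak
        async with doc_sem:
            # fresh semaphore per document, as in extract_entities
            chunk_sem = asyncio.Semaphore(llm_model_max_async)

            async def process_chunk() -> None:
                nonlocal active_chunks, peak
                async with chunk_sem:
                    active_chunks += 1
                    peak = max(peak, active_chunks)
                    await asyncio.sleep(0.01)  # stand-in for the LLM call
                    active_chunks -= 1

            await asyncio.gather(*(process_chunk() for _ in range(chunks_per_doc)))

    await asyncio.gather(*(process_document() for _ in range(num_docs)))
    return peak

peak = asyncio.run(main())
print(peak)  # 8 — two documents x four chunk slots each
```

With one shared chunk semaphore instead of one per document, the peak would be 4; the per-document construction is exactly why the product formula holds.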
## 3. LLM Request-Level Concurrent Control (The Real Bottleneck)

**Control Parameter**: `llm_model_max_async` (globally shared)

**Key**: Even though up to 8 chunks may be "in processing", all LLM requests share a single global priority queue!

```python
# lightrag/lightrag.py - __post_init__ method
self.llm_model_func = priority_limit_async_func_call(self.llm_model_max_async)(
    partial(
        self.llm_model_func,
        hashing_kv=hashing_kv,
        **self.llm_model_kwargs,
    )
)
# 🔥 Global LLM worker pool size = llm_model_max_async = 4
```

### Priority Queue Implementation

```python
# lightrag/utils.py - priority_limit_async_func_call function
def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
    def final_decro(func):
        queue = asyncio.PriorityQueue(maxsize=max_queue_size)
        tasks = set()

        async def worker():
            """Worker that processes tasks from the priority queue"""
            while not shutdown_event.is_set():
                try:
                    priority, count, future, args, kwargs = await asyncio.wait_for(queue.get(), timeout=1.0)
                    result = await func(*args, **kwargs)  # 🔥 The actual LLM call
                    if not future.done():
                        future.set_result(result)
                except Exception as e:
                    ...  # Error handling
                finally:
                    queue.task_done()

        # 🔥 A fixed number of workers (max_size) is created — this is the real concurrency limit
        for _ in range(max_size):
            task = asyncio.create_task(worker())
            tasks.add(task)
```

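The worker-pool idea can be distilled into a self-contained sketch (simplified from the excerpt above; the `shutdown_event`, `wait_for` timeout, and caching details are omitted): `max_size` workers drain one shared `PriorityQueue`, so at most `max_size` calls execute no matter how many callers enqueue work.

```python
import asyncio
import itertools

async def main(max_size: int = 4, num_requests: int = 10) -> int:
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    counter = itertools.count()  # tie-breaker so equal priorities stay FIFO
    active = 0
    peak = 0

    async def fake_llm_call() -> str:
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)  # stand-in for LLM latency
        active -= 1
        return "ok"

    async def worker() -> None:
        while True:
            _priority, _count, future = await queue.get()
            try:
                future.set_result(await fake_llm_call())
            finally:
                queue.task_done()

    workers = [asyncio.create_task(worker()) for _ in range(max_size)]

    loop = asyncio.get_running_loop()
    futures = [loop.create_future() for _ in range(num_requests)]
    for fut in futures:
        queue.put_nowait((0, next(counter), fut))  # lower number = higher priority

    await asyncio.gather(*futures)  # all 10 requests complete...
    for w in workers:
        w.cancel()
    return peak  # ...but never more than max_size ran at once

peak = asyncio.run(main())
print(peak)  # 4
```

This is why the queue, not the chunk semaphores, sets the real throughput ceiling: callers only hand a future to the pool, and the pool's fixed worker count decides how many `func` calls are actually in flight.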
## 4. Chunk Internal Processing Mechanism (Serial)

### Why Serial?

Within each chunk, processing strictly follows this serial execution order:

```python
# lightrag/operate.py - _process_single_content function
async def _process_single_content(chunk_key_dp: tuple[str, TextChunkSchema]):
    # Step 1: Initial entity extraction
    hint_prompt = entity_extract_prompt.format(**{**context_base, "input_text": content})
    final_result = await use_llm_func_with_cache(hint_prompt, use_llm_func, ...)

    # Process the initial extraction result
    maybe_nodes, maybe_edges = await _process_extraction_result(final_result, chunk_key, file_path)

    # Step 2: Gleaning phase
    for now_glean_index in range(entity_extract_max_gleaning):
        # 🔥 Serially await the gleaning result
        glean_result = await use_llm_func_with_cache(
            continue_prompt, use_llm_func,
            llm_response_cache=llm_response_cache,
            history_messages=history, cache_type="extract"
        )

        # Process the gleaning result
        glean_nodes, glean_edges = await _process_extraction_result(glean_result, chunk_key, file_path)

        # Merge results...

        # Step 3: Decide whether to continue the loop
        if now_glean_index == entity_extract_max_gleaning - 1:
            break

        # 🔥 Serially await the loop-decision result
        if_loop_result = await use_llm_func_with_cache(
            if_loop_prompt, use_llm_func,
            llm_response_cache=llm_response_cache,
            history_messages=history, cache_type="extract"
        )

        if if_loop_result.strip().strip('"').strip("'").lower() != "yes":
            break

    return maybe_nodes, maybe_edges
```

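One consequence of this serial flow: the worst-case number of LLM calls per chunk is a simple function of `entity_extract_max_gleaning`. A small model of the loop above (assuming every loop decision answers "yes"):

```python
def max_llm_calls_per_chunk(entity_extract_max_gleaning: int) -> int:
    """Worst-case serial LLM calls for one chunk, mirroring the loop structure above."""
    calls = 1  # initial entity extraction
    for i in range(entity_extract_max_gleaning):
        calls += 1  # gleaning call
        if i == entity_extract_max_gleaning - 1:
            break
        calls += 1  # loop-decision call
    return calls

print(max_llm_calls_per_chunk(0))  # 1: extraction only
print(max_llm_calls_per_chunk(1))  # 2
print(max_llm_calls_per_chunk(2))  # 4
```

Because these calls happen one after another inside a single chunk, each extra gleaning round adds its full LLM latency to the chunk's end-to-end time; this is what setting `entity_extract_max_gleaning=0` (Section 7) eliminates.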
## 5. Complete Concurrent Hierarchy Diagram
![lightrag_indexing.png](assets/lightrag_indexing.png)

### Chunk Internal Processing (Serial)
```
Initial Extraction → Gleaning → Loop Decision → Complete
```

## 6. Real-World Scenario Analysis

### Scenario 1: Single Document with Multiple Chunks
Assume 1 document with 6 chunks:

- **Document level**: only 1 document, so `max_parallel_insert` is not a constraint
- **Chunk level**: at most 4 chunks processed simultaneously (limited by `llm_model_max_async=4`)
- **LLM level**: at most 4 concurrent LLM requests globally

**Expected behavior**: 4 chunks are processed concurrently; the remaining 2 wait.

### Scenario 2: Multiple Documents with Multiple Chunks
Assume 3 documents, each with 10 chunks:

- **Document level**: at most 2 documents processed simultaneously
- **Chunk level**: at most 4 chunks per document processed simultaneously
- **Theoretical chunk concurrency**: 2 × 4 = 8 chunks processed simultaneously
- **Actual LLM concurrency**: only 4 LLM requests actually execute

**Actual state distribution**:
```
# A possible system state:
Document 1: 4 chunks "processing" (2 executing LLM requests, 2 queued waiting for a worker)
Document 2: 4 chunks "processing" (2 executing LLM requests, 2 queued waiting for a worker)
Document 3: waiting for the document-level semaphore

Total:
- 8 chunks in the "processing" state
- 4 LLM requests actually executing
- 4 chunks queued waiting for an LLM worker
```

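The figures in this state sketch follow from the three limits alone. A back-of-the-envelope calculator (illustrative only; real execution interleaves dynamically):

```python
def state_distribution(num_docs: int, chunks_per_doc: int,
                       max_parallel_insert: int, llm_model_max_async: int) -> dict:
    # Documents admitted by the document-level semaphore
    active_docs = min(num_docs, max_parallel_insert)
    # Chunks admitted by the per-document chunk semaphores
    chunks_processing = min(active_docs * llm_model_max_async,
                            active_docs * chunks_per_doc)
    # Requests actually running in the global LLM worker pool
    llm_executing = min(chunks_processing, llm_model_max_async)
    return {
        "docs_active": active_docs,
        "chunks_processing": chunks_processing,
        "llm_executing": llm_executing,
        "chunks_waiting_for_llm": chunks_processing - llm_executing,
    }

print(state_distribution(3, 10, 2, 4))
# {'docs_active': 2, 'chunks_processing': 8, 'llm_executing': 4, 'chunks_waiting_for_llm': 4}
```

Plugging in Scenario 2's numbers reproduces the 8-processing / 4-executing / 4-waiting split above.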
### Scenario 3: Resource Bottleneck Analysis

**Timeline example (max_parallel_insert=2, llm_model_max_async=4)**:
![lightrag_llm_request_timeline.png](assets/lightrag_llm_request_timeline.png)

## 7. Performance Optimization Recommendations

### Understanding the Bottleneck

**The real bottleneck is the global LLM queue, not the chunk semaphores!**

### Adjustment Strategies

**Strategy 1: Increase LLM Concurrency**

```bash
# Environment variable configuration
export MAX_PARALLEL_INSERT=2   # Keep document concurrency as-is
export MAX_ASYNC=8             # 🔥 Raise LLM request concurrency
```

**Strategy 2: Balance Document and LLM Concurrency**

```python
rag = LightRAG(
    max_parallel_insert=3,           # Moderately raise document concurrency
    llm_model_max_async=12,          # Substantially raise LLM concurrency
    entity_extract_max_gleaning=0,   # Remove serial gleaning steps within chunks
)
```

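A note on the environment-variable route: the defaults quoted earlier are read with `os.getenv` inside `field(default=...)`, which is evaluated when the dataclass is defined, i.e. at import time, so the variables must be set before LightRAG is imported. A quick sketch of the arithmetic for Strategy 1 (standalone code; only the variable names come from the quoted defaults):

```python
import os

# Set before import in real usage; here just to demonstrate the expressions.
os.environ["MAX_PARALLEL_INSERT"] = "2"
os.environ["MAX_ASYNC"] = "8"

# Same expressions as the quoted field(default=...) lines:
max_parallel_insert = int(os.getenv("MAX_PARALLEL_INSERT", 2))
llm_model_max_async = int(os.getenv("MAX_ASYNC", 4))

theoretical_chunks = max_parallel_insert * llm_model_max_async
print(theoretical_chunks, llm_model_max_async)  # 16 theoretical chunks, 8 real LLM workers
```

Doubling `MAX_ASYNC` doubles the actual LLM concurrency directly, whereas raising `MAX_PARALLEL_INSERT` only raises the theoretical chunk count feeding the same worker pool.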
## 8. Summary

Key characteristics of LightRAG's multi-document concurrent processing mechanism:

### Concurrent Layers
1. **Inter-document competition**: controlled by `max_parallel_insert`; default 2 concurrent documents
2. **Theoretical chunk concurrency**: each document creates its own semaphore, so the total = `max_parallel_insert × llm_model_max_async`
3. **Actual LLM concurrency**: all chunks share the global LLM queue, controlled by `llm_model_max_async`
4. **Intra-chunk serialism**: the multiple LLM requests within a chunk execute strictly serially

### Key Insights
- **Theoretical vs. actual**: many chunks may be "in processing" while only a few are actually executing LLM requests
- **Real bottleneck**: the global LLM request queue is the performance bottleneck, not the chunk semaphores
- **Optimization focus**: raising `llm_model_max_async` is more effective than raising `max_parallel_insert`
docs/assets/lightrag_llm_request_timeline.png ADDED

Git LFS Details

  • SHA256: 0c047fa438f2641e8e6c70b5fda1e43afeb25c02303d9ba472923d5191ed7c78
  • Pointer size: 131 Bytes
  • Size of remote file: 121 kB