gzdaniel commited on
Commit
64389f5
·
2 Parent(s): feb7286 b05bfe9

Merge branch 'main' into litellm-problem

Browse files
README.md CHANGED
@@ -149,6 +149,12 @@ For a streaming response implementation example, please see `examples/lightrag_o
149
 
150
  > If you would like to integrate LightRAG into your project, we recommend utilizing the REST API provided by the LightRAG Server. LightRAG Core is typically intended for embedded applications or for researchers who wish to conduct studies and evaluations.
151
 
 
 
 
 
 
 
152
  ### A Simple Program
153
 
154
  Use the below Python snippet to initialize LightRAG, insert text to it, and perform queries:
@@ -173,8 +179,9 @@ async def initialize_rag():
173
  embedding_func=openai_embed,
174
  llm_model_func=gpt_4o_mini_complete,
175
  )
176
- await rag.initialize_storages()
177
- await initialize_pipeline_status()
 
178
  return rag
179
 
180
  async def main():
@@ -1061,7 +1068,7 @@ LightRAG now supports comprehensive multi-modal document processing through [Min
1061
  - **Multi-Element Extraction**: Extract and index text, images, tables, formulas, and document structure
1062
  - **Multimodal Retrieval**: Query and retrieve diverse content types (text, images, tables, formulas) within RAG workflows
1063
  - **Seamless Integration**: Works smoothly with LightRAG core and RAG-Anything frameworks
1064
-
1065
  **Quick Start:**
1066
  1. Install dependencies:
1067
  ```bash
@@ -1501,6 +1508,33 @@ Thank you to all our contributors!
1501
  <img src="https://contrib.rocks/image?repo=HKUDS/LightRAG" />
1502
  </a>
1503
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1504
  ## 🌟Citation
1505
 
1506
  ```python
 
149
 
150
  > If you would like to integrate LightRAG into your project, we recommend utilizing the REST API provided by the LightRAG Server. LightRAG Core is typically intended for embedded applications or for researchers who wish to conduct studies and evaluations.
151
 
152
+ ### ⚠️ Important: Initialization Requirements
153
+
154
+ **LightRAG requires explicit initialization before use.** You must call both `await rag.initialize_storages()` and `await initialize_pipeline_status()` after creating a LightRAG instance, otherwise you will encounter errors like:
155
+ - `AttributeError: __aenter__` - if storages are not initialized
156
+ - `KeyError: 'history_messages'` - if pipeline status is not initialized
157
+
158
  ### A Simple Program
159
 
160
  Use the below Python snippet to initialize LightRAG, insert text to it, and perform queries:
 
179
  embedding_func=openai_embed,
180
  llm_model_func=gpt_4o_mini_complete,
181
  )
182
+ # IMPORTANT: Both initialization calls are required!
183
+ await rag.initialize_storages() # Initialize storage backends
184
+ await initialize_pipeline_status() # Initialize processing pipeline
185
  return rag
186
 
187
  async def main():
 
1068
  - **Multi-Element Extraction**: Extract and index text, images, tables, formulas, and document structure
1069
  - **Multimodal Retrieval**: Query and retrieve diverse content types (text, images, tables, formulas) within RAG workflows
1070
  - **Seamless Integration**: Works smoothly with LightRAG core and RAG-Anything frameworks
1071
+
1072
  **Quick Start:**
1073
  1. Install dependencies:
1074
  ```bash
 
1508
  <img src="https://contrib.rocks/image?repo=HKUDS/LightRAG" />
1509
  </a>
1510
 
1511
+ ## Troubleshooting
1512
+
1513
+ ### Common Initialization Errors
1514
+
1515
+ If you encounter these errors when using LightRAG:
1516
+
1517
+ 1. **`AttributeError: __aenter__`**
1518
+ - **Cause**: Storage backends not initialized
1519
+ - **Solution**: Call `await rag.initialize_storages()` after creating the LightRAG instance
1520
+
1521
+ 2. **`KeyError: 'history_messages'`**
1522
+ - **Cause**: Pipeline status not initialized
1523
+ - **Solution**: Call `await initialize_pipeline_status()` after initializing storages
1524
+
1525
+ 3. **Both errors in sequence**
1526
+ - **Cause**: Neither initialization method was called
1527
+ - **Solution**: Always follow this pattern:
1528
+ ```python
1529
+ rag = LightRAG(...)
1530
+ await rag.initialize_storages()
1531
+ await initialize_pipeline_status()
1532
+ ```
1533
+
1534
+ ### Model Switching Issues
1535
+
1536
+ When switching between different embedding models, you must clear the data directory to avoid errors. The only file you may want to preserve is `kv_store_llm_response_cache.json` if you wish to retain the LLM cache.
1537
+
1538
  ## 🌟Citation
1539
 
1540
  ```python
docs/LightRAG_concurrent_explain.md ADDED
@@ -0,0 +1,281 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # LightRAG Multi-Document Processing: Concurrent Control Strategy Analysis
2
+
3
+ LightRAG employs a multi-layered concurrent control strategy when processing multiple documents. This article provides an in-depth analysis of the concurrent control mechanisms at document level, chunk level, and LLM request level, helping you understand why specific concurrent behaviors occur.
4
+
5
+ ## Overview
6
+
7
+ LightRAG's concurrent control is divided into three layers:
8
+
9
+ 1. **Document-level concurrency**: Controls the number of documents processed simultaneously
10
+ 2. **Chunk-level concurrency**: Controls the number of chunks processed simultaneously within a single document
11
+ 3. **LLM request-level concurrency**: Controls the global concurrent number of LLM requests
12
+
13
+ ## 1. Document-Level Concurrent Control
14
+
15
+ **Control Parameter**: `max_parallel_insert`
16
+
17
+ Document-level concurrency is controlled by the `max_parallel_insert` parameter, with a default value of 2.
18
+
19
+ ```python
20
+ # lightrag/lightrag.py
21
+ max_parallel_insert: int = field(default=int(os.getenv("MAX_PARALLEL_INSERT", 2)))
22
+ ```
23
+
24
+ ### Implementation Mechanism
25
+
26
+ In the `apipeline_process_enqueue_documents` method, a semaphore is used to control document concurrency:
27
+
28
+ ```python
29
+ # lightrag/lightrag.py - apipeline_process_enqueue_documents method
30
+ async def process_document(
31
+ doc_id: str,
32
+ status_doc: DocProcessingStatus,
33
+ split_by_character: str | None,
34
+ split_by_character_only: bool,
35
+ pipeline_status: dict,
36
+ pipeline_status_lock: asyncio.Lock,
37
+ semaphore: asyncio.Semaphore, # Document-level semaphore
38
+ ) -> None:
39
+ """Process single document"""
40
+ async with semaphore: # 🔥 Document-level concurrent control
41
+ # ... Process all chunks of a single document
42
+
43
+ # Create document-level semaphore
44
+ semaphore = asyncio.Semaphore(self.max_parallel_insert) # Default 2
45
+
46
+ # Create processing tasks for each document
47
+ doc_tasks = []
48
+ for doc_id, status_doc in to_process_docs.items():
49
+ doc_tasks.append(
50
+ process_document(
51
+ doc_id, status_doc, split_by_character, split_by_character_only,
52
+ pipeline_status, pipeline_status_lock, semaphore
53
+ )
54
+ )
55
+
56
+ # Wait for all documents to complete processing
57
+ await asyncio.gather(*doc_tasks)
58
+ ```
59
+
60
+ ## 2. Chunk-Level Concurrent Control
61
+
62
+ **Control Parameter**: `llm_model_max_async`
63
+
64
+ **Key Point**: Each document independently creates its own chunk semaphore!
65
+
66
+ ```python
67
+ # lightrag/lightrag.py
68
+ llm_model_max_async: int = field(default=int(os.getenv("MAX_ASYNC", 4)))
69
+ ```
70
+
71
+ ### Implementation Mechanism
72
+
73
+ In the `extract_entities` function, **each document independently creates** its own chunk semaphore:
74
+
75
+ ```python
76
+ # lightrag/operate.py - extract_entities function
77
+ async def extract_entities(chunks: dict[str, TextChunkSchema], global_config: dict[str, str], ...):
78
+ # 🔥 Key: Each document independently creates this semaphore!
79
+ llm_model_max_async = global_config.get("llm_model_max_async", 4)
80
+ semaphore = asyncio.Semaphore(llm_model_max_async) # Chunk semaphore for each document
81
+
82
+ async def _process_with_semaphore(chunk):
83
+ async with semaphore: # 🔥 Chunk concurrent control within document
84
+ return await _process_single_content(chunk)
85
+
86
+ # Create tasks for each chunk
87
+ tasks = []
88
+ for c in ordered_chunks:
89
+ task = asyncio.create_task(_process_with_semaphore(c))
90
+ tasks.append(task)
91
+
92
+ # Wait for all chunks to complete processing
93
+ done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
94
+ chunk_results = [task.result() for task in tasks]
95
+ return chunk_results
96
+ ```
97
+
98
+ ### Important Inference: System Overall Chunk Concurrency
99
+
100
+ Since each document independently creates chunk semaphores, the theoretical chunk concurrency of the system is:
101
+
102
+ **Theoretical Chunk Concurrency = max_parallel_insert × llm_model_max_async**
103
+
104
+ For example:
105
+ - `max_parallel_insert = 2` (process 2 documents simultaneously)
106
+ - `llm_model_max_async = 4` (maximum 4 chunk concurrency per document)
107
+ - **Theoretical result**: Maximum 2 × 4 = 8 chunks simultaneously in "processing" state
108
+
109
+ ## 3. LLM Request-Level Concurrent Control (The Real Bottleneck)
110
+
111
+ **Control Parameter**: `llm_model_max_async` (globally shared)
112
+
113
+ **Key**: Although there might be 8 chunks "in processing", all LLM requests share the same global priority queue!
114
+
115
+ ```python
116
+ # lightrag/lightrag.py - __post_init__ method
117
+ self.llm_model_func = priority_limit_async_func_call(self.llm_model_max_async)(
118
+ partial(
119
+ self.llm_model_func,
120
+ hashing_kv=hashing_kv,
121
+ **self.llm_model_kwargs,
122
+ )
123
+ )
124
+ # 🔥 Global LLM queue size = llm_model_max_async = 4
125
+ ```
126
+
127
+ ### Priority Queue Implementation
128
+
129
+ ```python
130
+ # lightrag/utils.py - priority_limit_async_func_call function
131
+ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
132
+ def final_decro(func):
133
+ queue = asyncio.PriorityQueue(maxsize=max_queue_size)
134
+ tasks = set()
135
+
136
+ async def worker():
137
+ """Worker that processes tasks in the priority queue"""
138
+ while not shutdown_event.is_set():
139
+ try:
140
+ priority, count, future, args, kwargs = await asyncio.wait_for(queue.get(), timeout=1.0)
141
+ result = await func(*args, **kwargs) # 🔥 Actual LLM call
142
+ if not future.done():
143
+ future.set_result(result)
144
+ except Exception as e:
145
+ # Error handling...
146
+ finally:
147
+ queue.task_done()
148
+
149
+ # 🔥 Create fixed number of workers (max_size), this is the real concurrency limit
150
+ for _ in range(max_size):
151
+ task = asyncio.create_task(worker())
152
+ tasks.add(task)
153
+ ```
154
+
155
+ ## 4. Chunk Internal Processing Mechanism (Serial)
156
+
157
+ ### Why Serial?
158
+
159
+ Internal processing of each chunk strictly follows this serial execution order:
160
+
161
+ ```python
162
+ # lightrag/operate.py - _process_single_content function
163
+ async def _process_single_content(chunk_key_dp: tuple[str, TextChunkSchema]):
164
+ # Step 1: Initial entity extraction
165
+ hint_prompt = entity_extract_prompt.format(**{**context_base, "input_text": content})
166
+ final_result = await use_llm_func_with_cache(hint_prompt, use_llm_func, ...)
167
+
168
+ # Process initial extraction results
169
+ maybe_nodes, maybe_edges = await _process_extraction_result(final_result, chunk_key, file_path)
170
+
171
+ # Step 2: Gleaning phase
172
+ for now_glean_index in range(entity_extract_max_gleaning):
173
+ # 🔥 Serial wait for gleaning results
174
+ glean_result = await use_llm_func_with_cache(
175
+ continue_prompt, use_llm_func,
176
+ llm_response_cache=llm_response_cache,
177
+ history_messages=history, cache_type="extract"
178
+ )
179
+
180
+ # Process gleaning results
181
+ glean_nodes, glean_edges = await _process_extraction_result(glean_result, chunk_key, file_path)
182
+
183
+ # Merge results...
184
+
185
+ # Step 3: Determine whether to continue loop
186
+ if now_glean_index == entity_extract_max_gleaning - 1:
187
+ break
188
+
189
+ # 🔥 Serial wait for loop decision results
190
+ if_loop_result = await use_llm_func_with_cache(
191
+ if_loop_prompt, use_llm_func,
192
+ llm_response_cache=llm_response_cache,
193
+ history_messages=history, cache_type="extract"
194
+ )
195
+
196
+ if if_loop_result.strip().strip('"').strip("'").lower() != "yes":
197
+ break
198
+
199
+ return maybe_nodes, maybe_edges
200
+ ```
201
+
202
+ ## 5. Complete Concurrent Hierarchy Diagram
203
+ ![lightrag_indexing.png](assets%2Flightrag_indexing.png)
204
+
205
+ ### Chunk Internal Processing (Serial)
206
+ ```
207
+ Initial Extraction → Gleaning → Loop Decision → Complete
208
+ ```
209
+
210
+ ## 6. Real-World Scenario Analysis
211
+
212
+ ### Scenario 1: Single Document with Multiple Chunks
213
+ Assume 1 document with 6 chunks:
214
+
215
+ - **Document level**: Only 1 document, not limited by `max_parallel_insert`
216
+ - **Chunk level**: Maximum 4 chunks processed simultaneously (limited by `llm_model_max_async=4`)
217
+ - **LLM level**: Global maximum 4 LLM requests concurrent
218
+
219
+ **Expected behavior**: 4 chunks process concurrently, remaining 2 chunks wait.
220
+
221
+ ### Scenario 2: Multiple Documents with Multiple Chunks
222
+ Assume 3 documents, each with 10 chunks:
223
+
224
+ - **Document level**: Maximum 2 documents processed simultaneously
225
+ - **Chunk level**: Maximum 4 chunks per document processed simultaneously
226
+ - **Theoretical Chunk concurrency**: 2 × 4 = 8 chunks processed simultaneously
227
+ - **Actual LLM concurrency**: Only 4 LLM requests actually execute
228
+
229
+ **Actual state distribution**:
230
+ ```
231
+ # Possible system state:
232
+ Document 1: 4 chunks "processing" (2 executing LLM, 2 waiting for LLM response)
233
+ Document 2: 4 chunks "processing" (2 executing LLM, 2 waiting for LLM response)
234
+ Document 3: Waiting for document-level semaphore
235
+
236
+ Total:
237
+ - 8 chunks in "processing" state
238
+ - 4 LLM requests actually executing
239
+ - 4 chunks waiting for LLM response
240
+ ```
241
+
242
+ ## 7. Performance Optimization Recommendations
243
+
244
+ ### Understanding the Bottleneck
245
+
246
+ The real bottleneck is the global LLM queue, not the chunk semaphores!
247
+
248
+ ### Adjustment Strategies
249
+
250
+ **Strategy 1: Increase LLM Concurrent Capacity**
251
+
252
+ ```bash
253
+ # Environment variable configuration
254
+ export MAX_PARALLEL_INSERT=2 # Keep document concurrency
255
+ export MAX_ASYNC=8 # 🔥 Increase LLM request concurrency
256
+ ```
257
+
258
+ **Strategy 2: Balance Document and LLM Concurrency**
259
+
260
+ ```python
261
+ rag = LightRAG(
262
+ max_parallel_insert=3, # Moderately increase document concurrency
263
+ llm_model_max_async=12, # Significantly increase LLM concurrency
264
+ entity_extract_max_gleaning=0, # Reduce serial steps within chunks
265
+ )
266
+ ```
267
+
268
+ ## 8. Summary
269
+
270
+ Key characteristics of LightRAG's multi-document concurrent processing mechanism:
271
+
272
+ ### Concurrent Layers
273
+ 1. **Inter-document competition**: Controlled by `max_parallel_insert`, default 2 documents concurrent
274
+ 2. **Theoretical Chunk concurrency**: Each document independently creates semaphores, total = max_parallel_insert × llm_model_max_async
275
+ 3. **Actual LLM concurrency**: All chunks share global LLM queue, controlled by `llm_model_max_async`
276
+ 4. **Intra-chunk serial**: Multiple LLM requests within each chunk execute strictly serially
277
+
278
+ ### Key Insights
279
+ - **Theoretical vs Actual**: System may have many chunks "in processing", but only few are actually executing LLM requests
280
+ - **Real Bottleneck**: Global LLM request queue is the performance bottleneck, not chunk semaphores
281
+ - **Optimization Focus**: Increasing `llm_model_max_async` is more effective than increasing `max_parallel_insert`
docs/assets/lightrag_indexing.png ADDED

Git LFS Details

  • SHA256: d6aa96d364e7172712a83b03ae9b3c73eb55c4d34c0f269f42503d8c30718b23
  • Pointer size: 131 Bytes
  • Size of remote file: 187 kB
docs/zh/LightRAG_concurrent_explain_zh.md ADDED
@@ -0,0 +1,277 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # LightRAG 多文档并发控制机制详解
2
+
3
+ LightRAG 在处理多个文档时采用了多层次的并发控制策略。本文将深入分析文档级别、chunk级别和LLM请求级别的并发控制机制,帮助您理解为什么会出现特定的并发行为。
4
+
5
+ ## 概述
6
+
7
+ LightRAG 的并发控制分为三个层次:
8
+
9
+ 1. 文档级别并发:控制同时处理的文档数量
10
+ 2. Chunk级别并发:控制单个文档内同时处理的chunk数量
11
+ 3. LLM请求级别并发:控制全局LLM请求的并发数量
12
+
13
+ ## 1. 文档级别并发控制
14
+
15
+ **控制参数**:`max_parallel_insert`
16
+
17
+ 文档级别的并发由 `max_parallel_insert` 参数控制,默认值为2。
18
+
19
+ ```python
20
+ # lightrag/lightrag.py
21
+ max_parallel_insert: int = field(default=int(os.getenv("MAX_PARALLEL_INSERT", 2)))
22
+ ```
23
+
24
+ ### 实现机制
25
+
26
+ 在 `apipeline_process_enqueue_documents` 方法中,使用信号量控制文档并发:
27
+
28
+ ```python
29
+ # lightrag/lightrag.py - apipeline_process_enqueue_documents方法
30
+ async def process_document(
31
+ doc_id: str,
32
+ status_doc: DocProcessingStatus,
33
+ split_by_character: str | None,
34
+ split_by_character_only: bool,
35
+ pipeline_status: dict,
36
+ pipeline_status_lock: asyncio.Lock,
37
+ semaphore: asyncio.Semaphore, # 文档级别信号量
38
+ ) -> None:
39
+ """Process single document"""
40
+ async with semaphore: # 🔥 文档级别并发控制
41
+ # ... 处理单个文档的所有chunks
42
+
43
+ # 创建文档级别信号量
44
+ semaphore = asyncio.Semaphore(self.max_parallel_insert) # 默认2
45
+
46
+ # 为每个文档创建处理任务
47
+ doc_tasks = []
48
+ for doc_id, status_doc in to_process_docs.items():
49
+ doc_tasks.append(
50
+ process_document(
51
+ doc_id, status_doc, split_by_character, split_by_character_only,
52
+ pipeline_status, pipeline_status_lock, semaphore
53
+ )
54
+ )
55
+
56
+ # 等待所有文档处理完成
57
+ await asyncio.gather(*doc_tasks)
58
+ ```
59
+
60
+ ## 2. Chunk级别并发控制
61
+
62
+ **控制参数**:`llm_model_max_async`
63
+
64
+ **关键点**:每个文档都会独立创建自己的chunk信号量!
65
+
66
+ ```python
67
+ # lightrag/lightrag.py
68
+ llm_model_max_async: int = field(default=int(os.getenv("MAX_ASYNC", 4)))
69
+ ```
70
+
71
+ ### 实现机制
72
+
73
+ 在 `extract_entities` 函数中,**每个文档独立创建**自己的chunk信号量:
74
+
75
+ ```python
76
+ # lightrag/operate.py - extract_entities函数
77
+ async def extract_entities(chunks: dict[str, TextChunkSchema], global_config: dict[str, str], ...):
78
+ # 🔥 关键:每个文档都会独立创建这个信号量!
79
+ llm_model_max_async = global_config.get("llm_model_max_async", 4)
80
+ semaphore = asyncio.Semaphore(llm_model_max_async) # 每个文档的chunk信号量
81
+
82
+ async def _process_with_semaphore(chunk):
83
+ async with semaphore: # 🔥 文档内部的chunk并发控制
84
+ return await _process_single_content(chunk)
85
+
86
+ # 为每个chunk创建任务
87
+ tasks = []
88
+ for c in ordered_chunks:
89
+ task = asyncio.create_task(_process_with_semaphore(c))
90
+ tasks.append(task)
91
+
92
+ # 等待所有chunk处理完成
93
+ done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
94
+ chunk_results = [task.result() for task in tasks]
95
+ return chunk_results
96
+ ```
97
+
98
+ ### 重要推论:系统整体Chunk并发数
99
+
100
+ 由于每个文档独立创建chunk信号量,系统理论上的chunk并发数是:
101
+
102
+ **理论Chunk并发数 = max_parallel_insert × llm_model_max_async**
103
+
104
+ 例如:
105
+ - `max_parallel_insert = 2`(同时处理2个文档)
106
+ - `llm_model_max_async = 4`(每个文档最多4个chunk并发)
107
+ - 理论结果:最多 2 × 4 = 8个chunk同时处于"处理中"状态
108
+
109
+ ## 3. LLM请求级别并发控制(真正的瓶颈)
110
+
111
+ **控制参数**:`llm_model_max_async`(全局共享)
112
+
113
+ **关键**:尽管可能有8个chunk在"处理中",但所有LLM请求共享同一个全局优先级队列!
114
+
115
+ ```python
116
+ # lightrag/lightrag.py - __post_init__方法
117
+ self.llm_model_func = priority_limit_async_func_call(self.llm_model_max_async)(
118
+ partial(
119
+ self.llm_model_func,
120
+ hashing_kv=hashing_kv,
121
+ **self.llm_model_kwargs,
122
+ )
123
+ )
124
+ # 🔥 全局LLM队列大小 = llm_model_max_async = 4
125
+ ```
126
+
127
+ ### 优先级队列实现
128
+
129
+ ```python
130
+ # lightrag/utils.py - priority_limit_async_func_call函数
131
+ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
132
+ def final_decro(func):
133
+ queue = asyncio.PriorityQueue(maxsize=max_queue_size)
134
+ tasks = set()
135
+
136
+ async def worker():
137
+ """Worker that processes tasks in the priority queue"""
138
+ while not shutdown_event.is_set():
139
+ try:
140
+ priority, count, future, args, kwargs = await asyncio.wait_for(queue.get(), timeout=1.0)
141
+ result = await func(*args, **kwargs) # 🔥 实际LLM调用
142
+ if not future.done():
143
+ future.set_result(result)
144
+ except Exception as e:
145
+ # 错误处理...
146
+ finally:
147
+ queue.task_done()
148
+
149
+ # 🔥 创建固定数量的worker(max_size个),这是真正的并发限制
150
+ for _ in range(max_size):
151
+ task = asyncio.create_task(worker())
152
+ tasks.add(task)
153
+ ```
154
+
155
+ ## 4. Chunk内部处理机制(串行)
156
+
157
+ ### 为什么是串行?
158
+
159
+ 每个chunk内部的处理严格按照以下顺序串行执行:
160
+
161
+ ```python
162
+ # lightrag/operate.py - _process_single_content函数
163
+ async def _process_single_content(chunk_key_dp: tuple[str, TextChunkSchema]):
164
+ # 步骤1:初始实体提取
165
+ hint_prompt = entity_extract_prompt.format(**{**context_base, "input_text": content})
166
+ final_result = await use_llm_func_with_cache(hint_prompt, use_llm_func, ...)
167
+
168
+ # 处理初始提取结果
169
+ maybe_nodes, maybe_edges = await _process_extraction_result(final_result, chunk_key, file_path)
170
+
171
+ # 步骤2:Gleaning(深挖)阶段
172
+ for now_glean_index in range(entity_extract_max_gleaning):
173
+ # 🔥 串行等待gleaning结果
174
+ glean_result = await use_llm_func_with_cache(
175
+ continue_prompt, use_llm_func,
176
+ llm_response_cache=llm_response_cache,
177
+ history_messages=history, cache_type="extract"
178
+ )
179
+
180
+ # 处理gleaning结果
181
+ glean_nodes, glean_edges = await _process_extraction_result(glean_result, chunk_key, file_path)
182
+
183
+ # 合并结果...
184
+
185
+ # 步骤3:判断是否继续循环
186
+ if now_glean_index == entity_extract_max_gleaning - 1:
187
+ break
188
+
189
+ # 🔥 串行等待循环判断结果
190
+ if_loop_result = await use_llm_func_with_cache(
191
+ if_loop_prompt, use_llm_func,
192
+ llm_response_cache=llm_response_cache,
193
+ history_messages=history, cache_type="extract"
194
+ )
195
+
196
+ if if_loop_result.strip().strip('"').strip("'").lower() != "yes":
197
+ break
198
+
199
+ return maybe_nodes, maybe_edges
200
+ ```
201
+
202
+ ## 5. 完整的并发层次图
203
+ ![lightrag_indexing.png](..%2Fassets%2Flightrag_indexing.png)
204
+
205
+
206
+ ## 6. 实际运行场景分析
207
+
208
+ ### 场景1:单文档多Chunk
209
+ 假设有1个文档,包含6个chunks:
210
+
211
+ - 文档级别:只有1个文档,不受 `max_parallel_insert` 限制
212
+ - Chunk级别:最多4个chunks同时处理(受 `llm_model_max_async=4` 限制)
213
+ - LLM级别:全局最多4个LLM请求并发
214
+
215
+ **预期行为**:4个chunks并发处理,剩余2个chunks等待。
216
+
217
+ ### 场景2:多文档多Chunk
218
+ 假设有3个文档,每个文档包含10个chunks:
219
+
220
+ - 文档级别:最多2个文档同时处理
221
+ - Chunk级别:每个文档最多4个chunks同时处理
222
+ - 理论Chunk并发:2 × 4 = 8个chunks同时处理
223
+ - 实际LLM并发:只有4个LLM请求真正执行
224
+
225
+ **实际状态分布**:
226
+ ```
227
+ # 可能的系统状态:
228
+ 文档1: 4个chunks"处理中"(其中2个在执行LLM,2个在等待LLM响应)
229
+ 文档2: 4个chunks"处理中"(其中2个在执行LLM,2个在等待LLM响应)
230
+ 文档3: 等待文档级别信号量
231
+
232
+ 总计:
233
+ - 8个chunks处于"处理中"状态
234
+ - 4个LLM请求真正执行
235
+ - 4个chunks等待LLM响应
236
+ ```
237
+
238
+ ## 7. 性能优化建议
239
+
240
+ ### 理解瓶颈
241
+
242
+ **真正的瓶颈是全局LLM队列,而不是chunk信号量!**
243
+
244
+ ### 调整策略
245
+
246
+ **策略1:提高LLM并发能力**
247
+
248
+ ```bash
249
+ # 环境变量配置
250
+ export MAX_PARALLEL_INSERT=2 # 保持文档并发
251
+ export MAX_ASYNC=8 # 🔥 增加LLM请求并发数
252
+ ```
253
+
254
+ **策略2:平衡文档和LLM并发**
255
+
256
+ ```python
257
+ rag = LightRAG(
258
+ max_parallel_insert=3, # 适度增加文档并发
259
+ llm_model_max_async=12, # 大幅增加LLM并发
260
+ entity_extract_max_gleaning=0, # 减少chunk内串行步骤
261
+ )
262
+ ```
263
+
264
+ ## 8. 总结
265
+
266
+ LightRAG的多文档并发处理机制的关键特点:
267
+
268
+ ### 并发层次
269
+ 1. **文档间争抢**:受 `max_parallel_insert` 控制,默认2个文档并发
270
+ 2. **理论Chunk并发**:每个文档独立创建信号量,总数 = `max_parallel_insert × llm_model_max_async`
271
+ 3. **实际LLM并发**:所有chunk共享全局LLM队列,受 `llm_model_max_async` 控制
272
+ 4. **单Chunk内串行**:每个chunk内的多个LLM请求严格串行执行
273
+
274
+ ### 关键洞察
275
+ - **理论vs实际**:系统可能有很多chunk在"处理中",但只有少数在真正执行LLM请求
276
+ - **真正瓶颈**:全局LLM请求队列是性能瓶颈,而不是chunk信号量
277
+ - **优化重点**:提高 `llm_model_max_async` 比增加 `max_parallel_insert` 更有效
lightrag/lightrag.py CHANGED
@@ -1006,7 +1006,7 @@ class LightRAG:
1006
  except Exception as e:
1007
  # Log error and update pipeline status
1008
  logger.error(traceback.format_exc())
1009
- error_msg = f"Failed to extrat document {current_file_number}/{total_files}: {file_path}"
1010
  logger.error(error_msg)
1011
  async with pipeline_status_lock:
1012
  pipeline_status["latest_message"] = error_msg
 
1006
  except Exception as e:
1007
  # Log error and update pipeline status
1008
  logger.error(traceback.format_exc())
1009
+ error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}"
1010
  logger.error(error_msg)
1011
  async with pipeline_status_lock:
1012
  pipeline_status["latest_message"] = error_msg