Daniel.y commited on
Commit
ed2ddf6
·
unverified ·
2 Parent(s): 3ab6fa0 e7935cf

Merge pull request #1720 from danielaskdd/add-rebuild-pipeline-status

Browse files
Files changed (2) hide show
  1. lightrag/lightrag.py +2 -6
  2. lightrag/operate.py +56 -12
lightrag/lightrag.py CHANGED
@@ -1943,14 +1943,10 @@ class LightRAG:
1943
  text_chunks=self.text_chunks,
1944
  llm_response_cache=self.llm_response_cache,
1945
  global_config=asdict(self),
 
 
1946
  )
1947
 
1948
- async with pipeline_status_lock:
1949
- log_message = f"Successfully rebuilt {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relations"
1950
- logger.info(log_message)
1951
- pipeline_status["latest_message"] = log_message
1952
- pipeline_status["history_messages"].append(log_message)
1953
-
1954
  except Exception as e:
1955
  logger.error(f"Failed to rebuild knowledge from chunks: {e}")
1956
  raise Exception(
 
1943
  text_chunks=self.text_chunks,
1944
  llm_response_cache=self.llm_response_cache,
1945
  global_config=asdict(self),
1946
+ pipeline_status=pipeline_status,
1947
+ pipeline_status_lock=pipeline_status_lock,
1948
  )
1949
 
 
 
 
 
 
 
1950
  except Exception as e:
1951
  logger.error(f"Failed to rebuild knowledge from chunks: {e}")
1952
  raise Exception(
lightrag/operate.py CHANGED
@@ -250,6 +250,8 @@ async def _rebuild_knowledge_from_chunks(
250
  text_chunks: BaseKVStorage,
251
  llm_response_cache: BaseKVStorage,
252
  global_config: dict[str, str],
 
 
253
  ) -> None:
254
  """Rebuild entity and relationship descriptions from cached extraction results
255
 
@@ -262,6 +264,8 @@ async def _rebuild_knowledge_from_chunks(
262
  """
263
  if not entities_to_rebuild and not relationships_to_rebuild:
264
  return
 
 
265
 
266
  # Get all referenced chunk IDs
267
  all_referenced_chunk_ids = set()
@@ -270,9 +274,12 @@ async def _rebuild_knowledge_from_chunks(
270
  for chunk_ids in relationships_to_rebuild.values():
271
  all_referenced_chunk_ids.update(chunk_ids)
272
 
273
- logger.debug(
274
- f"Rebuilding knowledge from {len(all_referenced_chunk_ids)} cached chunk extractions"
275
- )
 
 
 
276
 
277
  # Get cached extraction results for these chunks
278
  cached_results = await _get_cached_extraction_results(
@@ -280,7 +287,12 @@ async def _rebuild_knowledge_from_chunks(
280
  )
281
 
282
  if not cached_results:
283
- logger.warning("No cached extraction results found, cannot rebuild")
 
 
 
 
 
284
  return
285
 
286
  # Process cached results to get entities and relationships for each chunk
@@ -297,9 +309,14 @@ async def _rebuild_knowledge_from_chunks(
297
  chunk_entities[chunk_id] = entities
298
  chunk_relationships[chunk_id] = relationships
299
  except Exception as e:
300
- logger.error(
301
  f"Failed to parse cached extraction result for chunk {chunk_id}: {e}"
302
  )
 
 
 
 
 
303
  continue
304
 
305
  # Rebuild entities
@@ -314,11 +331,22 @@ async def _rebuild_knowledge_from_chunks(
314
  llm_response_cache=llm_response_cache,
315
  global_config=global_config,
316
  )
317
- logger.debug(
318
- f"Rebuilt entity {entity_name} from {len(chunk_ids)} cached extractions"
 
319
  )
 
 
 
 
 
320
  except Exception as e:
321
- logger.error(f"Failed to rebuild entity {entity_name}: {e}")
 
 
 
 
 
322
 
323
  # Rebuild relationships
324
  for (src, tgt), chunk_ids in relationships_to_rebuild.items():
@@ -333,13 +361,29 @@ async def _rebuild_knowledge_from_chunks(
333
  llm_response_cache=llm_response_cache,
334
  global_config=global_config,
335
  )
336
- logger.debug(
337
- f"Rebuilt relationship {src}-{tgt} from {len(chunk_ids)} cached extractions"
 
338
  )
 
 
 
 
 
339
  except Exception as e:
340
- logger.error(f"Failed to rebuild relationship {src}-{tgt}: {e}")
 
 
 
 
 
341
 
342
- logger.debug("Completed rebuilding knowledge from cached extractions")
 
 
 
 
 
343
 
344
 
345
  async def _get_cached_extraction_results(
 
250
  text_chunks: BaseKVStorage,
251
  llm_response_cache: BaseKVStorage,
252
  global_config: dict[str, str],
253
+ pipeline_status: dict | None = None,
254
+ pipeline_status_lock=None,
255
  ) -> None:
256
  """Rebuild entity and relationship descriptions from cached extraction results
257
 
 
264
  """
265
  if not entities_to_rebuild and not relationships_to_rebuild:
266
  return
267
+ rebuilt_entities_count = 0
268
+ rebuilt_relationships_count = 0
269
 
270
  # Get all referenced chunk IDs
271
  all_referenced_chunk_ids = set()
 
274
  for chunk_ids in relationships_to_rebuild.values():
275
  all_referenced_chunk_ids.update(chunk_ids)
276
 
277
+ status_message = f"Rebuilding knowledge from {len(all_referenced_chunk_ids)} cached chunk extractions"
278
+ logger.info(status_message)
279
+ if pipeline_status is not None and pipeline_status_lock is not None:
280
+ async with pipeline_status_lock:
281
+ pipeline_status["latest_message"] = status_message
282
+ pipeline_status["history_messages"].append(status_message)
283
 
284
  # Get cached extraction results for these chunks
285
  cached_results = await _get_cached_extraction_results(
 
287
  )
288
 
289
  if not cached_results:
290
+ status_message = "No cached extraction results found, cannot rebuild"
291
+ logger.warning(status_message)
292
+ if pipeline_status is not None and pipeline_status_lock is not None:
293
+ async with pipeline_status_lock:
294
+ pipeline_status["latest_message"] = status_message
295
+ pipeline_status["history_messages"].append(status_message)
296
  return
297
 
298
  # Process cached results to get entities and relationships for each chunk
 
309
  chunk_entities[chunk_id] = entities
310
  chunk_relationships[chunk_id] = relationships
311
  except Exception as e:
312
+ status_message = (
313
  f"Failed to parse cached extraction result for chunk {chunk_id}: {e}"
314
  )
315
+ logger.info(status_message) # Per requirement, change to info
316
+ if pipeline_status is not None and pipeline_status_lock is not None:
317
+ async with pipeline_status_lock:
318
+ pipeline_status["latest_message"] = status_message
319
+ pipeline_status["history_messages"].append(status_message)
320
  continue
321
 
322
  # Rebuild entities
 
331
  llm_response_cache=llm_response_cache,
332
  global_config=global_config,
333
  )
334
+ rebuilt_entities_count += 1
335
+ status_message = (
336
+ f"Rebuilt entity: {entity_name} from {len(chunk_ids)} chunks"
337
  )
338
+ logger.info(status_message)
339
+ if pipeline_status is not None and pipeline_status_lock is not None:
340
+ async with pipeline_status_lock:
341
+ pipeline_status["latest_message"] = status_message
342
+ pipeline_status["history_messages"].append(status_message)
343
  except Exception as e:
344
+ status_message = f"Failed to rebuild entity {entity_name}: {e}"
345
+ logger.info(status_message) # Per requirement, change to info
346
+ if pipeline_status is not None and pipeline_status_lock is not None:
347
+ async with pipeline_status_lock:
348
+ pipeline_status["latest_message"] = status_message
349
+ pipeline_status["history_messages"].append(status_message)
350
 
351
  # Rebuild relationships
352
  for (src, tgt), chunk_ids in relationships_to_rebuild.items():
 
361
  llm_response_cache=llm_response_cache,
362
  global_config=global_config,
363
  )
364
+ rebuilt_relationships_count += 1
365
+ status_message = (
366
+ f"Rebuilt relationship: {src}->{tgt} from {len(chunk_ids)} chunks"
367
  )
368
+ logger.info(status_message)
369
+ if pipeline_status is not None and pipeline_status_lock is not None:
370
+ async with pipeline_status_lock:
371
+ pipeline_status["latest_message"] = status_message
372
+ pipeline_status["history_messages"].append(status_message)
373
  except Exception as e:
374
+ status_message = f"Failed to rebuild relationship {src}->{tgt}: {e}"
375
+ logger.info(status_message)
376
+ if pipeline_status is not None and pipeline_status_lock is not None:
377
+ async with pipeline_status_lock:
378
+ pipeline_status["latest_message"] = status_message
379
+ pipeline_status["history_messages"].append(status_message)
380
 
381
+ status_message = f"KG rebuild completed: {rebuilt_entities_count} entities and {rebuilt_relationships_count} relationships."
382
+ logger.info(status_message)
383
+ if pipeline_status is not None and pipeline_status_lock is not None:
384
+ async with pipeline_status_lock:
385
+ pipeline_status["latest_message"] = status_message
386
+ pipeline_status["history_messages"].append(status_message)
387
 
388
 
389
  async def _get_cached_extraction_results(