YanSte committed
Commit 3871323 · Parent: 7abc8b2

improved get status

Files changed (1):
  1. lightrag/lightrag.py +43 -53
lightrag/lightrag.py CHANGED
@@ -13,7 +13,6 @@ from .operate import (
     kg_query_with_keywords,
     mix_kg_vector_query,
     naive_query,
-    # local_query,global_query,hybrid_query,,
 )
 
 from .utils import (
@@ -28,6 +27,7 @@ from .base import (
     BaseGraphStorage,
     BaseKVStorage,
     BaseVectorStorage,
+    DocProcessingStatus,
     DocStatus,
     DocStatusStorage,
     QueryParam,
@@ -396,7 +396,9 @@ class LightRAG:
             split_by_character is None, this parameter is ignored.
         """
         await self.apipeline_enqueue_documents(string_or_strings)
-        await self.apipeline_process_enqueue_documents(split_by_character, split_by_character_only)
+        await self.apipeline_process_enqueue_documents(
+            split_by_character, split_by_character_only
+        )
 
     def insert_custom_chunks(self, full_text: str, text_chunks: list[str]):
         loop = always_get_an_event_loop()
@@ -468,12 +470,12 @@ class LightRAG:
     async def apipeline_enqueue_documents(self, string_or_strings: str | list[str]):
         """
         Pipeline for Processing Documents
-
+
         1. Remove duplicate contents from the list
         2. Generate document IDs and initial status
         3. Filter out already processed documents
-        4. Enqueue document in status
-        """
+        4. Enqueue document in status
+        """
         if isinstance(string_or_strings, str):
             string_or_strings = [string_or_strings]
 
@@ -512,26 +514,6 @@ class LightRAG:
         await self.doc_status.upsert(new_docs)
         logger.info(f"Stored {len(new_docs)} new unique documents")
 
-    async def _get_pending_documents(self) -> list[str]:
-        """Fetch all pending and failed documents."""
-        to_process_doc_keys: list[str] = []
-
-        # Fetch failed documents
-        failed_docs = await self.doc_status.get_failed_docs()
-        if failed_docs:
-            to_process_doc_keys.extend([doc["id"] for doc in failed_docs])
-
-        # Fetch pending documents
-        pending_docs = await self.doc_status.get_pending_docs()
-        if pending_docs:
-            to_process_doc_keys.extend([doc["id"] for doc in pending_docs])
-
-        if not to_process_doc_keys:
-            logger.info("All documents have been processed or are duplicates")
-            return []
-
-        return to_process_doc_keys
-
     async def apipeline_process_enqueue_documents(
         self,
         split_by_character: str | None = None,
@@ -548,46 +530,53 @@ class LightRAG:
         4. Update the document status
         """
         # 1. get all pending and failed documents
-        pending_doc_ids = await self._get_pending_documents()
+        to_process_docs: dict[str, DocProcessingStatus] = {}
 
-        if not pending_doc_ids:
+        # Fetch failed documents
+        failed_docs = await self.doc_status.get_failed_docs()
+        to_process_docs.update(failed_docs)
+
+        pending_docs = await self.doc_status.get_pending_docs()
+        to_process_docs.update(pending_docs)
+
+        if not to_process_docs:
             logger.info("All documents have been processed or are duplicates")
-            return
+            return
 
+        to_process_docs_ids = list(to_process_docs.keys())
         # Get allready processed documents (text chunks and full docs)
-        text_chunks_processed_doc_ids = await self.text_chunks.filter_keys(
-            pending_doc_ids
-        )
-        full_docs_processed_doc_ids = await self.full_docs.filter_keys(pending_doc_ids)
+        text_chunks_processed_doc_ids = await self.text_chunks.filter_keys(to_process_docs_ids)
+        full_docs_processed_doc_ids = await self.full_docs.filter_keys(to_process_docs_ids)
 
         # 2. split docs into chunks, insert chunks, update doc status
         batch_size = self.addon_params.get("insert_batch_size", 10)
         batch_docs_list = [
-            pending_doc_ids[i : i + batch_size]
-            for i in range(0, len(pending_doc_ids), batch_size)
+            list(to_process_docs.items())[i : i + batch_size]
+            for i in range(0, len(to_process_docs), batch_size)
         ]
 
         # 3. iterate over batches
         tasks: dict[str, list[Coroutine[Any, Any, None]]] = {}
-        for batch_idx, doc_ids in tqdm_async(
+        for batch_idx, ids_doc_processing_status in tqdm_async(
             enumerate(batch_docs_list),
             desc="Process Batches",
         ):
             # 4. iterate over batch
-            for doc_id in tqdm_async(
-                doc_ids,
+            for id_doc_processing_status in tqdm_async(
+                ids_doc_processing_status,
                 desc=f"Process Batch {batch_idx}",
            ):
                 # Update status in processing
-                status_doc = await self.doc_status.get_by_id(doc_id)
+                id_doc, status_doc = id_doc_processing_status
+
                 await self.doc_status.upsert(
                     {
-                        doc_id: {
+                        id_doc: {
                             "status": DocStatus.PROCESSING,
                             "updated_at": datetime.now().isoformat(),
-                            "content_summary": status_doc["content_summary"],
-                            "content_length": status_doc["content_length"],
-                            "created_at": status_doc["created_at"],
+                            "content_summary": status_doc.content_summary,
+                            "content_length": status_doc.content_length,
+                            "created_at": status_doc.created_at,
                         }
                     }
                 )
@@ -595,10 +584,10 @@ class LightRAG:
                 chunks: dict[str, Any] = {
                     compute_mdhash_id(dp["content"], prefix="chunk-"): {
                         **dp,
-                        "full_doc_id": doc_id,
+                        "full_doc_id": id_doc_processing_status,
                     }
                     for dp in self.chunking_func(
-                        status_doc["content"],
+                        status_doc.content,
                         split_by_character,
                         split_by_character_only,
                         self.chunk_overlap_token_size,
@@ -611,25 +600,26 @@ class LightRAG:
                 await self._process_entity_relation_graph(chunks)
                 await self.chunks_vdb.upsert(chunks)
 
+                tasks[id_doc] = []
                 # Check if document already processed the doc
-                if doc_id not in full_docs_processed_doc_ids:
-                    tasks[doc_id].append(
+                if id_doc not in full_docs_processed_doc_ids:
+                    tasks[id_doc].append(
                         self.full_docs.upsert(
-                            {doc_id: {"content": status_doc["content"]}}
+                            {id_doc: {"content": status_doc.content}}
                         )
                     )
 
                 # Check if chunks already processed the doc
-                if doc_id not in text_chunks_processed_doc_ids:
-                    tasks[doc_id].append(self.text_chunks.upsert(chunks))
+                if id_doc not in text_chunks_processed_doc_ids:
+                    tasks[id_doc].append(self.text_chunks.upsert(chunks))
 
         # Process document (text chunks and full docs) in parallel
-        for doc_id, task in tasks.items():
+        for id_doc_processing_status, task in tasks.items():
             try:
                 await asyncio.gather(*task)
                 await self.doc_status.upsert(
                     {
-                        doc_id: {
+                        id_doc_processing_status: {
                             "status": DocStatus.PROCESSED,
                             "chunks_count": len(chunks),
                             "updated_at": datetime.now().isoformat(),
@@ -639,10 +629,10 @@ class LightRAG:
                 await self._insert_done()
 
             except Exception as e:
-                logger.error(f"Failed to process document {doc_id}: {str(e)}")
+                logger.error(f"Failed to process document {id_doc_processing_status}: {str(e)}")
                 await self.doc_status.upsert(
                     {
-                        doc_id: {
+                        id_doc_processing_status: {
                             "status": DocStatus.FAILED,
                             "error": str(e),
                             "updated_at": datetime.now().isoformat(),