YanSte committed
Commit 3f9866f · 1 Parent(s): 4be600c

make more clear
Files changed (1):
  1. lightrag/lightrag.py +39 -36
lightrag/lightrag.py CHANGED
@@ -509,6 +509,26 @@ class LightRAG:
         await self.doc_status.upsert(new_docs)
         logger.info(f"Stored {len(new_docs)} new unique documents")
 
+    async def _get_pending_documents(self) -> list[str]:
+        """Fetch all pending and failed documents."""
+        to_process_doc_keys: list[str] = []
+
+        # Fetch failed documents
+        failed_docs = await self.doc_status.get_by_status(status=DocStatus.FAILED)
+        if failed_docs:
+            to_process_doc_keys.extend([doc["id"] for doc in failed_docs])
+
+        # Fetch pending documents
+        pending_docs = await self.doc_status.get_by_status(status=DocStatus.PENDING)
+        if pending_docs:
+            to_process_doc_keys.extend([doc["id"] for doc in pending_docs])
+
+        if not to_process_doc_keys:
+            logger.info("All documents have been processed or are duplicates")
+            return []
+
+        return to_process_doc_keys
+
     async def apipeline_process_chunks(
         self,
         split_by_character: str | None = None,
@@ -527,52 +547,36 @@ class LightRAG:
             when split_by_character is None, this parameter is ignored.
         """
         # 1. get all pending and failed documents
-        to_process_doc_keys: list[str] = []
-
-        # Process failes
-        to_process_docs = await self.doc_status.get_by_status(status=DocStatus.FAILED)
-        if to_process_docs:
-            to_process_doc_keys.extend([doc["id"] for doc in to_process_docs])
-
-        # Process Pending
-        to_process_docs = await self.doc_status.get_by_status(status=DocStatus.PENDING)
-        if to_process_docs:
-            to_process_doc_keys.extend([doc["id"] for doc in to_process_docs])
-
-        if not to_process_doc_keys:
-            logger.info("All documents have been processed or are duplicates")
-            return
-
-        # If included in text_chunks is all processed, return
-        new_docs = await self.doc_status.get_by_ids(to_process_doc_keys)
-        text_chunks_new_docs_ids = await self.text_chunks.filter_keys(
-            to_process_doc_keys
-        )
-        full_docs_new_docs_ids = await self.full_docs.filter_keys(to_process_doc_keys)
-
-        if not new_docs:
+        pending_doc_ids = await self._get_pending_documents()
+
+        if not pending_doc_ids:
             logger.info("All documents have been processed or are duplicates")
             return
+
+        # Get allready processed documents (text chunks and full docs)
+        text_chunks_processed_doc_ids = await self.text_chunks.filter_keys(pending_doc_ids)
+        full_docs_processed_doc_ids = await self.full_docs.filter_keys(pending_doc_ids)
 
         # 2. split docs into chunks, insert chunks, update doc status
         batch_size = self.addon_params.get("insert_batch_size", 10)
         batch_docs_list = [
-            new_docs[i : i + batch_size] for i in range(0, len(new_docs), batch_size)
+            pending_doc_ids[i : i + batch_size] for i in range(0, len(pending_doc_ids), batch_size)
         ]
-        for i, el in enumerate(batch_docs_list):
-            items = ((k, v) for d in el for k, v in d.items())
-
-            tasks: dict[str, list[Coroutine[Any, Any, None]]] = {}
-
+        batch_len = len(batch_docs_list) + 1
+        # 3. iterate over batches
+        tasks: dict[str, list[Coroutine[Any, Any, None]]] = {}
+        for batch_idx, doc_ids in enumerate(batch_docs_list):
+
             doc_status: dict[str, Any] = {
                 "status": DocStatus.PROCESSING,
                 "updated_at": datetime.now().isoformat(),
             }
 
-            for doc_id, doc in tqdm_async(
-                items,
-                desc=f"Level 1 - Spliting doc in batch {i // len(batch_docs_list) + 1}",
+            for doc_id in tqdm_async(
+                doc_ids,
+                desc=f"Level 1 - Batch {batch_idx} / {batch_len}",
             ):
+                doc = await self.doc_status.get_by_id(doc_id)
                 doc_status.update(
                     {
                         "content_summary": doc["content_summary"],
@@ -580,7 +584,6 @@ class LightRAG:
                         "created_at": doc["created_at"],
                     }
                 )
-
                 await self.doc_status.upsert({doc_id: doc_status})
 
                 # Generate chunks from document
@@ -614,12 +617,12 @@ class LightRAG:
                 )
                 await self.doc_status.upsert({doc_id: doc_status})
 
-                if doc_id not in full_docs_new_docs_ids:
+                if doc_id not in full_docs_processed_doc_ids:
                     tasks[doc_id].append(
                         self.full_docs.upsert({doc_id: {"content": doc["content"]}})
                     )
 
-                if doc_id not in text_chunks_new_docs_ids:
+                if doc_id not in text_chunks_processed_doc_ids:
                     tasks[doc_id].append(self.text_chunks.upsert(chunks))
 
         for doc_id, task in tasks.items():
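
For context on the new flow: the refactored method slices the pending document IDs into insert batches of `insert_batch_size` and collects per-document storage coroutines into a `tasks` dict before awaiting them. Below is a minimal standalone sketch of that pattern; `fake_upsert`, the sample IDs, and the batch size are illustrative stand-ins, not LightRAG APIs.

```python
import asyncio
from typing import Any, Coroutine


async def fake_upsert(doc_id: str, payload: dict) -> None:
    """Illustrative stand-in for a storage upsert call."""
    await asyncio.sleep(0)  # simulate async I/O


async def main() -> None:
    pending_doc_ids = [f"doc-{n}" for n in range(25)]
    batch_size = 10  # mirrors addon_params.get("insert_batch_size", 10)

    # Same slicing pattern as in the diff: fixed-size batches of doc IDs.
    batch_docs_list = [
        pending_doc_ids[i : i + batch_size]
        for i in range(0, len(pending_doc_ids), batch_size)
    ]

    # Collect per-document coroutines, then await them together per document.
    tasks: dict[str, list[Coroutine[Any, Any, None]]] = {}
    for batch_idx, doc_ids in enumerate(batch_docs_list):
        print(f"Batch {batch_idx + 1} / {len(batch_docs_list)}")
        for doc_id in doc_ids:
            tasks[doc_id] = [fake_upsert(doc_id, {"content": "..."})]

    for doc_id, task in tasks.items():
        await asyncio.gather(*task)


asyncio.run(main())
```

In the commit itself, the per-document coroutines are the `full_docs` and `text_chunks` upserts shown in the last hunk.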