YanSte committed
Commit 797f88c · 1 parent: e4307a4

cleaned docs

examples/lightrag_oracle_demo.py CHANGED
@@ -121,9 +121,8 @@ async def main():
     texts = [x for x in all_text.split("\n") if x]
 
     # New mode use pipeline
-    await rag.apipeline_process_documents(texts)
-    await rag.apipeline_process_chunks()
-    await rag.apipeline_process_extract_graph()
+    await rag.apipeline_enqueue_documents(texts)
+    await rag.apipeline_process_enqueue_documents()
 
     # Old method use ainsert
     # await rag.ainsert(texts)
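
For context, the demo's ingestion now runs in two explicit steps: documents are first enqueued, then the queue is processed (chunking, entity/relation extraction, status updates). Below is a minimal sketch of the new call sequence; the LightRAG construction and asyncio wrapper are illustrative placeholders, not part of this commit (the real demo wires up Oracle storage plus LLM and embedding functions):

import asyncio

from lightrag import LightRAG


async def ingest(texts: list[str]) -> None:
    # Placeholder setup; see the demo for the full Oracle-backed configuration.
    rag = LightRAG(working_dir="./rag_storage")

    # Step 1: register the documents as pending in the document status store.
    await rag.apipeline_enqueue_documents(texts)

    # Step 2: chunk, extract entities and relations, and update each document's status.
    await rag.apipeline_process_enqueue_documents()


asyncio.run(ingest(["First document.", "Second document."]))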
lightrag/lightrag.py CHANGED
@@ -395,7 +395,9 @@ class LightRAG:
             split_by_character is None, this parameter is ignored.
         """
         await self.apipeline_process_documents(string_or_strings)
-        await self.apipeline_process_enqueue_documents(split_by_character, split_by_character_only)
+        await self.apipeline_process_enqueue_documents(
+            split_by_character, split_by_character_only
+        )
 
     def insert_custom_chunks(self, full_text: str, text_chunks: list[str]):
         loop = always_get_an_event_loop()
@@ -511,23 +513,23 @@ class LightRAG:
     async def _get_pending_documents(self) -> list[str]:
         """Fetch all pending and failed documents."""
        to_process_doc_keys: list[str] = []
-
+
         # Fetch failed documents
         failed_docs = await self.doc_status.get_by_status(status=DocStatus.FAILED)
         if failed_docs:
             to_process_doc_keys.extend([doc["id"] for doc in failed_docs])
-
+
         # Fetch pending documents
         pending_docs = await self.doc_status.get_by_status(status=DocStatus.PENDING)
         if pending_docs:
             to_process_doc_keys.extend([doc["id"] for doc in pending_docs])
-
+
         if not to_process_doc_keys:
             logger.info("All documents have been processed or are duplicates")
             return []
-
+
         return to_process_doc_keys
-
+
     async def apipeline_process_enqueue_documents(
         self,
         split_by_character: str | None = None,
@@ -535,36 +537,39 @@
     ) -> None:
         """
         Process pending documents by splitting them into chunks, processing
-        each chunk for entity and relation extraction, and updating the
+        each chunk for entity and relation extraction, and updating the
         document status.
-
+
         1. Get all pending and failed documents
         2. Split document content into chunks
         3. Process each chunk for entity and relation extraction
         4. Update the document status
-        """
+        """
         # 1. get all pending and failed documents
         pending_doc_ids = await self._get_pending_documents()
-
+
         if not pending_doc_ids:
             logger.info("All documents have been processed or are duplicates")
             return
-
+
         # Get allready processed documents (text chunks and full docs)
-        text_chunks_processed_doc_ids = await self.text_chunks.filter_keys(pending_doc_ids)
+        text_chunks_processed_doc_ids = await self.text_chunks.filter_keys(
+            pending_doc_ids
+        )
         full_docs_processed_doc_ids = await self.full_docs.filter_keys(pending_doc_ids)
 
         # 2. split docs into chunks, insert chunks, update doc status
         batch_size = self.addon_params.get("insert_batch_size", 10)
         batch_docs_list = [
-            pending_doc_ids[i : i + batch_size] for i in range(0, len(pending_doc_ids), batch_size)
+            pending_doc_ids[i : i + batch_size]
+            for i in range(0, len(pending_doc_ids), batch_size)
         ]
-
+
         # 3. iterate over batches
         tasks: dict[str, list[Coroutine[Any, Any, None]]] = {}
         for batch_idx, doc_ids in tqdm_async(
             enumerate(batch_docs_list),
-            desc=f"Process Batches",
+            desc="Process Batches",
         ):
             # 4. iterate over batch
             for doc_id in tqdm_async(
@@ -580,7 +585,7 @@
                             "updated_at": datetime.now().isoformat(),
                             "content_summary": status_doc["content_summary"],
                             "content_length": status_doc["content_length"],
-                            "created_at": status_doc["created_at"],
+                            "created_at": status_doc["created_at"],
                         }
                     }
                 )
@@ -599,22 +604,24 @@
                         self.tiktoken_model_name,
                    )
                 }
-
-                # Ensure chunk insertion and graph processing happen sequentially, not in parallel
+
+                # Ensure chunk insertion and graph processing happen sequentially, not in parallel
                 await self._process_entity_relation_graph(chunks)
                 await self.chunks_vdb.upsert(chunks)
 
                 # Check if document already processed the doc
                 if doc_id not in full_docs_processed_doc_ids:
                     tasks[doc_id].append(
-                        self.full_docs.upsert({doc_id: {"content": status_doc["content"]}})
+                        self.full_docs.upsert(
+                            {doc_id: {"content": status_doc["content"]}}
+                        )
                     )
-
+
                 # Check if chunks already processed the doc
                 if doc_id not in text_chunks_processed_doc_ids:
                     tasks[doc_id].append(self.text_chunks.upsert(chunks))
 
-            # Process document (text chunks and full docs) in parallel
+            # Process document (text chunks and full docs) in parallel
             for doc_id, task in tasks.items():
                 try:
                     await asyncio.gather(*task)
@@ -630,9 +637,7 @@
                     await self._insert_done()
 
                 except Exception as e:
-                    logger.error(
-                        f"Failed to process document {doc_id}: {str(e)}"
-                    )
+                    logger.error(f"Failed to process document {doc_id}: {str(e)}")
                     await self.doc_status.upsert(
                         {
                             doc_id: {
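
As a side note on the batching reformatted above: apipeline_process_enqueue_documents groups the pending document ids into batches of addon_params["insert_batch_size"] (default 10) before iterating. A standalone sketch of that slicing idiom, using made-up ids rather than real document keys:

# Illustration only: mirrors the batch splitting used in apipeline_process_enqueue_documents.
pending_doc_ids = [f"doc-{i}" for i in range(25)]
batch_size = 10  # stand-in for addon_params.get("insert_batch_size", 10)

batch_docs_list = [
    pending_doc_ids[i : i + batch_size]
    for i in range(0, len(pending_doc_ids), batch_size)
]

# 25 ids with a batch size of 10 -> batches of 10, 10, and 5.
for batch_idx, doc_ids in enumerate(batch_docs_list):
    print(batch_idx, len(doc_ids))

Each batch is then handled one document at a time: chunks are built, the entity/relation graph is extracted, and only afterwards are the chunk and full-document stores upserted, as the comments in the diff note.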