YanSte committed on
Commit
a6a0d04
·
1 Parent(s): 40b657d

updated naming

Browse files
Files changed (1) hide show
  1. lightrag/lightrag.py +15 -16
lightrag/lightrag.py CHANGED
@@ -396,7 +396,7 @@ class LightRAG:
396
  split_by_character is None, this parameter is ignored.
397
  """
398
  await self.apipeline_process_documents(string_or_strings)
399
- await self.apipeline_process_chunks(split_by_character, split_by_character_only)
400
 
401
  def insert_custom_chunks(self, full_text: str, text_chunks: list[str]):
402
  loop = always_get_an_event_loop()
@@ -465,7 +465,7 @@ class LightRAG:
465
  if update_storage:
466
  await self._insert_done()
467
 
468
- async def apipeline_process_documents(self, string_or_strings: str | list[str]):
469
  """Pipeline process documents
470
 
471
  1. Remove duplicate contents from the list
@@ -505,7 +505,7 @@ class LightRAG:
505
  logger.info("All documents have been processed or are duplicates")
506
  return
507
 
508
- # 4. Store original document
509
  await self.doc_status.upsert(new_docs)
510
  logger.info(f"Stored {len(new_docs)} new unique documents")
511
 
@@ -529,23 +529,21 @@ class LightRAG:
529
 
530
  return to_process_doc_keys
531
 
532
- async def apipeline_process_chunks(
533
  self,
534
  split_by_character: str | None = None,
535
  split_by_character_only: bool = False,
536
  ) -> None:
537
- """Pipeline process chunks
538
-
539
- 1. Get pending documents
540
- 2. Split documents into chunks
541
- 3. Insert chunks
542
-
543
- Args:
544
- split_by_character (str | None): If not None, split the string by character, if chunk longer than
545
- chunk_size, split the sub chunk by token size.
546
- split_by_character_only (bool): If split_by_character_only is True, split the string by character only,
547
- when split_by_character is None, this parameter is ignored.
548
  """
 
 
 
 
 
 
 
 
 
549
  # 1. get all pending and failed documents
550
  pending_doc_ids = await self._get_pending_documents()
551
 
@@ -612,10 +610,11 @@ class LightRAG:
612
  self.full_docs.upsert({doc_id: {"content": status_doc["content"]}})
613
  )
614
 
615
- # check if chunks already processed the doc
616
  if doc_id not in text_chunks_processed_doc_ids:
617
  tasks[doc_id].append(self.text_chunks.upsert(chunks))
618
 
 
619
  for doc_id, task in tasks.items():
620
  try:
621
  await asyncio.gather(*task)
 
396
  split_by_character is None, this parameter is ignored.
397
  """
398
  await self.apipeline_process_documents(string_or_strings)
399
+ await self.apipeline_process_enqueue_documents(split_by_character, split_by_character_only)
400
 
401
  def insert_custom_chunks(self, full_text: str, text_chunks: list[str]):
402
  loop = always_get_an_event_loop()
 
465
  if update_storage:
466
  await self._insert_done()
467
 
468
+ async def apipeline_enqueue_documents(self, string_or_strings: str | list[str]):
469
  """Pipeline process documents
470
 
471
  1. Remove duplicate contents from the list
 
505
  logger.info("All documents have been processed or are duplicates")
506
  return
507
 
508
+ # 4. Store status document
509
  await self.doc_status.upsert(new_docs)
510
  logger.info(f"Stored {len(new_docs)} new unique documents")
511
 
 
529
 
530
  return to_process_doc_keys
531
 
532
+ async def apipeline_process_enqueue_documents(
533
  self,
534
  split_by_character: str | None = None,
535
  split_by_character_only: bool = False,
536
  ) -> None:
 
 
 
 
 
 
 
 
 
 
 
537
  """
538
+ Process pending documents by splitting them into chunks, processing
539
+ each chunk for entity and relation extraction, and updating the
540
+ document status.
541
+
542
+ 1. Get all pending and failed documents
543
+ 2. Split document content into chunks
544
+ 3. Process each chunk for entity and relation extraction
545
+ 4. Update the document status
546
+ """
547
  # 1. get all pending and failed documents
548
  pending_doc_ids = await self._get_pending_documents()
549
 
 
610
  self.full_docs.upsert({doc_id: {"content": status_doc["content"]}})
611
  )
612
 
613
+ # Check if chunks already processed the doc
614
  if doc_id not in text_chunks_processed_doc_ids:
615
  tasks[doc_id].append(self.text_chunks.upsert(chunks))
616
 
617
+ # Process document (text chunks and full docs) in parallel
618
  for doc_id, task in tasks.items():
619
  try:
620
  await asyncio.gather(*task)