YanSte committed
Commit 4be600c · 1 Parent(s): c4c5b1f

updated the pipe

Files changed (1)
  1. lightrag/lightrag.py +74 -50
lightrag/lightrag.py CHANGED
@@ -4,7 +4,7 @@ from tqdm.asyncio import tqdm as tqdm_async
 from dataclasses import asdict, dataclass, field
 from datetime import datetime
 from functools import partial
-from typing import Any, Callable, Optional, Type, Union, cast
+from typing import Any, Callable, Coroutine, Optional, Type, Union, cast
 import traceback
 from .operate import (
     chunking_by_token_size,
@@ -561,72 +561,96 @@ class LightRAG:
         ]
         for i, el in enumerate(batch_docs_list):
             items = ((k, v) for d in el for k, v in d.items())
+
+            tasks: dict[str, list[Coroutine[Any, Any, None]]] = {}
+
+            doc_status: dict[str, Any] = {
+                "status": DocStatus.PROCESSING,
+                "updated_at": datetime.now().isoformat(),
+            }
+
             for doc_id, doc in tqdm_async(
                 items,
                 desc=f"Level 1 - Spliting doc in batch {i // len(batch_docs_list) + 1}",
             ):
-                doc_status: dict[str, Any] = {
-                    "content_summary": doc["content_summary"],
-                    "content_length": doc["content_length"],
-                    "status": DocStatus.PROCESSING,
-                    "created_at": doc["created_at"],
-                    "updated_at": datetime.now().isoformat(),
-                }
-                try:
-                    await self.doc_status.upsert({doc_id: doc_status})
-
-                    # Generate chunks from document
-                    chunks: dict[str, Any] = {
-                        compute_mdhash_id(dp["content"], prefix="chunk-"): {
-                            **dp,
-                            "full_doc_id": doc_id,
-                        }
-                        for dp in self.chunking_func(
-                            doc["content"],
-                            split_by_character,
-                            split_by_character_only,
-                            self.chunk_overlap_token_size,
-                            self.chunk_token_size,
-                            self.tiktoken_model_name,
-                        )
+                doc_status.update(
+                    {
+                        "content_summary": doc["content_summary"],
+                        "content_length": doc["content_length"],
+                        "created_at": doc["created_at"],
                     }
-                    await self.chunks_vdb.upsert(chunks)
-
-                    # Update status with chunks information
-                    await self._process_entity_relation_graph(chunks)
-
-                    if doc_id not in full_docs_new_docs_ids:
-                        await self.full_docs.upsert(
-                            {doc_id: {"content": doc["content"]}}
-                        )
+                )

-                    if doc_id not in text_chunks_new_docs_ids:
-                        await self.text_chunks.upsert(chunks)
+                await self.doc_status.upsert({doc_id: doc_status})

-                    doc_status.update(
-                        {
-                            "status": DocStatus.PROCESSED,
-                            "chunks_count": len(chunks),
-                            "updated_at": datetime.now().isoformat(),
-                        }
+                # Generate chunks from document
+                chunks: dict[str, Any] = {
+                    compute_mdhash_id(dp["content"], prefix="chunk-"): {
+                        **dp,
+                        "full_doc_id": doc_id,
+                    }
+                    for dp in self.chunking_func(
+                        doc["content"],
+                        split_by_character,
+                        split_by_character_only,
+                        self.chunk_overlap_token_size,
+                        self.chunk_token_size,
+                        self.tiktoken_model_name,
                     )
-                    await self.doc_status.upsert({doc_id: doc_status})
-                    await self._insert_done()
-
+                }
+                try:
+                    # If fails it's failed on full doc and text chunks upset
+                    if doc["status"] != DocStatus.FAILED:
+                        # Ensure chunk insertion and graph processing happen sequentially
+                        await self._process_entity_relation_graph(chunks)
+                        await self.chunks_vdb.upsert(chunks)
                 except Exception as e:
-                    # Update status with failed information
                     doc_status.update(
                         {
-                            "status": DocStatus.FAILED,
+                            "status": DocStatus.PENDING,
                             "error": str(e),
                             "updated_at": datetime.now().isoformat(),
                         }
                     )
                     await self.doc_status.upsert({doc_id: doc_status})
-                    logger.error(
-                        f"Failed to process document {doc_id}: {str(e)}\n{traceback.format_exc()}"
+
+                if doc_id not in full_docs_new_docs_ids:
+                    tasks[doc_id].append(
+                        self.full_docs.upsert({doc_id: {"content": doc["content"]}})
                    )
-                    continue
+
+                if doc_id not in text_chunks_new_docs_ids:
+                    tasks[doc_id].append(self.text_chunks.upsert(chunks))
+
+            for doc_id, task in tasks.items():
+                try:
+                    await asyncio.gather(*task)
+
+                    # Update document status
+                    doc_status.update(
+                        {
+                            "status": DocStatus.PROCESSED,
+                            "chunks_count": len(chunks),
+                            "updated_at": datetime.now().isoformat(),
+                        }
+                    )
+                    await self.doc_status.upsert({doc_id: doc_status})
+                    await self._insert_done()
+
+                except Exception as e:
+                    # Update status with failed information
+                    doc_status.update(
+                        {
+                            "status": DocStatus.FAILED,
+                            "error": str(e),
+                            "updated_at": datetime.now().isoformat(),
+                        }
+                    )
+                    await self.doc_status.upsert({doc_id: doc_status})
+                    logger.error(
+                        f"Failed to process document {doc_id}: {str(e)}\n{traceback.format_exc()}"
+                    )
+                    continue

     async def _process_entity_relation_graph(self, chunk: dict[str, Any]) -> None:
         try:
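
The core change in this hunk is that the full_docs and text_chunks writes are no longer awaited inline per document; they are collected as coroutines in a per-document task list and flushed together with asyncio.gather after the batch has been chunked. Below is a minimal, self-contained sketch of that pattern (not LightRAG code): the storage class and document payloads are hypothetical stand-ins, and the sketch uses setdefault so each document's list exists before the first append.

import asyncio
from typing import Any, Coroutine


class FakeKVStore:
    """Hypothetical async key-value store, standing in for LightRAG's storages."""

    def __init__(self) -> None:
        self.data: dict[str, Any] = {}

    async def upsert(self, records: dict[str, Any]) -> None:
        self.data.update(records)


async def main() -> None:
    full_docs, text_chunks = FakeKVStore(), FakeKVStore()
    docs = {"doc-1": "first document", "doc-2": "second document"}

    # Defer the writes: calling upsert() creates a coroutine but does not run it yet.
    tasks: dict[str, list[Coroutine[Any, Any, None]]] = {}
    for doc_id, content in docs.items():
        pending = tasks.setdefault(doc_id, [])
        pending.append(full_docs.upsert({doc_id: {"content": content}}))
        pending.append(text_chunks.upsert({f"chunk-{doc_id}": {"content": content}}))

    # Flush each document's pending writes together; one document failing
    # does not stop the loop for the others.
    for doc_id, coros in tasks.items():
        try:
            await asyncio.gather(*coros)
        except Exception as e:
            print(f"Failed to persist {doc_id}: {e}")

    print(sorted(full_docs.data), sorted(text_chunks.data))


asyncio.run(main())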
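
The chunk-building comprehension keeps the same shape on both sides of the diff: every chunk dict is keyed by a hash of its content and tagged with the id of the document it came from. A standalone sketch of that shape follows; the character-window splitter stands in for chunking_by_token_size, and the md5-based compute_mdhash_id is an assumption about how the helper derives ids.

from hashlib import md5
from typing import Any


def compute_mdhash_id(content: str, prefix: str = "") -> str:
    # Assumed behaviour: a stable id derived from the chunk content.
    return prefix + md5(content.encode()).hexdigest()


def naive_chunking(content: str, chunk_size: int = 80) -> list[dict[str, Any]]:
    # Simplified stand-in for token-based chunking: fixed-size character windows.
    return [
        {"content": content[i : i + chunk_size], "chunk_order_index": n}
        for n, i in enumerate(range(0, len(content), chunk_size))
    ]


doc_id = "doc-1"
doc_content = " ".join(f"sentence {n}" for n in range(40))
chunks: dict[str, Any] = {
    compute_mdhash_id(dp["content"], prefix="chunk-"): {**dp, "full_doc_id": doc_id}
    for dp in naive_chunking(doc_content)
}
print(len(chunks), next(iter(chunks)))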
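
Finally, the status bookkeeping: the batch now starts from a shared doc_status record in the PROCESSING state, records PENDING (rather than FAILED) when chunking or graph extraction raises, and only moves to PROCESSED or FAILED once the deferred writes are flushed. A small sketch of that record, with an illustrative DocStatus enum whose string values are assumptions:

from datetime import datetime
from enum import Enum
from typing import Any


class DocStatus(str, Enum):
    # Illustrative values; only the member names are taken from the diff.
    PENDING = "pending"
    PROCESSING = "processing"
    PROCESSED = "processed"
    FAILED = "failed"


# Record created once per batch, then updated per document.
doc_status: dict[str, Any] = {
    "status": DocStatus.PROCESSING,
    "updated_at": datetime.now().isoformat(),
}

# Successful path: the record gains a chunk count and a PROCESSED status.
doc_status.update(
    {
        "status": DocStatus.PROCESSED,
        "chunks_count": 42,
        "updated_at": datetime.now().isoformat(),
    }
)
print(doc_status["status"].value, doc_status["chunks_count"])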