yangdx committed
Commit: dde8474 · Parent: fe1b11b

Fix linting

Changed files: lightrag/lightrag.py (+13 -16)
lightrag/lightrag.py  CHANGED

@@ -769,7 +769,6 @@ class LightRAG:
         async with pipeline_status_lock:
             # Ensure only one worker is processing documents
             if not pipeline_status.get("busy", False):
-
                 processing_docs, failed_docs, pending_docs = await asyncio.gather(
                     self.doc_status.get_docs_by_status(DocStatus.PROCESSING),
                     self.doc_status.get_docs_by_status(DocStatus.FAILED),

@@ -833,12 +832,12 @@ class LightRAG:
             pipeline_status["history_messages"].append(log_message)

         async def process_document(
-            doc_id: str,
+            doc_id: str,
             status_doc: DocProcessingStatus,
             split_by_character: str | None,
             split_by_character_only: bool,
             pipeline_status: dict,
-            pipeline_status_lock: asyncio.Lock
+            pipeline_status_lock: asyncio.Lock,
         ) -> None:
             """Process single document"""
             try:

@@ -912,9 +911,7 @@
                 )
             except Exception as e:
                 # Log error and update pipeline status
-                error_msg = (
-                    f"Failed to process document {doc_id}: {str(e)}"
-                )
+                error_msg = f"Failed to process document {doc_id}: {str(e)}"
                 logger.error(error_msg)
                 async with pipeline_status_lock:
                     pipeline_status["latest_message"] = error_msg

@@ -945,38 +942,38 @@
            )

            # 3. iterate over batches
-            total_batches = len(docs_batches)
+            total_batches = len(docs_batches)
            for batch_idx, docs_batch in enumerate(docs_batches):
-                current_batch = batch_idx + 1
-                log_message = f"Start processing batch {current_batch} of {total_batches}."
-
+                current_batch = batch_idx + 1
+                log_message = (
+                    f"Start processing batch {current_batch} of {total_batches}."
+                )
                logger.info(log_message)
                pipeline_status["cur_batch"] = current_batch
                pipeline_status["latest_message"] = log_message
                pipeline_status["history_messages"].append(log_message)
-
+
                doc_tasks = []
                for doc_id, status_doc in docs_batch:
                    doc_tasks.append(
                        process_document(
-                            doc_id,
+                            doc_id,
                            status_doc,
                            split_by_character,
                            split_by_character_only,
                            pipeline_status,
-                            pipeline_status_lock
+                            pipeline_status_lock,
                        )
                    )
-
+
                # Process documents in one batch parallelly
                await asyncio.gather(*doc_tasks)
                await self._insert_done()
-
+
                log_message = f"Completed batch {current_batch} of {total_batches}."
                logger.info(log_message)
                pipeline_status["latest_message"] = log_message
                pipeline_status["history_messages"].append(log_message)
-

            # Check if there's a pending request to process more documents (with lock)
            has_pending_request = False
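
The signature and string edits match common Python auto-formatter behavior (presumably Ruff or Black; the commit message says only "Fix linting"): a trailing comma is added after the final parameter, an over-long f-string assignment is wrapped in parentheses, and the -/+ pairs with identical visible text are whitespace-only re-indentation or trailing-whitespace cleanup. A hypothetical before/after illustrating the trailing-comma convention, not taken from the repo:

import asyncio

# Before: no trailing comma after the last parameter; the formatter is free
# to collapse the argument list back onto one line if it fits.
async def handler_before(
    doc_id: str,
    pipeline_status_lock: asyncio.Lock
) -> None: ...

# After: the "magic trailing comma" pins one parameter per line, so future
# diffs touching the signature stay one-line-per-argument.
async def handler_after(
    doc_id: str,
    pipeline_status_lock: asyncio.Lock,
) -> None: ...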