jin committed
Commit 121ced9 · 1 Parent(s): 18a40e5

support pipeline mode

.gitignore CHANGED
@@ -21,4 +21,4 @@ rag_storage
 venv/
 examples/input/
 examples/output/
-.DS_Store
+.DS_Store
examples/lightrag_oracle_demo.py CHANGED
@@ -89,49 +89,45 @@ async def main():
     rag = LightRAG(
         # log_level="DEBUG",
         working_dir=WORKING_DIR,
-        entity_extract_max_gleaning = 1,
-
+        entity_extract_max_gleaning=1,
         enable_llm_cache=True,
-        enable_llm_cache_for_entity_extract = True,
-        embedding_cache_config= None, # {"enabled": True,"similarity_threshold": 0.90},
-
-
+        enable_llm_cache_for_entity_extract=True,
+        embedding_cache_config=None, # {"enabled": True,"similarity_threshold": 0.90},
         chunk_token_size=CHUNK_TOKEN_SIZE,
-        llm_model_max_token_size = MAX_TOKENS,
+        llm_model_max_token_size=MAX_TOKENS,
         llm_model_func=llm_model_func,
         embedding_func=EmbeddingFunc(
             embedding_dim=embedding_dimension,
             max_token_size=500,
             func=embedding_func,
-        ),
-
-        graph_storage = "OracleGraphStorage",
-        kv_storage = "OracleKVStorage",
+        ),
+        graph_storage="OracleGraphStorage",
+        kv_storage="OracleKVStorage",
         vector_storage="OracleVectorDBStorage",
-
-        addon_params = {"example_number":1,
-                        "language":"Simplfied Chinese",
-                        "entity_types": ["organization", "person", "geo", "event"],
-                        "insert_batch_size":2,
-                        }
+        addon_params={
+            "example_number": 1,
+            "language": "Simplfied Chinese",
+            "entity_types": ["organization", "person", "geo", "event"],
+            "insert_batch_size": 2,
+        },
     )
 
-    # Setthe KV/vector/graph storage's `db` property, so all operation will use same connection pool
-    rag.set_storage_client(db_client = oracle_db)
+    # Setthe KV/vector/graph storage's `db` property, so all operation will use same connection pool
+    rag.set_storage_client(db_client=oracle_db)
 
     # Extract and Insert into LightRAG storage
-    with open(WORKING_DIR+"/docs.txt", "r", encoding="utf-8") as f:
+    with open(WORKING_DIR + "/docs.txt", "r", encoding="utf-8") as f:
         all_text = f.read()
         texts = [x for x in all_text.split("\n") if x]
-
+
     # New mode use pipeline
     await rag.apipeline_process_documents(texts)
-    await rag.apipeline_process_chunks()
+    await rag.apipeline_process_chunks()
     await rag.apipeline_process_extract_graph()
 
     # Old method use ainsert
-    #await rag.ainsert(texts)
-
+    # await rag.ainsert(texts)
+
     # Perform search in different modes
     modes = ["naive", "local", "global", "hybrid"]
     for mode in modes:
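
The demo now drives ingestion through the pipeline API instead of a single ainsert() call. As a minimal sketch of just that call order (taken from the diff above and meant to run inside the demo's async main(), with rag, oracle_db and WORKING_DIR set up as in the rest of the demo):

    # Share one Oracle connection pool across the KV/vector/graph storages
    rag.set_storage_client(db_client=oracle_db)

    with open(WORKING_DIR + "/docs.txt", "r", encoding="utf-8") as f:
        texts = [x for x in f.read().split("\n") if x]

    await rag.apipeline_process_documents(texts)    # store docs, mark them PENDING
    await rag.apipeline_process_chunks()            # chunk pending/failed docs
    await rag.apipeline_process_extract_graph()     # extract entities/relations per chunk
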
lightrag/kg/oracle_impl.py CHANGED
@@ -3,7 +3,7 @@ import asyncio
 # import html
 # import os
 from dataclasses import dataclass
-from typing import Union, List, Dict, Set, Any, Tuple
+from typing import Union
 import numpy as np
 import array
 
@@ -170,7 +170,7 @@ class OracleKVStorage(BaseKVStorage):
 
     def __post_init__(self):
         self._data = {}
-        self._max_batch_size = self.global_config.get("embedding_batch_num",10)
+        self._max_batch_size = self.global_config.get("embedding_batch_num", 10)
 
     ################ QUERY METHODS ################
 
@@ -190,7 +190,7 @@ class OracleKVStorage(BaseKVStorage):
             return res
         else:
             return None
-
+
     async def get_by_mode_and_id(self, mode: str, id: str) -> Union[dict, None]:
         """Specifically for llm_response_cache."""
         SQL = SQL_TEMPLATES["get_by_mode_id_" + self.namespace]
@@ -199,11 +199,11 @@ class OracleKVStorage(BaseKVStorage):
             array_res = await self.db.query(SQL, params, multirows=True)
             res = {}
             for row in array_res:
-                res[row["id"]] = row
+                res[row["id"]] = row
             return res
         else:
             return None
-
+
     async def get_by_ids(self, ids: list[str], fields=None) -> Union[list[dict], None]:
         """get doc_chunks data based on id"""
         SQL = SQL_TEMPLATES["get_by_ids_" + self.namespace].format(
@@ -222,7 +222,7 @@ class OracleKVStorage(BaseKVStorage):
                 dict_res[mode] = {}
             for row in res:
                 dict_res[row["mode"]][row["id"]] = row
-            res = [{k: v} for k, v in dict_res.items()]
+            res = [{k: v} for k, v in dict_res.items()]
         if res:
             data = res # [{"data":i} for i in res]
             # print(data)
@@ -230,7 +230,9 @@ class OracleKVStorage(BaseKVStorage):
         else:
             return None
 
-    async def get_by_status_and_ids(self, status: str, ids: list[str]) -> Union[list[dict], None]:
+    async def get_by_status_and_ids(
+        self, status: str, ids: list[str]
+    ) -> Union[list[dict], None]:
         """Specifically for llm_response_cache."""
         if ids is not None:
             SQL = SQL_TEMPLATES["get_by_status_ids_" + self.namespace].format(
@@ -244,7 +246,7 @@ class OracleKVStorage(BaseKVStorage):
             return res
         else:
             return None
-
+
     async def filter_keys(self, keys: list[str]) -> set[str]:
         """Return keys that don't exist in storage"""
         SQL = SQL_TEMPLATES["filter_keys"].format(
@@ -258,7 +260,6 @@ class OracleKVStorage(BaseKVStorage):
             return data
         else:
             return set(keys)
-
 
     ################ INSERT METHODS ################
     async def upsert(self, data: dict[str, dict]):
@@ -281,7 +282,7 @@ class OracleKVStorage(BaseKVStorage):
             embeddings = np.concatenate(embeddings_list)
             for i, d in enumerate(list_data):
                 d["__vector__"] = embeddings[i]
-
+
             merge_sql = SQL_TEMPLATES["merge_chunk"]
             for item in list_data:
                 _data = {
@@ -320,11 +321,9 @@ class OracleKVStorage(BaseKVStorage):
 
             await self.db.execute(upsert_sql, _data)
         return None
-
+
     async def change_status(self, id: str, status: str):
-        SQL = SQL_TEMPLATES["change_status"].format(
-            table_name=N_T[self.namespace]
-        )
+        SQL = SQL_TEMPLATES["change_status"].format(table_name=N_T[self.namespace])
         params = {"workspace": self.db.workspace, "id": id, "status": status}
         await self.db.execute(SQL, params)
 
@@ -673,8 +672,8 @@ TABLES = {
     },
     "LIGHTRAG_LLM_CACHE": {
         "ddl": """CREATE TABLE LIGHTRAG_LLM_CACHE (
-                    id varchar(256) PRIMARY KEY,
-                    workspace varchar(1024),
+                    id varchar(256) PRIMARY KEY,
+                    workspace varchar(1024),
                     cache_mode varchar(256),
                     model_name varchar(256),
                     original_prompt clob,
@@ -708,47 +707,32 @@ TABLES = {
 SQL_TEMPLATES = {
     # SQL for KVStorage
     "get_by_id_full_docs": "select ID,content,status from LIGHTRAG_DOC_FULL where workspace=:workspace and ID=:id",
-
     "get_by_id_text_chunks": "select ID,TOKENS,content,CHUNK_ORDER_INDEX,FULL_DOC_ID,status from LIGHTRAG_DOC_CHUNKS where workspace=:workspace and ID=:id",
-
     "get_by_id_llm_response_cache": """SELECT id, original_prompt, NVL(return_value, '') as "return", cache_mode as "mode"
         FROM LIGHTRAG_LLM_CACHE WHERE workspace=:workspace AND id=:id""",
-
     "get_by_mode_id_llm_response_cache": """SELECT id, original_prompt, NVL(return_value, '') as "return", cache_mode as "mode"
         FROM LIGHTRAG_LLM_CACHE WHERE workspace=:workspace AND cache_mode=:cache_mode AND id=:id""",
-
     "get_by_ids_llm_response_cache": """SELECT id, original_prompt, NVL(return_value, '') as "return", cache_mode as "mode"
         FROM LIGHTRAG_LLM_CACHE WHERE workspace=:workspace AND id IN ({ids})""",
-
     "get_by_ids_full_docs": "select t.*,createtime as created_at from LIGHTRAG_DOC_FULL t where workspace=:workspace and ID in ({ids})",
-
     "get_by_ids_text_chunks": "select ID,TOKENS,content,CHUNK_ORDER_INDEX,FULL_DOC_ID from LIGHTRAG_DOC_CHUNKS where workspace=:workspace and ID in ({ids})",
-
     "get_by_status_ids_full_docs": "select id,status from LIGHTRAG_DOC_FULL t where workspace=:workspace AND status=:status and ID in ({ids})",
-
     "get_by_status_ids_text_chunks": "select id,status from LIGHTRAG_DOC_CHUNKS where workspace=:workspace and status=:status ID in ({ids})",
-
     "get_by_status_full_docs": "select id,status from LIGHTRAG_DOC_FULL t where workspace=:workspace AND status=:status",
-
     "get_by_status_text_chunks": "select id,status from LIGHTRAG_DOC_CHUNKS where workspace=:workspace and status=:status",
-
     "filter_keys": "select id from {table_name} where workspace=:workspace and id in ({ids})",
-
     "change_status": "update {table_name} set status=:status,updatetime=SYSDATE where workspace=:workspace and id=:id",
-
     "merge_doc_full": """MERGE INTO LIGHTRAG_DOC_FULL a
         USING DUAL
         ON (a.id = :id and a.workspace = :workspace)
         WHEN NOT MATCHED THEN
         INSERT(id,content,workspace) values(:id,:content,:workspace)""",
-
     "merge_chunk": """MERGE INTO LIGHTRAG_DOC_CHUNKS
         USING DUAL
         ON (id = :id and workspace = :workspace)
         WHEN NOT MATCHED THEN INSERT
         (id,content,workspace,tokens,chunk_order_index,full_doc_id,content_vector,status)
         values (:id,:content,:workspace,:tokens,:chunk_order_index,:full_doc_id,:content_vector,:status) """,
-
     "upsert_llm_response_cache": """MERGE INTO LIGHTRAG_LLM_CACHE a
         USING DUAL
         ON (a.id = :id)
@@ -760,8 +744,6 @@ SQL_TEMPLATES = {
         return_value = :return_value,
         cache_mode = :cache_mode,
         updatetime = SYSDATE""",
-
-
     # SQL for VectorStorage
     "entities": """SELECT name as entity_name FROM
         (SELECT id,name,VECTOR_DISTANCE(content_vector,vector(:embedding_string,{dimension},{dtype}),COSINE) as distance
@@ -818,7 +800,7 @@ SQL_TEMPLATES = {
         INSERT(workspace,name,entity_type,description,source_chunk_id,content,content_vector)
         values (:workspace,:name,:entity_type,:description,:source_chunk_id,:content,:content_vector)
     WHEN MATCHED THEN
-        UPDATE SET
+        UPDATE SET
         entity_type=:entity_type,description=:description,source_chunk_id=:source_chunk_id,content=:content,content_vector=:content_vector,updatetime=SYSDATE""",
     "merge_edge": """MERGE INTO LIGHTRAG_GRAPH_EDGES a
         USING DUAL
@@ -827,7 +809,7 @@ SQL_TEMPLATES = {
         INSERT(workspace,source_name,target_name,weight,keywords,description,source_chunk_id,content,content_vector)
         values (:workspace,:source_name,:target_name,:weight,:keywords,:description,:source_chunk_id,:content,:content_vector)
     WHEN MATCHED THEN
-        UPDATE SET
+        UPDATE SET
         weight=:weight,keywords=:keywords,description=:description,source_chunk_id=:source_chunk_id,content=:content,content_vector=:content_vector,updatetime=SYSDATE""",
     "get_all_nodes": """WITH t0 AS (
         SELECT name AS id, entity_type AS label, entity_type, description,
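
Alongside the formatting changes, OracleKVStorage now exposes status-aware helpers (get_by_status_and_ids, change_status) backed by the get_by_status_* and change_status SQL templates. A rough sketch of how a caller can drive them, assuming a KV storage instance kv wired to the Oracle db client and the DocStatus enum used in lightrag.py (import path assumed):

    from lightrag.base import DocStatus  # assumed import path for the status enum

    async def drain_pending(kv, process_one):
        # Hypothetical driver mirroring the pipeline methods in lightrag.py:
        # pick up FAILED and PENDING rows, process each id, then advance its status.
        failed = await kv.get_by_status_and_ids(status=DocStatus.FAILED, ids=None) or []
        pending = await kv.get_by_status_and_ids(status=DocStatus.PENDING, ids=None) or []
        for doc_id in [row["id"] for row in failed + pending]:
            try:
                await process_one(doc_id)  # e.g. chunking or graph extraction for this id
                await kv.change_status(doc_id, DocStatus.PROCESSED)
            except Exception:
                await kv.change_status(doc_id, DocStatus.FAILED)
                raise
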
lightrag/lightrag.py CHANGED
@@ -26,7 +26,7 @@ from .utils import (
     convert_response_to_json,
     logger,
     set_logger,
-    statistic_data
+    statistic_data,
 )
 from .base import (
     BaseGraphStorage,
@@ -39,30 +39,30 @@ from .base import (
 
 from .prompt import GRAPH_FIELD_SEP
 
-STORAGES = {
-    "JsonKVStorage": '.storage',
-    "NanoVectorDBStorage": '.storage',
-    "NetworkXStorage": '.storage',
-    "JsonDocStatusStorage": '.storage',
-
-    "Neo4JStorage":".kg.neo4j_impl",
-    "OracleKVStorage":".kg.oracle_impl",
-    "OracleGraphStorage":".kg.oracle_impl",
-    "OracleVectorDBStorage":".kg.oracle_impl",
-    "MilvusVectorDBStorge":".kg.milvus_impl",
-    "MongoKVStorage":".kg.mongo_impl",
-    "ChromaVectorDBStorage":".kg.chroma_impl",
-    "TiDBKVStorage":".kg.tidb_impl",
-    "TiDBVectorDBStorage":".kg.tidb_impl",
-    "TiDBGraphStorage":".kg.tidb_impl",
-    "PGKVStorage":".kg.postgres_impl",
-    "PGVectorStorage":".kg.postgres_impl",
-    "AGEStorage":".kg.age_impl",
-    "PGGraphStorage":".kg.postgres_impl",
-    "GremlinStorage":".kg.gremlin_impl",
-    "PGDocStatusStorage":".kg.postgres_impl",
+STORAGES = {
+    "JsonKVStorage": ".storage",
+    "NanoVectorDBStorage": ".storage",
+    "NetworkXStorage": ".storage",
+    "JsonDocStatusStorage": ".storage",
+    "Neo4JStorage": ".kg.neo4j_impl",
+    "OracleKVStorage": ".kg.oracle_impl",
+    "OracleGraphStorage": ".kg.oracle_impl",
+    "OracleVectorDBStorage": ".kg.oracle_impl",
+    "MilvusVectorDBStorge": ".kg.milvus_impl",
+    "MongoKVStorage": ".kg.mongo_impl",
+    "ChromaVectorDBStorage": ".kg.chroma_impl",
+    "TiDBKVStorage": ".kg.tidb_impl",
+    "TiDBVectorDBStorage": ".kg.tidb_impl",
+    "TiDBGraphStorage": ".kg.tidb_impl",
+    "PGKVStorage": ".kg.postgres_impl",
+    "PGVectorStorage": ".kg.postgres_impl",
+    "AGEStorage": ".kg.age_impl",
+    "PGGraphStorage": ".kg.postgres_impl",
+    "GremlinStorage": ".kg.gremlin_impl",
+    "PGDocStatusStorage": ".kg.postgres_impl",
 }
 
+
 def lazy_external_import(module_name: str, class_name: str):
     """Lazily import a class from an external module based on the package of the caller."""
 
@@ -75,6 +75,7 @@ def lazy_external_import(module_name: str, class_name: str):
 
     def import_class(*args, **kwargs):
         import importlib
+
         module = importlib.import_module(module_name, package=package)
         cls = getattr(module, class_name)
         return cls(*args, **kwargs)
@@ -190,7 +191,7 @@ class LightRAG:
             os.makedirs(self.working_dir)
 
         # show config
-        global_config=asdict(self)
+        global_config = asdict(self)
         _print_config = ",\n  ".join([f"{k} = {v}" for k, v in global_config.items()])
         logger.debug(f"LightRAG init with param:\n  {_print_config}\n")
 
@@ -198,31 +199,33 @@ class LightRAG:
         self.embedding_func = limit_async_func_call(self.embedding_func_max_async)(
             self.embedding_func
        )
-
 
         # Initialize all storages
-        self.key_string_value_json_storage_cls: Type[BaseKVStorage] = self._get_storage_class(self.kv_storage)
-        self.vector_db_storage_cls: Type[BaseVectorStorage] = self._get_storage_class(self.vector_storage)
-        self.graph_storage_cls: Type[BaseGraphStorage] = self._get_storage_class(self.graph_storage)
-
+        self.key_string_value_json_storage_cls: Type[BaseKVStorage] = (
+            self._get_storage_class(self.kv_storage)
+        )
+        self.vector_db_storage_cls: Type[BaseVectorStorage] = self._get_storage_class(
+            self.vector_storage
+        )
+        self.graph_storage_cls: Type[BaseGraphStorage] = self._get_storage_class(
+            self.graph_storage
+        )
+
         self.key_string_value_json_storage_cls = partial(
-            self.key_string_value_json_storage_cls,
-            global_config=global_config
+            self.key_string_value_json_storage_cls, global_config=global_config
         )
 
         self.vector_db_storage_cls = partial(
-            self.vector_db_storage_cls,
-            global_config=global_config
+            self.vector_db_storage_cls, global_config=global_config
         )
 
         self.graph_storage_cls = partial(
-            self.graph_storage_cls,
-            global_config=global_config
+            self.graph_storage_cls, global_config=global_config
         )
 
         self.json_doc_status_storage = self.key_string_value_json_storage_cls(
             namespace="json_doc_status_storage",
-            embedding_func=None,
+            embedding_func=None,
         )
 
         self.llm_response_cache = self.key_string_value_json_storage_cls(
@@ -264,13 +267,15 @@ class LightRAG:
             embedding_func=self.embedding_func,
         )
 
-        if self.llm_response_cache and hasattr(self.llm_response_cache, "global_config"):
+        if self.llm_response_cache and hasattr(
+            self.llm_response_cache, "global_config"
+        ):
             hashing_kv = self.llm_response_cache
         else:
             hashing_kv = self.key_string_value_json_storage_cls(
-                namespace="llm_response_cache",
-                embedding_func=None,
-            )
+                namespace="llm_response_cache",
+                embedding_func=None,
+            )
 
         self.llm_model_func = limit_async_func_call(self.llm_model_max_async)(
             partial(
@@ -292,21 +297,24 @@ class LightRAG:
         import_path = STORAGES[storage_name]
         storage_class = lazy_external_import(import_path, storage_name)
         return storage_class
-
-    def set_storage_client(self,db_client):
+
+    def set_storage_client(self, db_client):
         # Now only tested on Oracle Database
-        for storage in [self.vector_db_storage_cls,
-                        self.graph_storage_cls,
-                        self.doc_status, self.full_docs,
-                        self.text_chunks,
-                        self.llm_response_cache,
-                        self.key_string_value_json_storage_cls,
-                        self.chunks_vdb,
-                        self.relationships_vdb,
-                        self.entities_vdb,
-                        self.graph_storage_cls,
-                        self.chunk_entity_relation_graph,
-                        self.llm_response_cache]:
+        for storage in [
+            self.vector_db_storage_cls,
+            self.graph_storage_cls,
+            self.doc_status,
+            self.full_docs,
+            self.text_chunks,
+            self.llm_response_cache,
+            self.key_string_value_json_storage_cls,
+            self.chunks_vdb,
+            self.relationships_vdb,
+            self.entities_vdb,
+            self.graph_storage_cls,
+            self.chunk_entity_relation_graph,
+            self.llm_response_cache,
+        ]:
             # set client
             storage.db = db_client
 
@@ -348,11 +356,6 @@ class LightRAG:
             }
             for content in unique_contents
         }
-
-        # 3. Store original document and chunks
-        await self.full_docs.upsert(
-            {doc_id: {"content": doc["content"]}}
-        )
 
         # 3. Filter out already processed documents
         _add_doc_keys = await self.doc_status.filter_keys(list(new_docs.keys()))
@@ -401,7 +404,12 @@ class LightRAG:
                 }
 
                 # Update status with chunks information
-                doc_status.update({"chunks_count": len(chunks),"updated_at": datetime.now().isoformat()})
+                doc_status.update(
+                    {
+                        "chunks_count": len(chunks),
+                        "updated_at": datetime.now().isoformat(),
+                    }
+                )
                 await self.doc_status.upsert({doc_id: doc_status})
 
                 try:
@@ -425,16 +433,30 @@ class LightRAG:
 
                     self.chunk_entity_relation_graph = maybe_new_kg
 
-
+                    # Store original document and chunks
+                    await self.full_docs.upsert(
+                        {doc_id: {"content": doc["content"]}}
+                    )
                     await self.text_chunks.upsert(chunks)
 
                     # Update status to processed
-                    doc_status.update({"status": DocStatus.PROCESSED,"updated_at": datetime.now().isoformat()})
+                    doc_status.update(
+                        {
+                            "status": DocStatus.PROCESSED,
+                            "updated_at": datetime.now().isoformat(),
+                        }
+                    )
                     await self.doc_status.upsert({doc_id: doc_status})
 
                 except Exception as e:
                     # Mark as failed if any step fails
-                    doc_status.update({"status": DocStatus.FAILED,"error": str(e),"updated_at": datetime.now().isoformat()})
+                    doc_status.update(
+                        {
+                            "status": DocStatus.FAILED,
+                            "error": str(e),
+                            "updated_at": datetime.now().isoformat(),
+                        }
+                    )
                     await self.doc_status.upsert({doc_id: doc_status})
                     raise e
 
@@ -527,7 +549,9 @@ class LightRAG:
         # 1. Remove duplicate contents from the list
        unique_contents = list(set(doc.strip() for doc in string_or_strings))
 
-        logger.info(f"Received {len(string_or_strings)} docs, contains {len(unique_contents)} new unique documents")
+        logger.info(
+            f"Received {len(string_or_strings)} docs, contains {len(unique_contents)} new unique documents"
+        )
 
         # 2. Generate document IDs and initial status
         new_docs = {
@@ -542,28 +566,34 @@ class LightRAG:
             for content in unique_contents
         }
 
-        # 3. Filter out already processed documents
+        # 3. Filter out already processed documents
         _not_stored_doc_keys = await self.full_docs.filter_keys(list(new_docs.keys()))
         if len(_not_stored_doc_keys) < len(new_docs):
-            logger.info(f"Skipping {len(new_docs)-len(_not_stored_doc_keys)} already existing documents")
+            logger.info(
+                f"Skipping {len(new_docs)-len(_not_stored_doc_keys)} already existing documents"
+            )
         new_docs = {k: v for k, v in new_docs.items() if k in _not_stored_doc_keys}
 
         if not new_docs:
-            logger.info(f"All documents have been processed or are duplicates")
+            logger.info("All documents have been processed or are duplicates")
             return None
 
-        # 4. Store original document
+        # 4. Store original document
         for doc_id, doc in new_docs.items():
             await self.full_docs.upsert({doc_id: {"content": doc["content"]}})
             await self.full_docs.change_status(doc_id, DocStatus.PENDING)
         logger.info(f"Stored {len(new_docs)} new unique documents")
-
+
     async def apipeline_process_chunks(self):
-        """Get pendding documents, split into chunks,insert chunks"""
-        # 1. get all pending and failed documents
+        """Get pendding documents, split into chunks,insert chunks"""
+        # 1. get all pending and failed documents
         _todo_doc_keys = []
-        _failed_doc = await self.full_docs.get_by_status_and_ids(status = DocStatus.FAILED,ids = None)
-        _pendding_doc = await self.full_docs.get_by_status_and_ids(status = DocStatus.PENDING,ids = None)
+        _failed_doc = await self.full_docs.get_by_status_and_ids(
+            status=DocStatus.FAILED, ids=None
+        )
+        _pendding_doc = await self.full_docs.get_by_status_and_ids(
+            status=DocStatus.PENDING, ids=None
+        )
         if _failed_doc:
             _todo_doc_keys.extend([doc["id"] for doc in _failed_doc])
         if _pendding_doc:
@@ -573,10 +603,9 @@ class LightRAG:
             return None
         else:
             logger.info(f"Filtered out {len(_todo_doc_keys)} not processed documents")
-
+
         new_docs = {
-            doc["id"]: doc
-            for doc in await self.full_docs.get_by_ids(_todo_doc_keys)
+            doc["id"]: doc for doc in await self.full_docs.get_by_ids(_todo_doc_keys)
         }
 
         # 2. split docs into chunks, insert chunks, update doc status
@@ -585,8 +614,9 @@ class LightRAG:
         for i in range(0, len(new_docs), batch_size):
             batch_docs = dict(list(new_docs.items())[i : i + batch_size])
             for doc_id, doc in tqdm_async(
-                batch_docs.items(), desc=f"Level 1 - Spliting doc in batch {i//batch_size + 1}"
-            ):
+                batch_docs.items(),
+                desc=f"Level 1 - Spliting doc in batch {i//batch_size + 1}",
+            ):
                 try:
                     # Generate chunks from document
                     chunks = {
@@ -616,18 +646,23 @@ class LightRAG:
                     await self.full_docs.change_status(doc_id, DocStatus.FAILED)
                     raise e
                 except Exception as e:
-                    import traceback
-                    error_msg = f"Failed to process document {doc_id}: {str(e)}\n{traceback.format_exc()}"
-                    logger.error(error_msg)
-                    continue
-        logger.info(f"Stored {chunk_cnt} chunks from {len(new_docs)} documents")
+                    import traceback
+
+                    error_msg = f"Failed to process document {doc_id}: {str(e)}\n{traceback.format_exc()}"
+                    logger.error(error_msg)
+                    continue
+        logger.info(f"Stored {chunk_cnt} chunks from {len(new_docs)} documents")
 
     async def apipeline_process_extract_graph(self):
         """Get pendding or failed chunks, extract entities and relationships from each chunk"""
-        # 1. get all pending and failed chunks
+        # 1. get all pending and failed chunks
         _todo_chunk_keys = []
-        _failed_chunks = await self.text_chunks.get_by_status_and_ids(status = DocStatus.FAILED,ids = None)
-        _pendding_chunks = await self.text_chunks.get_by_status_and_ids(status = DocStatus.PENDING,ids = None)
+        _failed_chunks = await self.text_chunks.get_by_status_and_ids(
+            status=DocStatus.FAILED, ids=None
+        )
+        _pendding_chunks = await self.text_chunks.get_by_status_and_ids(
+            status=DocStatus.PENDING, ids=None
+        )
         if _failed_chunks:
             _todo_chunk_keys.extend([doc["id"] for doc in _failed_chunks])
         if _pendding_chunks:
@@ -635,15 +670,19 @@ class LightRAG:
         if not _todo_chunk_keys:
             logger.info("All chunks have been processed or are duplicates")
             return None
-
+
         # Process documents in batches
         batch_size = self.addon_params.get("insert_batch_size", 10)
 
-        semaphore = asyncio.Semaphore(batch_size)  # Control the number of tasks that are processed simultaneously
+        semaphore = asyncio.Semaphore(
+            batch_size
+        )  # Control the number of tasks that are processed simultaneously
 
-        async def process_chunk(chunk_id):
+        async def process_chunk(chunk_id):
            async with semaphore:
-                chunks = {i["id"]: i for i in await self.text_chunks.get_by_ids([chunk_id])}
+                chunks = {
+                    i["id"]: i for i in await self.text_chunks.get_by_ids([chunk_id])
+                }
                 # Extract and store entities and relationships
                 try:
                     maybe_new_kg = await extract_entities(
@@ -662,25 +701,29 @@ class LightRAG:
                     logger.error("Failed to extract entities and relationships")
                    # Mark as failed if any step fails
                     await self.text_chunks.change_status(chunk_id, DocStatus.FAILED)
-                    raise e
-
-        with tqdm_async(total=len(_todo_chunk_keys),
-                        desc="\nLevel 1 - Processing chunks",
-                        unit="chunk",
-                        position=0) as progress:
+                    raise e
+
+        with tqdm_async(
+            total=len(_todo_chunk_keys),
+            desc="\nLevel 1 - Processing chunks",
+            unit="chunk",
+            position=0,
+        ) as progress:
             tasks = []
             for chunk_id in _todo_chunk_keys:
                 task = asyncio.create_task(process_chunk(chunk_id))
                 tasks.append(task)
-
+
             for future in asyncio.as_completed(tasks):
                 await future
                 progress.update(1)
-                progress.set_postfix({
-                    'LLM call': statistic_data["llm_call"],
-                    'LLM cache': statistic_data["llm_cache"],
-                })
-
+                progress.set_postfix(
+                    {
+                        "LLM call": statistic_data["llm_call"],
+                        "LLM cache": statistic_data["llm_cache"],
+                    }
+                )
+
         # Ensure all indexes are updated after each document
         await self._insert_done()
 
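apipeline_process_extract_graph bounds concurrency with an asyncio.Semaphore sized by addon_params["insert_batch_size"] and reports progress (including the llm_call/llm_cache counters from statistic_data) through a tqdm bar over asyncio.as_completed. A self-contained sketch of that bounded-concurrency pattern with a generic worker, not the LightRAG internals:

    import asyncio
    from tqdm.asyncio import tqdm as tqdm_async

    async def run_bounded(items, worker, limit=10):
        # Bounded concurrency plus progress reporting, same shape as the pipeline loop above.
        semaphore = asyncio.Semaphore(limit)

        async def guarded(item):
            async with semaphore:
                return await worker(item)

        with tqdm_async(total=len(items), desc="Processing", unit="item") as progress:
            tasks = [asyncio.create_task(guarded(item)) for item in items]
            for future in asyncio.as_completed(tasks):
                await future
                progress.update(1)
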
lightrag/operate.py CHANGED
@@ -20,7 +20,7 @@ from .utils import (
     handle_cache,
     save_to_cache,
     CacheData,
-    statistic_data
+    statistic_data,
 )
 from .base import (
     BaseGraphStorage,
@@ -105,7 +105,9 @@ async def _handle_entity_relation_summary(
     llm_max_tokens = global_config["llm_model_max_token_size"]
     tiktoken_model_name = global_config["tiktoken_model_name"]
     summary_max_tokens = global_config["entity_summary_to_max_tokens"]
-    language = global_config["addon_params"].get("language", PROMPTS["DEFAULT_LANGUAGE"])
+    language = global_config["addon_params"].get(
+        "language", PROMPTS["DEFAULT_LANGUAGE"]
+    )
 
     tokens = encode_string_by_tiktoken(description, model_name=tiktoken_model_name)
     if len(tokens) < summary_max_tokens: # No need for summary
@@ -360,7 +362,7 @@ async def extract_entities(
             llm_response_cache.global_config = new_config
             need_to_restore = True
         if history_messages:
-            history = json.dumps(history_messages,ensure_ascii=False)
+            history = json.dumps(history_messages, ensure_ascii=False)
             _prompt = history + "\n" + input_text
         else:
             _prompt = input_text
@@ -381,7 +383,7 @@ async def extract_entities(
                 input_text, history_messages=history_messages
             )
         else:
-            res: str = await use_llm_func(input_text)
+            res: str = await use_llm_func(input_text)
         await save_to_cache(
             llm_response_cache,
             CacheData(args_hash=arg_hash, content=res, prompt=_prompt),
@@ -394,7 +396,7 @@ async def extract_entities(
         return await use_llm_func(input_text)
 
     async def _process_single_content(chunk_key_dp: tuple[str, TextChunkSchema]):
-        """"Prpocess a single chunk
+        """ "Prpocess a single chunk
         Args:
             chunk_key_dp (tuple[str, TextChunkSchema]):
                 ("chunck-xxxxxx", {"tokens": int, "content": str, "full_doc_id": str, "chunk_order_index": int})
@@ -472,7 +474,9 @@ async def extract_entities(
         asyncio.as_completed([_process_single_content(c) for c in ordered_chunks]),
         total=len(ordered_chunks),
         desc="Level 2 - Extracting entities and relationships",
-        unit="chunk", position=1,leave=False
+        unit="chunk",
+        position=1,
+        leave=False,
     ):
         results.append(await result)
 
@@ -494,7 +498,9 @@ async def extract_entities(
         ),
         total=len(maybe_nodes),
         desc="Level 3 - Inserting entities",
-        unit="entity", position=2,leave=False
+        unit="entity",
+        position=2,
+        leave=False,
     ):
         all_entities_data.append(await result)
 
@@ -511,7 +517,9 @@ async def extract_entities(
         ),
         total=len(maybe_edges),
         desc="Level 3 - Inserting relationships",
-        unit="relationship", position=3,leave=False
+        unit="relationship",
+        position=3,
+        leave=False,
     ):
         all_relationships_data.append(await result)
 
lightrag/utils.py CHANGED
@@ -41,7 +41,7 @@ logging.getLogger("httpx").setLevel(logging.WARNING)
 def set_logger(log_file: str):
     logger.setLevel(logging.DEBUG)
 
-    file_handler = logging.FileHandler(log_file, encoding='utf-8')
+    file_handler = logging.FileHandler(log_file, encoding="utf-8")
     file_handler.setLevel(logging.DEBUG)
 
     formatter = logging.Formatter(
@@ -458,7 +458,7 @@ async def handle_cache(hashing_kv, args_hash, prompt, mode="default"):
         return None, None, None, None
 
     # For naive mode, only use simple cache matching
-    #if mode == "naive":
+    # if mode == "naive":
     if mode == "default":
         if exists_func(hashing_kv, "get_by_mode_and_id"):
             mode_cache = await hashing_kv.get_by_mode_and_id(mode, args_hash) or {}
@@ -479,7 +479,9 @@ async def handle_cache(hashing_kv, args_hash, prompt, mode="default"):
     quantized = min_val = max_val = None
     if is_embedding_cache_enabled:
         # Use embedding cache
-        embedding_model_func = hashing_kv.global_config["embedding_func"].func #["func"]
+        embedding_model_func = hashing_kv.global_config[
+            "embedding_func"
+        ].func # ["func"]
         llm_model_func = hashing_kv.global_config.get("llm_model_func")
 
         current_embedding = await embedding_model_func([prompt])
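
The embedding-cache branch above pulls the embedding function out of global_config; the threshold it compares against comes from embedding_cache_config (e.g. {"enabled": True, "similarity_threshold": 0.90} in the demo). As a generic illustration of that similarity-threshold idea only, not LightRAG's actual handle_cache logic (the real branch also quantizes embeddings, see the quantized/min_val/max_val variables above):

    import numpy as np

    def best_cached_answer(query_emb, cached_entries, similarity_threshold=0.90):
        # cached_entries: list of {"embedding": np.ndarray, "return": str} (hypothetical shape).
        best_score, best_answer = -1.0, None
        for entry in cached_entries:
            emb = entry["embedding"]
            score = float(
                np.dot(query_emb, emb) / (np.linalg.norm(query_emb) * np.linalg.norm(emb))
            )
            if score > best_score:
                best_score, best_answer = score, entry["return"]
        return best_answer if best_score >= similarity_threshold else None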