jin committed on
Commit
0fc2402
·
1 Parent(s): 0f601b5

update Oracle support

Browse files

add cache support, fix bug

examples/lightrag_oracle_demo.py CHANGED
@@ -20,7 +20,8 @@ BASE_URL = "http://xxx.xxx.xxx.xxx:8088/v1/"
20
  APIKEY = "ocigenerativeai"
21
  CHATMODEL = "cohere.command-r-plus"
22
  EMBEDMODEL = "cohere.embed-multilingual-v3.0"
23
-
 
24
 
25
  if not os.path.exists(WORKING_DIR):
26
  os.mkdir(WORKING_DIR)
@@ -86,27 +87,49 @@ async def main():
86
  # We use Oracle DB as the KV/vector/graph storage
87
  # You can add `addon_params={"example_number": 1, "language": "Simplfied Chinese"}` to control the prompt
88
  rag = LightRAG(
 
 
 
89
  enable_llm_cache=False,
90
- working_dir=WORKING_DIR,
91
- chunk_token_size=512,
 
 
 
92
  llm_model_func=llm_model_func,
93
  embedding_func=EmbeddingFunc(
94
  embedding_dim=embedding_dimension,
95
- max_token_size=512,
96
  func=embedding_func,
97
- ),
98
- graph_storage="OracleGraphStorage",
99
- kv_storage="OracleKVStorage",
 
100
  vector_storage="OracleVectorDBStorage",
 
 
 
101
  )
102
 
103
- # Setthe KV/vector/graph storage's `db` property, so all operation will use same connection pool
104
- rag.graph_storage_cls.db = oracle_db
105
  rag.key_string_value_json_storage_cls.db = oracle_db
106
  rag.vector_db_storage_cls.db = oracle_db
107
- # add embedding_func for graph database, it's deleted in commit 5661d76860436f7bf5aef2e50d9ee4a59660146c
108
- rag.chunk_entity_relation_graph.embedding_func = rag.embedding_func
 
 
 
 
 
 
 
 
 
 
 
109
 
 
 
110
  # Extract and Insert into LightRAG storage
111
  with open("./dickens/demo.txt", "r", encoding="utf-8") as f:
112
  await rag.ainsert(f.read())
 
20
  APIKEY = "ocigenerativeai"
21
  CHATMODEL = "cohere.command-r-plus"
22
  EMBEDMODEL = "cohere.embed-multilingual-v3.0"
23
+ CHUNK_TOKEN_SIZE = 1024
24
+ MAX_TOKENS = 4000
25
 
26
  if not os.path.exists(WORKING_DIR):
27
  os.mkdir(WORKING_DIR)
 
87
  # We use Oracle DB as the KV/vector/graph storage
88
  # You can add `addon_params={"example_number": 1, "language": "Simplfied Chinese"}` to control the prompt
89
  rag = LightRAG(
90
+ working_dir=WORKING_DIR,
91
+ entity_extract_max_gleaning = 1,
92
+
93
  enable_llm_cache=False,
94
+ embedding_cache_config= None, # {"enabled": True,"similarity_threshold": 0.90},
95
+ enable_llm_cache_for_entity_extract = True,
96
+
97
+ chunk_token_size=CHUNK_TOKEN_SIZE,
98
+ llm_model_max_token_size = MAX_TOKENS,
99
  llm_model_func=llm_model_func,
100
  embedding_func=EmbeddingFunc(
101
  embedding_dim=embedding_dimension,
102
+ max_token_size=500,
103
  func=embedding_func,
104
+ ),
105
+
106
+ graph_storage = "OracleGraphStorage",
107
+ kv_storage = "OracleKVStorage",
108
  vector_storage="OracleVectorDBStorage",
109
+ doc_status_storage="OracleDocStatusStorage",
110
+
111
+ addon_params = {"example_number":1, "language":"Simplfied Chinese"},
112
  )
113
 
114
+ # Set the KV/vector/graph storage's `db` property, so all operations will use the same connection pool
 
115
  rag.key_string_value_json_storage_cls.db = oracle_db
116
  rag.vector_db_storage_cls.db = oracle_db
117
+ rag.graph_storage_cls.db = oracle_db
118
+ rag.doc_status_storage_cls.db = oracle_db
119
+ rag.doc_status.db = oracle_db
120
+ rag.full_docs.db = oracle_db
121
+ rag.text_chunks.db = oracle_db
122
+ rag.llm_response_cache.db = oracle_db
123
+ rag.key_string_value_json_storage_cls.db = oracle_db
124
+ rag.chunks_vdb.db = oracle_db
125
+ rag.relationships_vdb.db = oracle_db
126
+ rag.entities_vdb.db = oracle_db
127
+ rag.graph_storage_cls.db = oracle_db
128
+ rag.chunk_entity_relation_graph.db = oracle_db
129
+ rag.llm_response_cache.db = oracle_db
130
 
131
+ rag.chunk_entity_relation_graph.embedding_func = rag.embedding_func
132
+
133
  # Extract and Insert into LightRAG storage
134
  with open("./dickens/demo.txt", "r", encoding="utf-8") as f:
135
  await rag.ainsert(f.read())
lightrag/kg/oracle_impl.py CHANGED
@@ -3,7 +3,7 @@ import asyncio
3
  # import html
4
  # import os
5
  from dataclasses import dataclass
6
- from typing import Union
7
  import numpy as np
8
  import array
9
 
@@ -12,6 +12,9 @@ from ..base import (
12
  BaseGraphStorage,
13
  BaseKVStorage,
14
  BaseVectorStorage,
 
 
 
15
  )
16
 
17
  import oracledb
@@ -167,6 +170,9 @@ class OracleDB:
167
  @dataclass
168
  class OracleKVStorage(BaseKVStorage):
169
  # should pass db object to self.db
 
 
 
170
  def __post_init__(self):
171
  self._data = {}
172
  self._max_batch_size = self.global_config["embedding_batch_num"]
@@ -174,28 +180,56 @@ class OracleKVStorage(BaseKVStorage):
174
  ################ QUERY METHODS ################
175
 
176
  async def get_by_id(self, id: str) -> Union[dict, None]:
177
- """根据 id 获取 doc_full 数据."""
178
  SQL = SQL_TEMPLATES["get_by_id_" + self.namespace]
179
  params = {"workspace": self.db.workspace, "id": id}
180
  # print("get_by_id:"+SQL)
181
- res = await self.db.query(SQL, params)
 
 
 
 
 
 
182
  if res:
183
- data = res # {"data":res}
184
- # print (data)
185
- return data
 
 
 
 
 
 
 
 
 
 
 
186
  else:
187
  return None
188
 
189
  # Query by id
190
  async def get_by_ids(self, ids: list[str], fields=None) -> Union[list[dict], None]:
191
- """根据 id 获取 doc_chunks 数据"""
192
  SQL = SQL_TEMPLATES["get_by_ids_" + self.namespace].format(
193
  ids=",".join([f"'{id}'" for id in ids])
194
  )
195
  params = {"workspace": self.db.workspace}
196
  # print("get_by_ids:"+SQL)
197
- # print(params)
198
  res = await self.db.query(SQL, params, multirows=True)
 
 
 
 
 
 
 
 
 
 
 
 
199
  if res:
200
  data = res # [{"data":i} for i in res]
201
  # print(data)
@@ -204,7 +238,7 @@ class OracleKVStorage(BaseKVStorage):
204
  return None
205
 
206
  async def filter_keys(self, keys: list[str]) -> set[str]:
207
- """过滤掉重复内容"""
208
  SQL = SQL_TEMPLATES["filter_keys"].format(
209
  table_name=N_T[self.namespace], ids=",".join([f"'{id}'" for id in keys])
210
  )
@@ -271,13 +305,26 @@ class OracleKVStorage(BaseKVStorage):
271
  # values.clear()
272
  merge_sql = SQL_TEMPLATES["merge_doc_full"]
273
  data = {
274
- "check_id": k,
275
  "id": k,
276
  "content": v["content"],
277
  "workspace": self.db.workspace,
278
  }
279
  # print(merge_sql)
280
  await self.db.execute(merge_sql, data)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
281
  return left_data
282
 
283
  async def index_done_callback(self):
@@ -285,8 +332,99 @@ class OracleKVStorage(BaseKVStorage):
285
  logger.info("full doc and chunk data had been saved into oracle db!")
286
 
287
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
288
  @dataclass
289
  class OracleVectorDBStorage(BaseVectorStorage):
 
 
290
  cosine_better_than_threshold: float = 0.2
291
 
292
  def __post_init__(self):
@@ -564,13 +702,18 @@ N_T = {
564
  TABLES = {
565
  "LIGHTRAG_DOC_FULL": {
566
  "ddl": """CREATE TABLE LIGHTRAG_DOC_FULL (
567
- id varchar(256)PRIMARY KEY,
568
  workspace varchar(1024),
569
  doc_name varchar(1024),
570
  content CLOB,
571
  meta JSON,
 
 
 
 
572
  createtime TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
573
- updatetime TIMESTAMP DEFAULT NULL
 
574
  )"""
575
  },
576
  "LIGHTRAG_DOC_CHUNKS": {
@@ -618,10 +761,16 @@ TABLES = {
618
  },
619
  "LIGHTRAG_LLM_CACHE": {
620
  "ddl": """CREATE TABLE LIGHTRAG_LLM_CACHE (
621
- id varchar(256) PRIMARY KEY,
622
- send clob,
623
- return clob,
624
- model varchar(1024),
 
 
 
 
 
 
625
  createtime TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
626
  updatetime TIMESTAMP DEFAULT NULL
627
  )"""
@@ -647,22 +796,70 @@ TABLES = {
647
  SQL_TEMPLATES = {
648
  # SQL for KVStorage
649
  "get_by_id_full_docs": "select ID,NVL(content,'') as content from LIGHTRAG_DOC_FULL where workspace=:workspace and ID=:id",
 
650
  "get_by_id_text_chunks": "select ID,TOKENS,NVL(content,'') as content,CHUNK_ORDER_INDEX,FULL_DOC_ID from LIGHTRAG_DOC_CHUNKS where workspace=:workspace and ID=:id",
 
 
 
 
 
 
 
 
 
 
651
  "get_by_ids_full_docs": "select ID,NVL(content,'') as content from LIGHTRAG_DOC_FULL where workspace=:workspace and ID in ({ids})",
 
652
  "get_by_ids_text_chunks": "select ID,TOKENS,NVL(content,'') as content,CHUNK_ORDER_INDEX,FULL_DOC_ID from LIGHTRAG_DOC_CHUNKS where workspace=:workspace and ID in ({ids})",
 
653
  "filter_keys": "select id from {table_name} where workspace=:workspace and id in ({ids})",
654
- "merge_doc_full": """ MERGE INTO LIGHTRAG_DOC_FULL a
655
- USING DUAL
656
- ON (a.id = :check_id)
657
- WHEN NOT MATCHED THEN
658
- INSERT(id,content,workspace) values(:id,:content,:workspace)
659
- """,
 
660
  "merge_chunk": """MERGE INTO LIGHTRAG_DOC_CHUNKS a
661
- USING DUAL
662
- ON (a.id = :check_id)
663
- WHEN NOT MATCHED THEN
664
- INSERT(id,content,workspace,tokens,chunk_order_index,full_doc_id,content_vector)
665
- values (:id,:content,:workspace,:tokens,:chunk_order_index,:full_doc_id,:content_vector) """,
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
666
  # SQL for VectorStorage
667
  "entities": """SELECT name as entity_name FROM
668
  (SELECT id,name,VECTOR_DISTANCE(content_vector,vector(:embedding_string,{dimension},{dtype}),COSINE) as distance
@@ -714,16 +911,22 @@ SQL_TEMPLATES = {
714
  COLUMNS (a.name as source_name,b.name as target_name))""",
715
  "merge_node": """MERGE INTO LIGHTRAG_GRAPH_NODES a
716
  USING DUAL
717
- ON (a.workspace = :workspace and a.name=:name and a.source_chunk_id=:source_chunk_id)
718
  WHEN NOT MATCHED THEN
719
  INSERT(workspace,name,entity_type,description,source_chunk_id,content,content_vector)
720
- values (:workspace,:name,:entity_type,:description,:source_chunk_id,:content,:content_vector) """,
 
 
 
721
  "merge_edge": """MERGE INTO LIGHTRAG_GRAPH_EDGES a
722
  USING DUAL
723
- ON (a.workspace = :workspace and a.source_name=:source_name and a.target_name=:target_name and a.source_chunk_id=:source_chunk_id)
724
  WHEN NOT MATCHED THEN
725
  INSERT(workspace,source_name,target_name,weight,keywords,description,source_chunk_id,content,content_vector)
726
- values (:workspace,:source_name,:target_name,:weight,:keywords,:description,:source_chunk_id,:content,:content_vector) """,
 
 
 
727
  "get_all_nodes": """WITH t0 AS (
728
  SELECT name AS id, entity_type AS label, entity_type, description,
729
  '["' || replace(source_chunk_id, '<SEP>', '","') || '"]' source_chunk_ids
 
3
  # import html
4
  # import os
5
  from dataclasses import dataclass
6
+ from typing import Union, List, Dict, Set, Any, Tuple
7
  import numpy as np
8
  import array
9
 
 
12
  BaseGraphStorage,
13
  BaseKVStorage,
14
  BaseVectorStorage,
15
+ DocStatusStorage,
16
+ DocStatus,
17
+ DocProcessingStatus,
18
  )
19
 
20
  import oracledb
 
170
  @dataclass
171
  class OracleKVStorage(BaseKVStorage):
172
  # should pass db object to self.db
173
+ db: OracleDB = None
174
+ meta_fields = None
175
+
176
  def __post_init__(self):
177
  self._data = {}
178
  self._max_batch_size = self.global_config["embedding_batch_num"]
 
180
  ################ QUERY METHODS ################
181
 
182
  async def get_by_id(self, id: str) -> Union[dict, None]:
183
+ """get doc_full data based on id."""
184
  SQL = SQL_TEMPLATES["get_by_id_" + self.namespace]
185
  params = {"workspace": self.db.workspace, "id": id}
186
  # print("get_by_id:"+SQL)
187
+ if "llm_response_cache" == self.namespace:
188
+ array_res = await self.db.query(SQL, params, multirows=True)
189
+ res = {}
190
+ for row in array_res:
191
+ res[row["id"]] = row
192
+ else:
193
+ res = await self.db.query(SQL, params)
194
  if res:
195
+ return res
196
+ else:
197
+ return None
198
+
199
+ async def get_by_mode_and_id(self, mode: str, id: str) -> Union[dict, None]:
200
+ """Specifically for llm_response_cache."""
201
+ SQL = SQL_TEMPLATES["get_by_mode_id_" + self.namespace]
202
+ params = {"workspace": self.db.workspace, "cache_mode": mode, "id": id}
203
+ if "llm_response_cache" == self.namespace:
204
+ array_res = await self.db.query(SQL, params, multirows=True)
205
+ res = {}
206
+ for row in array_res:
207
+ res[row["id"]] = row
208
+ return res
209
  else:
210
  return None
211
 
212
  # Query by id
213
  async def get_by_ids(self, ids: list[str], fields=None) -> Union[list[dict], None]:
214
+ """get doc_chunks data based on id"""
215
  SQL = SQL_TEMPLATES["get_by_ids_" + self.namespace].format(
216
  ids=",".join([f"'{id}'" for id in ids])
217
  )
218
  params = {"workspace": self.db.workspace}
219
  # print("get_by_ids:"+SQL)
 
220
  res = await self.db.query(SQL, params, multirows=True)
221
+ if "llm_response_cache" == self.namespace:
222
+ modes = set()
223
+ dict_res: dict[str, dict] = {}
224
+ for row in res:
225
+ modes.add(row["mode"])
226
+ for mode in modes:
227
+ if mode not in dict_res:
228
+ dict_res[mode] = {}
229
+ for row in res:
230
+ dict_res[row["mode"]][row["id"]] = row
231
+ res = [{k: v} for k, v in dict_res.items()]
232
+
233
  if res:
234
  data = res # [{"data":i} for i in res]
235
  # print(data)
 
238
  return None
239
 
240
  async def filter_keys(self, keys: list[str]) -> set[str]:
241
+ """remove duplicated"""
242
  SQL = SQL_TEMPLATES["filter_keys"].format(
243
  table_name=N_T[self.namespace], ids=",".join([f"'{id}'" for id in keys])
244
  )
 
305
  # values.clear()
306
  merge_sql = SQL_TEMPLATES["merge_doc_full"]
307
  data = {
 
308
  "id": k,
309
  "content": v["content"],
310
  "workspace": self.db.workspace,
311
  }
312
  # print(merge_sql)
313
  await self.db.execute(merge_sql, data)
314
+
315
+ if self.namespace == "llm_response_cache":
316
+ for mode, items in data.items():
317
+ for k, v in items.items():
318
+ upsert_sql = SQL_TEMPLATES["upsert_llm_response_cache"]
319
+ _data = {
320
+ "workspace": self.db.workspace,
321
+ "id": k,
322
+ "original_prompt": v["original_prompt"],
323
+ "return_value": v["return"],
324
+ "cache_mode": mode,
325
+ }
326
+
327
+ await self.db.execute(upsert_sql, _data)
328
  return left_data
329
 
330
  async def index_done_callback(self):
 
332
  logger.info("full doc and chunk data had been saved into oracle db!")
333
 
334
 
335
+ @dataclass
336
+ class OracleDocStatusStorage(DocStatusStorage):
337
+ """Oracle implementation of document status storage"""
338
+ # should pass db object to self.db
339
+ db: OracleDB = None
340
+ meta_fields = None
341
+
342
+ def __post_init__(self):
343
+ pass
344
+
345
+ async def filter_keys(self, ids: list[str]) -> set[str]:
346
+ """Return keys that don't exist in storage"""
347
+ SQL = SQL_TEMPLATES["get_by_id_" + self.namespace].format(
348
+ ids = ",".join([f"'{id}'" for id in ids])
349
+ )
350
+ params = {"workspace": self.db.workspace}
351
+ res = await self.db.query(SQL, params, True)
352
+ # The result is like [{'id': 'id1'}, {'id': 'id2'}, ...].
353
+ if res:
354
+ existed = set([element["id"] for element in res])
355
+ return set(ids) - existed
356
+ else:
357
+ return set(ids)
358
+
359
+ async def get_status_counts(self) -> Dict[str, int]:
360
+ """Get counts of documents in each status"""
361
+ SQL = SQL_TEMPLATES["get_status_counts"]
362
+ params = {"workspace": self.db.workspace}
363
+ res = await self.db.query(SQL, params, True)
364
+ # Result is like [{'status': 'PENDING', 'count': 1}, {'status': 'PROCESSING', 'count': 2}, ...]
365
+ counts = {}
366
+ for doc in res:
367
+ counts[doc["status"]] = doc["count"]
368
+ return counts
369
+
370
+ async def get_docs_by_status(self, status: DocStatus) -> Dict[str, DocProcessingStatus]:
371
+ """Get all documents by status"""
372
+ SQL = SQL_TEMPLATES["get_docs_by_status"]
373
+ params = {"workspace": self.db.workspace, "status": status}
374
+ res = await self.db.query(SQL, params, True)
375
+ # Result is like [{'id': 'id1', 'status': 'PENDING', 'updated_at': '2023-07-01 00:00:00'}, {'id': 'id2', 'status': 'PENDING', 'updated_at': '2023-07-01 00:00:00'}, ...]
376
+ # Converting to be a dict
377
+ return {
378
+ element["id"]: DocProcessingStatus(
379
+ #content_summary=element["content_summary"],
380
+ content_summary = "",
381
+ content_length=element["CONTENT_LENGTH"],
382
+ status=element["STATUS"],
383
+ created_at=element["CREATETIME"],
384
+ updated_at=element["UPDATETIME"],
385
+ chunks_count=-1,
386
+ #chunks_count=element["chunks_count"],
387
+ )
388
+ for element in res
389
+ }
390
+
391
+ async def get_failed_docs(self) -> Dict[str, DocProcessingStatus]:
392
+ """Get all failed documents"""
393
+ return await self.get_docs_by_status(DocStatus.FAILED)
394
+
395
+ async def get_pending_docs(self) -> Dict[str, DocProcessingStatus]:
396
+ """Get all pending documents"""
397
+ return await self.get_docs_by_status(DocStatus.PENDING)
398
+
399
+ async def index_done_callback(self):
400
+ """Save data after indexing, but for ORACLE, we already saved them during the upsert stage, so no action to take here"""
401
+ logger.info("Doc status had been saved into ORACLE db!")
402
+
403
+ async def upsert(self, data: dict[str, dict]):
404
+ """Update or insert document status
405
+
406
+ Args:
407
+ data: Dictionary of document IDs and their status data
408
+ """
409
+ SQL = SQL_TEMPLATES["merge_doc_status"]
410
+ for k, v in data.items():
411
+ # chunks_count is optional
412
+ params = {
413
+ "workspace": self.db.workspace,
414
+ "id": k,
415
+ "content_summary": v["content_summary"],
416
+ "content_length": v["content_length"],
417
+ "chunks_count": v["chunks_count"] if "chunks_count" in v else -1,
418
+ "status": v["status"],
419
+ }
420
+ await self.db.execute(SQL, params)
421
+ return data
422
+
423
+
424
  @dataclass
425
  class OracleVectorDBStorage(BaseVectorStorage):
426
+ # should pass db object to self.db
427
+ db: OracleDB = None
428
  cosine_better_than_threshold: float = 0.2
429
 
430
  def __post_init__(self):
 
702
  TABLES = {
703
  "LIGHTRAG_DOC_FULL": {
704
  "ddl": """CREATE TABLE LIGHTRAG_DOC_FULL (
705
+ id varchar(256),
706
  workspace varchar(1024),
707
  doc_name varchar(1024),
708
  content CLOB,
709
  meta JSON,
710
+ content_summary varchar(1024),
711
+ content_length NUMBER,
712
+ status varchar(256),
713
+ chunks_count NUMBER,
714
  createtime TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
715
+ updatetime TIMESTAMP DEFAULT NULL,
716
+ error varchar(4096)
717
  )"""
718
  },
719
  "LIGHTRAG_DOC_CHUNKS": {
 
761
  },
762
  "LIGHTRAG_LLM_CACHE": {
763
  "ddl": """CREATE TABLE LIGHTRAG_LLM_CACHE (
764
+ id varchar(256) PRIMARY KEY,
765
+ workspace varchar(1024),
766
+ cache_mode varchar(256),
767
+ model_name varchar(256),
768
+ original_prompt clob,
769
+ return_value clob,
770
+ embedding CLOB,
771
+ embedding_shape NUMBER,
772
+ embedding_min NUMBER,
773
+ embedding_max NUMBER,
774
  createtime TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
775
  updatetime TIMESTAMP DEFAULT NULL
776
  )"""
 
796
  SQL_TEMPLATES = {
797
  # SQL for KVStorage
798
  "get_by_id_full_docs": "select ID,NVL(content,'') as content from LIGHTRAG_DOC_FULL where workspace=:workspace and ID=:id",
799
+
800
  "get_by_id_text_chunks": "select ID,TOKENS,NVL(content,'') as content,CHUNK_ORDER_INDEX,FULL_DOC_ID from LIGHTRAG_DOC_CHUNKS where workspace=:workspace and ID=:id",
801
+
802
+ "get_by_id_llm_response_cache": """SELECT id, original_prompt, NVL(return_value, '') as "return", cache_mode as "mode"
803
+ FROM LIGHTRAG_LLM_CACHE WHERE workspace=:workspace AND id=:id""",
804
+
805
+ "get_by_mode_id_llm_response_cache": """SELECT id, original_prompt, NVL(return_value, '') as "return", cache_mode as "mode"
806
+ FROM LIGHTRAG_LLM_CACHE WHERE workspace=:workspace AND cache_mode=:cache_mode AND id=:id""",
807
+
808
+ "get_by_ids_llm_response_cache": """SELECT id, original_prompt, NVL(return_value, '') as "return", cache_mode as "mode"
809
+ FROM LIGHTRAG_LLM_CACHE WHERE workspace=:workspace AND id IN ({ids})""",
810
+
811
  "get_by_ids_full_docs": "select ID,NVL(content,'') as content from LIGHTRAG_DOC_FULL where workspace=:workspace and ID in ({ids})",
812
+
813
  "get_by_ids_text_chunks": "select ID,TOKENS,NVL(content,'') as content,CHUNK_ORDER_INDEX,FULL_DOC_ID from LIGHTRAG_DOC_CHUNKS where workspace=:workspace and ID in ({ids})",
814
+
815
  "filter_keys": "select id from {table_name} where workspace=:workspace and id in ({ids})",
816
+
817
+ "merge_doc_full": """MERGE INTO LIGHTRAG_DOC_FULL a
818
+ USING DUAL
819
+ ON (a.id = :id and a.workspace = :workspace)
820
+ WHEN NOT MATCHED THEN
821
+ INSERT(id,content,workspace) values(:id,:content,:workspace)""",
822
+
823
  "merge_chunk": """MERGE INTO LIGHTRAG_DOC_CHUNKS a
824
+ USING DUAL
825
+ ON (a.id = :check_id)
826
+ WHEN NOT MATCHED THEN
827
+ INSERT(id,content,workspace,tokens,chunk_order_index,full_doc_id,content_vector)
828
+ values (:id,:content,:workspace,:tokens,:chunk_order_index,:full_doc_id,:content_vector) """,
829
+
830
+ "upsert_llm_response_cache": """MERGE INTO LIGHTRAG_LLM_CACHE a
831
+ USING DUAL
832
+ ON (a.id = :id)
833
+ WHEN NOT MATCHED THEN
834
+ INSERT (workspace,id,original_prompt,return_value,cache_mode)
835
+ VALUES (:workspace,:id,:original_prompt,:return_value,:cache_mode)
836
+ WHEN MATCHED THEN UPDATE
837
+ SET original_prompt = :original_prompt,
838
+ return_value = :return_value,
839
+ cache_mode = :cache_mode,
840
+ updatetime = SYSDATE""",
841
+
842
+ "get_by_id_doc_status": "SELECT id FROM LIGHTRAG_DOC_FULL WHERE workspace=:workspace AND id IN ({ids})",
843
+
844
+ "get_status_counts": """SELECT status as "status", COUNT(1) as "count"
845
+ FROM LIGHTRAG_DOC_FULL WHERE workspace=:workspace GROUP BY STATUS""",
846
+
847
+ "get_docs_by_status": """select content_length,status,
848
+ TO_CHAR(created_at,'YYYY-MM-DD HH24:MI:SS') as created_at,TO_CHAR(updatetime,'YYYY-MM-DD HH24:MI:SS') as updatetime
849
+ from LIGHTRAG_DOC_STATUS where workspace=:workspace and status=:status""",
850
+
851
+ "merge_doc_status":"""MERGE INTO LIGHTRAG_DOC_FULL a
852
+ USING DUAL
853
+ ON (a.id = :id and a.workspace = :workspace)
854
+ WHEN NOT MATCHED THEN
855
+ INSERT (id,content_summary,content_length,chunks_count,status) values(:id,:content_summary,:content_length,:chunks_count,:status)
856
+ WHEN MATCHED THEN UPDATE
857
+ SET content_summary = :content_summary,
858
+ content_length = :content_length,
859
+ chunks_count = :chunks_count,
860
+ status = :status,
861
+ updatetime = SYSDATE""",
862
+
863
  # SQL for VectorStorage
864
  "entities": """SELECT name as entity_name FROM
865
  (SELECT id,name,VECTOR_DISTANCE(content_vector,vector(:embedding_string,{dimension},{dtype}),COSINE) as distance
 
911
  COLUMNS (a.name as source_name,b.name as target_name))""",
912
  "merge_node": """MERGE INTO LIGHTRAG_GRAPH_NODES a
913
  USING DUAL
914
+ ON (a.workspace=:workspace and a.name=:name)
915
  WHEN NOT MATCHED THEN
916
  INSERT(workspace,name,entity_type,description,source_chunk_id,content,content_vector)
917
+ values (:workspace,:name,:entity_type,:description,:source_chunk_id,:content,:content_vector)
918
+ WHEN MATCHED THEN
919
+ UPDATE SET
920
+ entity_type=:entity_type,description=:description,source_chunk_id=:source_chunk_id,content=:content,content_vector=:content_vector,updatetime=SYSDATE""",
921
  "merge_edge": """MERGE INTO LIGHTRAG_GRAPH_EDGES a
922
  USING DUAL
923
+ ON (a.workspace=:workspace and a.source_name=:source_name and a.target_name=:target_name)
924
  WHEN NOT MATCHED THEN
925
  INSERT(workspace,source_name,target_name,weight,keywords,description,source_chunk_id,content,content_vector)
926
+ values (:workspace,:source_name,:target_name,:weight,:keywords,:description,:source_chunk_id,:content,:content_vector)
927
+ WHEN MATCHED THEN
928
+ UPDATE SET
929
+ weight=:weight,keywords=:keywords,description=:description,source_chunk_id=:source_chunk_id,content=:content,content_vector=:content_vector,updatetime=SYSDATE""",
930
  "get_all_nodes": """WITH t0 AS (
931
  SELECT name AS id, entity_type AS label, entity_type, description,
932
  '["' || replace(source_chunk_id, '<SEP>', '","') || '"]' source_chunk_ids
lightrag/lightrag.py CHANGED
@@ -79,6 +79,7 @@ Neo4JStorage = lazy_external_import(".kg.neo4j_impl", "Neo4JStorage")
79
  OracleKVStorage = lazy_external_import(".kg.oracle_impl", "OracleKVStorage")
80
  OracleGraphStorage = lazy_external_import(".kg.oracle_impl", "OracleGraphStorage")
81
  OracleVectorDBStorage = lazy_external_import(".kg.oracle_impl", "OracleVectorDBStorage")
 
82
  MilvusVectorDBStorge = lazy_external_import(".kg.milvus_impl", "MilvusVectorDBStorge")
83
  MongoKVStorage = lazy_external_import(".kg.mongo_impl", "MongoKVStorage")
84
  ChromaVectorDBStorage = lazy_external_import(".kg.chroma_impl", "ChromaVectorDBStorage")
@@ -290,6 +291,7 @@ class LightRAG:
290
  # kv storage
291
  "JsonKVStorage": JsonKVStorage,
292
  "OracleKVStorage": OracleKVStorage,
 
293
  "MongoKVStorage": MongoKVStorage,
294
  "TiDBKVStorage": TiDBKVStorage,
295
  # vector storage
 
79
  OracleKVStorage = lazy_external_import(".kg.oracle_impl", "OracleKVStorage")
80
  OracleGraphStorage = lazy_external_import(".kg.oracle_impl", "OracleGraphStorage")
81
  OracleVectorDBStorage = lazy_external_import(".kg.oracle_impl", "OracleVectorDBStorage")
82
+ OracleDocStatusStorage = lazy_external_import(".kg.oracle_impl", "OracleDocStatusStorage")
83
  MilvusVectorDBStorge = lazy_external_import(".kg.milvus_impl", "MilvusVectorDBStorge")
84
  MongoKVStorage = lazy_external_import(".kg.mongo_impl", "MongoKVStorage")
85
  ChromaVectorDBStorage = lazy_external_import(".kg.chroma_impl", "ChromaVectorDBStorage")
 
291
  # kv storage
292
  "JsonKVStorage": JsonKVStorage,
293
  "OracleKVStorage": OracleKVStorage,
294
+ "OracleDocStatusStorage":OracleDocStatusStorage,
295
  "MongoKVStorage": MongoKVStorage,
296
  "TiDBKVStorage": TiDBKVStorage,
297
  # vector storage
lightrag/operate.py CHANGED
@@ -59,13 +59,15 @@ async def _handle_entity_relation_summary(
59
  description: str,
60
  global_config: dict,
61
  ) -> str:
 
 
 
 
62
  use_llm_func: callable = global_config["llm_model_func"]
63
  llm_max_tokens = global_config["llm_model_max_token_size"]
64
  tiktoken_model_name = global_config["tiktoken_model_name"]
65
  summary_max_tokens = global_config["entity_summary_to_max_tokens"]
66
- language = global_config["addon_params"].get(
67
- "language", PROMPTS["DEFAULT_LANGUAGE"]
68
- )
69
 
70
  tokens = encode_string_by_tiktoken(description, model_name=tiktoken_model_name)
71
  if len(tokens) < summary_max_tokens: # No need for summary
@@ -139,6 +141,7 @@ async def _merge_nodes_then_upsert(
139
  knowledge_graph_inst: BaseGraphStorage,
140
  global_config: dict,
141
  ):
 
142
  already_entity_types = []
143
  already_source_ids = []
144
  already_description = []
@@ -319,7 +322,7 @@ async def extract_entities(
319
  llm_response_cache.global_config = new_config
320
  need_to_restore = True
321
  if history_messages:
322
- history = json.dumps(history_messages)
323
  _prompt = history + "\n" + input_text
324
  else:
325
  _prompt = input_text
@@ -351,6 +354,11 @@ async def extract_entities(
351
  return await use_llm_func(input_text)
352
 
353
  async def _process_single_content(chunk_key_dp: tuple[str, TextChunkSchema]):
 
 
 
 
 
354
  nonlocal already_processed, already_entities, already_relations
355
  chunk_key = chunk_key_dp[0]
356
  chunk_dp = chunk_key_dp[1]
 
59
  description: str,
60
  global_config: dict,
61
  ) -> str:
62
+ """Handle entity relation summary
63
+ For each entity or relation, input is the combined description of already existing description and new description.
64
+ If too long, use LLM to summarize.
65
+ """
66
  use_llm_func: callable = global_config["llm_model_func"]
67
  llm_max_tokens = global_config["llm_model_max_token_size"]
68
  tiktoken_model_name = global_config["tiktoken_model_name"]
69
  summary_max_tokens = global_config["entity_summary_to_max_tokens"]
70
+ language = global_config["addon_params"].get("language", PROMPTS["DEFAULT_LANGUAGE"])
 
 
71
 
72
  tokens = encode_string_by_tiktoken(description, model_name=tiktoken_model_name)
73
  if len(tokens) < summary_max_tokens: # No need for summary
 
141
  knowledge_graph_inst: BaseGraphStorage,
142
  global_config: dict,
143
  ):
144
+ """Get existing nodes from knowledge graph use name,if exists, merge data, else create, then upsert."""
145
  already_entity_types = []
146
  already_source_ids = []
147
  already_description = []
 
322
  llm_response_cache.global_config = new_config
323
  need_to_restore = True
324
  if history_messages:
325
+ history = json.dumps(history_messages,ensure_ascii=False)
326
  _prompt = history + "\n" + input_text
327
  else:
328
  _prompt = input_text
 
354
  return await use_llm_func(input_text)
355
 
356
  async def _process_single_content(chunk_key_dp: tuple[str, TextChunkSchema]):
357
+ """"Prpocess a single chunk
358
+ Args:
359
+ chunk_key_dp (tuple[str, TextChunkSchema]):
360
+ ("chunck-xxxxxx", {"tokens": int, "content": str, "full_doc_id": str, "chunk_order_index": int})
361
+ """
362
  nonlocal already_processed, already_entities, already_relations
363
  chunk_key = chunk_key_dp[0]
364
  chunk_dp = chunk_key_dp[1]
lightrag/utils.py CHANGED
@@ -36,7 +36,7 @@ logger = logging.getLogger("lightrag")
36
  def set_logger(log_file: str):
37
  logger.setLevel(logging.DEBUG)
38
 
39
- file_handler = logging.FileHandler(log_file)
40
  file_handler.setLevel(logging.DEBUG)
41
 
42
  formatter = logging.Formatter(
@@ -473,7 +473,7 @@ async def handle_cache(hashing_kv, args_hash, prompt, mode="default"):
473
  quantized = min_val = max_val = None
474
  if is_embedding_cache_enabled:
475
  # Use embedding cache
476
- embedding_model_func = hashing_kv.global_config["embedding_func"]["func"]
477
  llm_model_func = hashing_kv.global_config.get("llm_model_func")
478
 
479
  current_embedding = await embedding_model_func([prompt])
 
36
  def set_logger(log_file: str):
37
  logger.setLevel(logging.DEBUG)
38
 
39
+ file_handler = logging.FileHandler(log_file, encoding='utf-8')
40
  file_handler.setLevel(logging.DEBUG)
41
 
42
  formatter = logging.Formatter(
 
473
  quantized = min_val = max_val = None
474
  if is_embedding_cache_enabled:
475
  # Use embedding cache
476
+ embedding_model_func = hashing_kv.global_config["embedding_func"].func #["func"]
477
  llm_model_func = hashing_kv.global_config.get("llm_model_func")
478
 
479
  current_embedding = await embedding_model_func([prompt])