YanSte commited on
Commit
c799a26
·
unverified ·
2 Parent(s): 9725086 ef0be73

Merge pull request #837 from ParisNeo/main

Browse files

Fixes and Enhancements for PostgreSQL and JSON Document Storage

lightrag/api/docs/LightRagWithPostGRESQL.md CHANGED
@@ -130,7 +130,7 @@ Replace placeholders like `your_role_name`, `your_password`, and `your_database`
130
  Start the LightRAG server using specified options:
131
 
132
  ```bash
133
- lightrag-server --port 9626 --key sk-SL1 --kv-storage PGKVStorage --graph-storage PGGraphStorage --vector-storage PGVectorStorage --doc-status-storage PGDocStatusStorage
134
  ```
135
 
136
  Replace `the-port-number` with your desired port number (default is 9621) and `your-secret-key` with a secure key.
 
130
  Start the LightRAG server using specified options:
131
 
132
  ```bash
133
+ lightrag-server --port 9621 --key sk-somepassword --kv-storage PGKVStorage --graph-storage PGGraphStorage --vector-storage PGVectorStorage --doc-status-storage PGDocStatusStorage
134
  ```
135
 
136
  Replace `the-port-number` with your desired port number (default is 9621) and `your-secret-key` with a secure key.
lightrag/kg/json_doc_status_impl.py CHANGED
@@ -68,3 +68,7 @@ class JsonDocStatusStorage(DocStatusStorage):
68
  for doc_id in doc_ids:
69
  self._data.pop(doc_id, None)
70
  await self.index_done_callback()
 
 
 
 
 
68
  for doc_id in doc_ids:
69
  self._data.pop(doc_id, None)
70
  await self.index_done_callback()
71
+
72
+ async def drop(self) -> None:
73
+ """Drop the storage"""
74
+ self._data.clear()
lightrag/kg/postgres_impl.py CHANGED
@@ -263,8 +263,8 @@ class PGKVStorage(BaseKVStorage):
263
  exist_keys = [key["id"] for key in res]
264
  else:
265
  exist_keys = []
266
- data = set([s for s in keys if s not in exist_keys])
267
- return data
268
  except Exception as e:
269
  logger.error(f"PostgreSQL database error: {e}")
270
  print(sql)
@@ -301,6 +301,11 @@ class PGKVStorage(BaseKVStorage):
301
  # PG handles persistence automatically
302
  pass
303
 
 
 
 
 
 
304
 
305
  @final
306
  @dataclass
@@ -432,16 +437,26 @@ class PGVectorStorage(BaseVectorStorage):
432
  @dataclass
433
  class PGDocStatusStorage(DocStatusStorage):
434
  async def filter_keys(self, keys: set[str]) -> set[str]:
435
- """Return keys that don't exist in storage"""
436
- keys = ",".join([f"'{_id}'" for _id in keys])
437
- sql = f"SELECT id FROM LIGHTRAG_DOC_STATUS WHERE workspace='{self.db.workspace}' AND id IN ({keys})"
438
- result = await self.db.query(sql, multirows=True)
439
- # The result is like [{'id': 'id1'}, {'id': 'id2'}, ...].
440
- if result is None:
441
- return set(keys)
442
- else:
443
- existed = set([element["id"] for element in result])
444
- return set(keys) - existed
 
 
 
 
 
 
 
 
 
 
445
 
446
  async def get_by_id(self, id: str) -> Union[dict[str, Any], None]:
447
  sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and id=$2"
@@ -483,7 +498,7 @@ class PGDocStatusStorage(DocStatusStorage):
483
  sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and status=$2"
484
  params = {"workspace": self.db.workspace, "status": status.value}
485
  result = await self.db.query(sql, params, True)
486
- return {
487
  element["id"]: DocProcessingStatus(
488
  content=result[0]["content"],
489
  content_summary=element["content_summary"],
@@ -495,6 +510,7 @@ class PGDocStatusStorage(DocStatusStorage):
495
  )
496
  for element in result
497
  }
 
498
 
499
  async def index_done_callback(self) -> None:
500
  # PG handles persistence automatically
@@ -531,6 +547,11 @@ class PGDocStatusStorage(DocStatusStorage):
531
  )
532
  return data
533
 
 
 
 
 
 
534
 
535
  class PGGraphQueryException(Exception):
536
  """Exception for the AGE queries."""
@@ -1012,6 +1033,13 @@ class PGGraphStorage(BaseGraphStorage):
1012
  ) -> KnowledgeGraph:
1013
  raise NotImplementedError
1014
 
 
 
 
 
 
 
 
1015
 
1016
  NAMESPACE_TABLE_MAP = {
1017
  NameSpace.KV_STORE_FULL_DOCS: "LIGHTRAG_DOC_FULL",
@@ -1194,4 +1222,27 @@ SQL_TEMPLATES = {
1194
  FROM LIGHTRAG_DOC_CHUNKS where workspace=$1)
1195
  WHERE distance>$2 ORDER BY distance DESC LIMIT $3
1196
  """,
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1197
  }
 
263
  exist_keys = [key["id"] for key in res]
264
  else:
265
  exist_keys = []
266
+ new_keys = set([s for s in keys if s not in exist_keys])
267
+ return new_keys
268
  except Exception as e:
269
  logger.error(f"PostgreSQL database error: {e}")
270
  print(sql)
 
301
  # PG handles persistence automatically
302
  pass
303
 
304
+ async def drop(self) -> None:
305
+ """Drop the storage"""
306
+ drop_sql = SQL_TEMPLATES["drop_all"]
307
+ await self.db.execute(drop_sql)
308
+
309
 
310
  @final
311
  @dataclass
 
437
  @dataclass
438
  class PGDocStatusStorage(DocStatusStorage):
439
  async def filter_keys(self, keys: set[str]) -> set[str]:
440
+ """Filter out duplicated content"""
441
+ sql = SQL_TEMPLATES["filter_keys"].format(
442
+ table_name=namespace_to_table_name(self.namespace),
443
+ ids=",".join([f"'{id}'" for id in keys]),
444
+ )
445
+ params = {"workspace": self.db.workspace}
446
+ try:
447
+ res = await self.db.query(sql, params, multirows=True)
448
+ if res:
449
+ exist_keys = [key["id"] for key in res]
450
+ else:
451
+ exist_keys = []
452
+ new_keys = set([s for s in keys if s not in exist_keys])
453
+ print(f"keys: {keys}")
454
+ print(f"new_keys: {new_keys}")
455
+ return new_keys
456
+ except Exception as e:
457
+ logger.error(f"PostgreSQL database error: {e}")
458
+ print(sql)
459
+ print(params)
460
 
461
  async def get_by_id(self, id: str) -> Union[dict[str, Any], None]:
462
  sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and id=$2"
 
498
  sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and status=$2"
499
  params = {"workspace": self.db.workspace, "status": status.value}
500
  result = await self.db.query(sql, params, True)
501
+ docs_by_status = {
502
  element["id"]: DocProcessingStatus(
503
  content=result[0]["content"],
504
  content_summary=element["content_summary"],
 
510
  )
511
  for element in result
512
  }
513
+ return docs_by_status
514
 
515
  async def index_done_callback(self) -> None:
516
  # PG handles persistence automatically
 
547
  )
548
  return data
549
 
550
+ async def drop(self) -> None:
551
+ """Drop the storage"""
552
+ drop_sql = SQL_TEMPLATES["drop_doc_full"]
553
+ await self.db.execute(drop_sql)
554
+
555
 
556
  class PGGraphQueryException(Exception):
557
  """Exception for the AGE queries."""
 
1033
  ) -> KnowledgeGraph:
1034
  raise NotImplementedError
1035
 
1036
+ async def drop(self) -> None:
1037
+ """Drop the storage"""
1038
+ drop_sql = SQL_TEMPLATES["drop_vdb_entity"]
1039
+ await self.db.execute(drop_sql)
1040
+ drop_sql = SQL_TEMPLATES["drop_vdb_relation"]
1041
+ await self.db.execute(drop_sql)
1042
+
1043
 
1044
  NAMESPACE_TABLE_MAP = {
1045
  NameSpace.KV_STORE_FULL_DOCS: "LIGHTRAG_DOC_FULL",
 
1222
  FROM LIGHTRAG_DOC_CHUNKS where workspace=$1)
1223
  WHERE distance>$2 ORDER BY distance DESC LIMIT $3
1224
  """,
1225
+ # DROP tables
1226
+ "drop_all": """
1227
+ DROP TABLE IF EXISTS LIGHTRAG_DOC_FULL CASCADE;
1228
+ DROP TABLE IF EXISTS LIGHTRAG_DOC_CHUNKS CASCADE;
1229
+ DROP TABLE IF EXISTS LIGHTRAG_LLM_CACHE CASCADE;
1230
+ DROP TABLE IF EXISTS LIGHTRAG_VDB_ENTITY CASCADE;
1231
+ DROP TABLE IF EXISTS LIGHTRAG_VDB_RELATION CASCADE;
1232
+ """,
1233
+ "drop_doc_full": """
1234
+ DROP TABLE IF EXISTS LIGHTRAG_DOC_FULL CASCADE;
1235
+ """,
1236
+ "drop_doc_chunks": """
1237
+ DROP TABLE IF EXISTS LIGHTRAG_DOC_CHUNKS CASCADE;
1238
+ """,
1239
+ "drop_llm_cache": """
1240
+ DROP TABLE IF EXISTS LIGHTRAG_LLM_CACHE CASCADE;
1241
+ """,
1242
+ "drop_vdb_entity": """
1243
+ DROP TABLE IF EXISTS LIGHTRAG_VDB_ENTITY CASCADE;
1244
+ """,
1245
+ "drop_vdb_relation": """
1246
+ DROP TABLE IF EXISTS LIGHTRAG_VDB_RELATION CASCADE;
1247
+ """,
1248
  }