zrguo commited on
Commit
337e371
·
unverified ·
2 Parent(s): 735d7be 054e9d1

Merge pull request #589 from ShanGor/main

Browse files

Enrich README.md for postgres usage; fix the issue for running postgres on Python versions below 3.12

README.md CHANGED
@@ -361,6 +361,7 @@ see test_neo4j.py for a working example.
361
  ### Using PostgreSQL for Storage
362
  For production level scenarios you will most likely want to leverage an enterprise solution. PostgreSQL can provide a one-stop solution for you as KV store, VectorDB (pgvector) and GraphDB (apache AGE).
363
  * PostgreSQL is lightweight, the whole binary distribution including all necessary plugins can be zipped to 40MB: refer to [Windows Release](https://github.com/ShanGor/apache-age-windows/releases/tag/PG17%2Fv1.5.0-rc0); it is easy to install for Linux/Mac.
 
364
  * How to start? Ref to: [examples/lightrag_zhipu_postgres_demo.py](https://github.com/HKUDS/LightRAG/blob/main/examples/lightrag_zhipu_postgres_demo.py)
365
  * Create index for AGE example: (Change below `dickens` to your graph name if necessary)
366
  ```
 
361
  ### Using PostgreSQL for Storage
362
  For production level scenarios you will most likely want to leverage an enterprise solution. PostgreSQL can provide a one-stop solution for you as KV store, VectorDB (pgvector) and GraphDB (apache AGE).
363
  * PostgreSQL is lightweight, the whole binary distribution including all necessary plugins can be zipped to 40MB: refer to [Windows Release](https://github.com/ShanGor/apache-age-windows/releases/tag/PG17%2Fv1.5.0-rc0); it is easy to install for Linux/Mac.
364
+ * If you prefer docker, please start with this image if you are a beginner to avoid hiccups (DO read the overview): https://hub.docker.com/r/shangor/postgres-for-rag
365
  * How to start? Ref to: [examples/lightrag_zhipu_postgres_demo.py](https://github.com/HKUDS/LightRAG/blob/main/examples/lightrag_zhipu_postgres_demo.py)
366
  * Create index for AGE example: (Change below `dickens` to your graph name if necessary)
367
  ```
examples/copy_llm_cache_to_another_storage.py ADDED
@@ -0,0 +1,97 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Sometimes you need to switch a storage solution, but you want to save LLM token and time.
3
+ This handy script helps you to copy the LLM caches from one storage solution to another.
4
+ (Not all the storage impl are supported)
5
+ """
6
+
7
+ import asyncio
8
+ import logging
9
+ import os
10
+ from dotenv import load_dotenv
11
+
12
+ from lightrag.kg.postgres_impl import PostgreSQLDB, PGKVStorage
13
+ from lightrag.storage import JsonKVStorage
14
+
15
+ load_dotenv()
16
+ ROOT_DIR = os.environ.get("ROOT_DIR")
17
+ WORKING_DIR = f"{ROOT_DIR}/dickens"
18
+
19
+ logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO)
20
+
21
+ if not os.path.exists(WORKING_DIR):
22
+ os.mkdir(WORKING_DIR)
23
+
24
+ # AGE
25
+ os.environ["AGE_GRAPH_NAME"] = "chinese"
26
+
27
+ postgres_db = PostgreSQLDB(
28
+ config={
29
+ "host": "localhost",
30
+ "port": 15432,
31
+ "user": "rag",
32
+ "password": "rag",
33
+ "database": "r2",
34
+ }
35
+ )
36
+
37
+
38
async def copy_from_postgres_to_json():
    """Copy every llm_response_cache entry from PostgreSQL into the JSON KV storage."""
    await postgres_db.initdb()

    source = PGKVStorage(
        namespace="llm_response_cache",
        global_config={"embedding_batch_num": 6},
        embedding_func=None,
        db=postgres_db,
    )
    target = JsonKVStorage(
        namespace="llm_response_cache",
        global_config={"working_dir": WORKING_DIR},
        embedding_func=None,
    )

    # Accumulate everything keyed by mode, then write in a single upsert.
    collected = {}
    for row in await source.all_keys():
        print(f"Copying {row}")
        # Each key row carries the workspace it belongs to; switch before reading.
        postgres_db.workspace = row["workspace"]
        mode, _id = row["mode"], row["id"]
        obj = await source.get_by_mode_and_id(mode, _id)
        collected.setdefault(mode, {})[_id] = obj[_id]
        print(f"Object {obj}")
    await target.upsert(collected)
    await target.index_done_callback()
    print("Mission accomplished!")
69
+
70
+
71
async def copy_from_json_to_postgres():
    """Copy every llm_response_cache entry from the JSON KV storage into PostgreSQL."""
    await postgres_db.initdb()

    source = JsonKVStorage(
        namespace="llm_response_cache",
        global_config={"working_dir": WORKING_DIR},
        embedding_func=None,
    )
    target = PGKVStorage(
        namespace="llm_response_cache",
        global_config={"embedding_batch_num": 6},
        embedding_func=None,
        db=postgres_db,
    )

    # JSON storage keys are the cache modes; upsert one entry at a time.
    for mode in await source.all_keys():
        print(f"Copying {mode}")
        caches = await source.get_by_id(mode)
        for key, value in caches.items():
            entry = {mode: {key: value}}
            print(f"\tCopying {entry}")
            await target.upsert(entry)
94
+
95
+
96
+ if __name__ == "__main__":
97
+ asyncio.run(copy_from_json_to_postgres())
lightrag/kg/postgres_impl.py CHANGED
@@ -231,6 +231,16 @@ class PGKVStorage(BaseKVStorage):
231
  else:
232
  return None
233
 
 
 
 
 
 
 
 
 
 
 
234
  async def filter_keys(self, keys: List[str]) -> Set[str]:
235
  """Filter out duplicated content"""
236
  sql = SQL_TEMPLATES["filter_keys"].format(
@@ -412,7 +422,10 @@ class PGDocStatusStorage(DocStatusStorage):
412
 
413
  async def filter_keys(self, data: list[str]) -> set[str]:
414
  """Return keys that don't exist in storage"""
415
- sql = f"SELECT id FROM LIGHTRAG_DOC_STATUS WHERE workspace=$1 AND id IN ({",".join([f"'{_id}'" for _id in data])})"
 
 
 
416
  result = await self.db.query(sql, {"workspace": self.db.workspace}, True)
417
  # The result is like [{'id': 'id1'}, {'id': 'id2'}, ...].
418
  if result is None:
 
231
  else:
232
  return None
233
 
234
async def all_keys(self) -> list[dict]:
    """Return all cache key rows (workspace, mode, id) from lightrag_llm_cache.

    Only implemented for the "llm_response_cache" namespace. For any other
    namespace an error is logged and an empty list is returned, so callers
    that iterate the result never crash on None (which also matches the
    declared ``list[dict]`` return type).
    """
    if self.namespace != "llm_response_cache":
        logger.error(
            f"all_keys is only implemented for llm_response_cache, not for {self.namespace}"
        )
        return []  # keep the list[dict] contract instead of implicitly returning None
    sql = "select workspace,mode,id from lightrag_llm_cache"
    return await self.db.query(sql, multirows=True)
243
+
244
  async def filter_keys(self, keys: List[str]) -> Set[str]:
245
  """Filter out duplicated content"""
246
  sql = SQL_TEMPLATES["filter_keys"].format(
 
422
 
423
  async def filter_keys(self, data: list[str]) -> set[str]:
424
  """Return keys that don't exist in storage"""
425
+ keys = ",".join([f"'{_id}'" for _id in data])
426
+ sql = (
427
+ f"SELECT id FROM LIGHTRAG_DOC_STATUS WHERE workspace=$1 AND id IN ({keys})"
428
+ )
429
  result = await self.db.query(sql, {"workspace": self.db.workspace}, True)
430
  # The result is like [{'id': 'id1'}, {'id': 'id2'}, ...].
431
  if result is None: