zrguo committed on
Commit
c0af224
·
unverified ·
2 Parent(s): 7607282 7471fe1

Merge pull request #581 from jiabin2wang20230918/fix_mongo

Browse files

Fix for the MongoDB 16 MB document limit and add the OS environment variable NEO4J_MAX_CONNECTION_POOL_SIZE for Neo4j

lightrag/kg/mongo_impl.py CHANGED
@@ -2,7 +2,7 @@ import os
2
  from tqdm.asyncio import tqdm as tqdm_async
3
  from dataclasses import dataclass
4
  from pymongo import MongoClient
5
-
6
  from lightrag.utils import logger
7
 
8
  from lightrag.base import BaseKVStorage
@@ -41,11 +41,35 @@ class MongoKVStorage(BaseKVStorage):
41
  return set([s for s in data if s not in existing_ids])
42
 
43
  async def upsert(self, data: dict[str, dict]):
44
- for k, v in tqdm_async(data.items(), desc="Upserting"):
45
- self._data.update_one({"_id": k}, {"$set": v}, upsert=True)
46
- data[k]["_id"] = k
 
 
 
 
 
 
 
 
 
 
 
47
  return data
48
 
 
 
 
 
 
 
 
 
 
 
 
 
 
49
  async def drop(self):
50
  """ """
51
  pass
 
2
  from tqdm.asyncio import tqdm as tqdm_async
3
  from dataclasses import dataclass
4
  from pymongo import MongoClient
5
+ from typing import Union
6
  from lightrag.utils import logger
7
 
8
  from lightrag.base import BaseKVStorage
 
41
  return set([s for s in data if s not in existing_ids])
42
 
43
  async def upsert(self, data: dict[str, dict]):
44
+ if self.namespace == "llm_response_cache":
45
+ for mode, items in data.items():
46
+ for k, v in tqdm_async(items.items(), desc="Upserting"):
47
+ key = f"{mode}_{k}"
48
+ result = self._data.update_one(
49
+ {"_id": key}, {"$setOnInsert": v}, upsert=True
50
+ )
51
+ if result.upserted_id:
52
+ logger.debug(f"\nInserted new document with key: {key}")
53
+ data[mode][k]["_id"] = key
54
+ else:
55
+ for k, v in tqdm_async(data.items(), desc="Upserting"):
56
+ self._data.update_one({"_id": k}, {"$set": v}, upsert=True)
57
+ data[k]["_id"] = k
58
  return data
59
 
60
+ async def get_by_mode_and_id(self, mode: str, id: str) -> Union[dict, None]:
61
+ if "llm_response_cache" == self.namespace:
62
+ res = {}
63
+ v = self._data.find_one({"_id": mode + "_" + id})
64
+ if v:
65
+ res[id] = v
66
+ logger.debug(f"llm_response_cache find one by:{id}")
67
+ return res
68
+ else:
69
+ return None
70
+ else:
71
+ return None
72
+
73
  async def drop(self):
74
  """ """
75
  pass
lightrag/kg/neo4j_impl.py CHANGED
@@ -39,6 +39,7 @@ class Neo4JStorage(BaseGraphStorage):
39
  URI = os.environ["NEO4J_URI"]
40
  USERNAME = os.environ["NEO4J_USERNAME"]
41
  PASSWORD = os.environ["NEO4J_PASSWORD"]
 
42
  DATABASE = os.environ.get(
43
  "NEO4J_DATABASE"
44
  ) # If this param is None, the home database will be used. If it is not None, the specified database will be used.
@@ -47,7 +48,11 @@ class Neo4JStorage(BaseGraphStorage):
47
  URI, auth=(USERNAME, PASSWORD)
48
  )
49
  _database_name = "home database" if DATABASE is None else f"database {DATABASE}"
50
- with GraphDatabase.driver(URI, auth=(USERNAME, PASSWORD)) as _sync_driver:
 
 
 
 
51
  try:
52
  with _sync_driver.session(database=DATABASE) as session:
53
  try:
 
39
  URI = os.environ["NEO4J_URI"]
40
  USERNAME = os.environ["NEO4J_USERNAME"]
41
  PASSWORD = os.environ["NEO4J_PASSWORD"]
42
+ MAX_CONNECTION_POOL_SIZE = os.environ.get("NEO4J_MAX_CONNECTION_POOL_SIZE", 800)
43
  DATABASE = os.environ.get(
44
  "NEO4J_DATABASE"
45
  ) # If this param is None, the home database will be used. If it is not None, the specified database will be used.
 
48
  URI, auth=(USERNAME, PASSWORD)
49
  )
50
  _database_name = "home database" if DATABASE is None else f"database {DATABASE}"
51
+ with GraphDatabase.driver(
52
+ URI,
53
+ auth=(USERNAME, PASSWORD),
54
+ max_connection_pool_size=MAX_CONNECTION_POOL_SIZE,
55
+ ) as _sync_driver:
56
  try:
57
  with _sync_driver.session(database=DATABASE) as session:
58
  try: