yangdx committed on
Commit
6fe9628
·
1 Parent(s): b123c9f

Fix linting

Browse files
Files changed (2) hide show
  1. lightrag/kg/redis_impl.py +17 -11
  2. lightrag/operate.py +2 -2
lightrag/kg/redis_impl.py CHANGED
@@ -9,8 +9,8 @@ if not pm.is_installed("redis"):
9
  pm.install("redis")
10
 
11
  # aioredis is a deprecated library, replaced with redis
12
- from redis.asyncio import Redis, ConnectionPool # type: ignore
13
- from redis.exceptions import RedisError, ConnectionError # type: ignore
14
  from lightrag.utils import logger
15
 
16
  from lightrag.base import BaseKVStorage
@@ -39,10 +39,12 @@ class RedisKVStorage(BaseKVStorage):
39
  max_connections=MAX_CONNECTIONS,
40
  decode_responses=True,
41
  socket_timeout=SOCKET_TIMEOUT,
42
- socket_connect_timeout=SOCKET_CONNECT_TIMEOUT
43
  )
44
  self._redis = Redis(connection_pool=self._pool)
45
- logger.info(f"Initialized Redis connection pool for {self.namespace} with max {MAX_CONNECTIONS} connections")
 
 
46
 
47
  @asynccontextmanager
48
  async def _get_redis_connection(self):
@@ -56,12 +58,14 @@ class RedisKVStorage(BaseKVStorage):
56
  logger.error(f"Redis operation error in {self.namespace}: {e}")
57
  raise
58
  except Exception as e:
59
- logger.error(f"Unexpected error in Redis operation for {self.namespace}: {e}")
 
 
60
  raise
61
 
62
  async def close(self):
63
  """Close the Redis connection pool to prevent resource leaks."""
64
- if hasattr(self, '_redis') and self._redis:
65
  await self._redis.close()
66
  await self._pool.disconnect()
67
  logger.debug(f"Closed Redis connection pool for {self.namespace}")
@@ -108,7 +112,7 @@ class RedisKVStorage(BaseKVStorage):
108
  async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
109
  if not data:
110
  return
111
-
112
  logger.info(f"Inserting {len(data)} items to {self.namespace}")
113
  async with self._get_redis_connection() as redis:
114
  try:
@@ -122,11 +126,11 @@ class RedisKVStorage(BaseKVStorage):
122
  except json.JSONEncodeError as e:
123
  logger.error(f"JSON encode error during upsert: {e}")
124
  raise
125
-
126
  async def index_done_callback(self) -> None:
127
  # Redis handles persistence automatically
128
  pass
129
-
130
  async def delete(self, ids: list[str]) -> None:
131
  """Delete entries with specified IDs"""
132
  if not ids:
@@ -183,7 +187,10 @@ class RedisKVStorage(BaseKVStorage):
183
  deleted_count = sum(results)
184
 
185
  logger.info(f"Dropped {deleted_count} keys from {self.namespace}")
186
- return {"status": "success", "message": f"{deleted_count} keys dropped"}
 
 
 
187
  else:
188
  logger.info(f"No keys found to drop in {self.namespace}")
189
  return {"status": "success", "message": "no keys to drop"}
@@ -191,4 +198,3 @@ class RedisKVStorage(BaseKVStorage):
191
  except Exception as e:
192
  logger.error(f"Error dropping keys from {self.namespace}: {e}")
193
  return {"status": "error", "message": str(e)}
194
-
 
9
  pm.install("redis")
10
 
11
  # aioredis is a deprecated library, replaced with redis
12
+ from redis.asyncio import Redis, ConnectionPool # type: ignore
13
+ from redis.exceptions import RedisError, ConnectionError # type: ignore
14
  from lightrag.utils import logger
15
 
16
  from lightrag.base import BaseKVStorage
 
39
  max_connections=MAX_CONNECTIONS,
40
  decode_responses=True,
41
  socket_timeout=SOCKET_TIMEOUT,
42
+ socket_connect_timeout=SOCKET_CONNECT_TIMEOUT,
43
  )
44
  self._redis = Redis(connection_pool=self._pool)
45
+ logger.info(
46
+ f"Initialized Redis connection pool for {self.namespace} with max {MAX_CONNECTIONS} connections"
47
+ )
48
 
49
  @asynccontextmanager
50
  async def _get_redis_connection(self):
 
58
  logger.error(f"Redis operation error in {self.namespace}: {e}")
59
  raise
60
  except Exception as e:
61
+ logger.error(
62
+ f"Unexpected error in Redis operation for {self.namespace}: {e}"
63
+ )
64
  raise
65
 
66
  async def close(self):
67
  """Close the Redis connection pool to prevent resource leaks."""
68
+ if hasattr(self, "_redis") and self._redis:
69
  await self._redis.close()
70
  await self._pool.disconnect()
71
  logger.debug(f"Closed Redis connection pool for {self.namespace}")
 
112
  async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
113
  if not data:
114
  return
115
+
116
  logger.info(f"Inserting {len(data)} items to {self.namespace}")
117
  async with self._get_redis_connection() as redis:
118
  try:
 
126
  except json.JSONEncodeError as e:
127
  logger.error(f"JSON encode error during upsert: {e}")
128
  raise
129
+
130
  async def index_done_callback(self) -> None:
131
  # Redis handles persistence automatically
132
  pass
133
+
134
  async def delete(self, ids: list[str]) -> None:
135
  """Delete entries with specified IDs"""
136
  if not ids:
 
187
  deleted_count = sum(results)
188
 
189
  logger.info(f"Dropped {deleted_count} keys from {self.namespace}")
190
+ return {
191
+ "status": "success",
192
+ "message": f"{deleted_count} keys dropped",
193
+ }
194
  else:
195
  logger.info(f"No keys found to drop in {self.namespace}")
196
  return {"status": "success", "message": "no keys to drop"}
 
198
  except Exception as e:
199
  logger.error(f"Error dropping keys from {self.namespace}: {e}")
200
  return {"status": "error", "message": str(e)}
 
lightrag/operate.py CHANGED
@@ -1397,9 +1397,9 @@ async def _find_most_related_text_unit_from_entities(
1397
  # Process in batches of 5 tasks at a time to avoid overwhelming resources
1398
  batch_size = 5
1399
  results = []
1400
-
1401
  for i in range(0, len(tasks), batch_size):
1402
- batch_tasks = tasks[i:i + batch_size]
1403
  batch_results = await asyncio.gather(
1404
  *[text_chunks_db.get_by_id(c_id) for c_id, _, _ in batch_tasks]
1405
  )
 
1397
  # Process in batches of 5 tasks at a time to avoid overwhelming resources
1398
  batch_size = 5
1399
  results = []
1400
+
1401
  for i in range(0, len(tasks), batch_size):
1402
+ batch_tasks = tasks[i : i + batch_size]
1403
  batch_results = await asyncio.gather(
1404
  *[text_chunks_db.get_by_id(c_id) for c_id, _, _ in batch_tasks]
1405
  )