Daniel.y committed
Commit cd12e9c · unverified · 2 parent(s): 9ca235a 9535cef

Merge pull request #1732 from danielaskdd/optimize-doc-delete

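Overview of the change: document deletion is optimized by storing each document's chunk IDs in a new chunks_list field of DocProcessingStatus (and in every doc-status backend), and by tracking per-chunk LLM cache keys in a new llm_cache_list field of the text-chunk storages; the KV backends also gain create_time/update_time stamps, and entity-relation extraction now runs as a second stage after chunks are persisted. The sketch below contrasts the old and new chunk lookup during deletion; it is illustrative only and not code from the repository, although the storage method names (get_all, get_by_id) match those touched in this diff.

    # Illustrative sketch only (not repository code): how the chunk IDs of a
    # document are resolved after this change, versus the old full scan.

    async def chunk_ids_old(text_chunks, doc_id: str) -> set[str]:
        # Old approach: load every chunk and filter by full_doc_id.
        all_chunks = await text_chunks.get_all()
        return {
            chunk_id
            for chunk_id, chunk in all_chunks.items()
            if isinstance(chunk, dict) and chunk.get("full_doc_id") == doc_id
        }

    async def chunk_ids_new(doc_status, doc_id: str) -> set[str]:
        # New approach: the document status entry already carries chunks_list.
        status = await doc_status.get_by_id(doc_id)
        return set((status or {}).get("chunks_list", []))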
env.example CHANGED
@@ -114,15 +114,6 @@ EMBEDDING_BINDING_HOST=http://localhost:11434
 # LIGHTRAG_DOC_STATUS_STORAGE=PGDocStatusStorage
 # LIGHTRAG_GRAPH_STORAGE=Neo4JStorage
 
-### TiDB Configuration (Deprecated)
-# TIDB_HOST=localhost
-# TIDB_PORT=4000
-# TIDB_USER=your_username
-# TIDB_PASSWORD='your_password'
-# TIDB_DATABASE=your_database
-### separating all data from difference Lightrag instances(deprecating)
-# TIDB_WORKSPACE=default
-
 ### PostgreSQL Configuration
 POSTGRES_HOST=localhost
 POSTGRES_PORT=5432
@@ -130,7 +121,7 @@ POSTGRES_USER=your_username
 POSTGRES_PASSWORD='your_password'
 POSTGRES_DATABASE=your_database
 POSTGRES_MAX_CONNECTIONS=12
-### separating all data from difference Lightrag instances(deprecating)
+### separating all data from difference Lightrag instances
 # POSTGRES_WORKSPACE=default
 
 ### Neo4j Configuration
@@ -146,14 +137,15 @@ NEO4J_PASSWORD='your_password'
 # AGE_POSTGRES_PORT=8529
 
 # AGE Graph Name(apply to PostgreSQL and independent AGM)
-### AGE_GRAPH_NAME is precated
+### AGE_GRAPH_NAME is deprecated
 # AGE_GRAPH_NAME=lightrag
 
 ### MongoDB Configuration
 MONGO_URI=mongodb://root:root@localhost:27017/
 MONGO_DATABASE=LightRAG
 ### separating all data from difference Lightrag instances(deprecating)
-# MONGODB_GRAPH=false
+### separating all data from difference Lightrag instances
+# MONGODB_WORKSPACE=default
 
 ### Milvus Configuration
 MILVUS_URI=http://localhost:19530
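The env.example changes drop the deprecated TiDB block and the deprecated AGE_GRAPH_NAME note, and replace MONGODB_GRAPH with a MONGODB_WORKSPACE setting alongside POSTGRES_WORKSPACE. A minimal sketch of how such workspace variables are typically read; the helper and its fallback value "default" are assumptions taken from the commented examples, not code from the storage implementations:

    import os

    # Hypothetical helper: read a per-backend workspace name from the environment,
    # falling back to "default" as the commented examples suggest.
    def workspace_for(backend: str) -> str:
        return os.environ.get(f"{backend.upper()}_WORKSPACE", "default")

    postgres_ws = workspace_for("postgres")   # POSTGRES_WORKSPACE
    mongodb_ws = workspace_for("mongodb")     # MONGODB_WORKSPACE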
lightrag/base.py CHANGED
@@ -634,6 +634,8 @@ class DocProcessingStatus:
     """ISO format timestamp when document was last updated"""
     chunks_count: int | None = None
     """Number of chunks after splitting, used for processing"""
+    chunks_list: list[str] | None = field(default_factory=list)
+    """List of chunk IDs associated with this document, used for deletion"""
     error: str | None = None
     """Error message if failed"""
     metadata: dict[str, Any] = field(default_factory=dict)
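For reference, a minimal sketch of the dataclass after this change and of how the new field can be consumed; the class is abbreviated, and the construction values are invented for illustration:

    from dataclasses import dataclass, field
    from typing import Any

    @dataclass
    class DocProcessingStatusSketch:          # abbreviated stand-in for DocProcessingStatus
        chunks_count: int | None = None       # number of chunks after splitting
        chunks_list: list[str] | None = field(default_factory=list)  # chunk IDs, used for deletion
        error: str | None = None
        metadata: dict[str, Any] = field(default_factory=dict)

    status = DocProcessingStatusSketch(chunks_count=2, chunks_list=["chunk-a", "chunk-b"])
    chunk_ids = set(status.chunks_list or [])  # what the deletion path derives from the status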
lightrag/kg/json_doc_status_impl.py CHANGED
@@ -118,6 +118,10 @@ class JsonDocStatusStorage(DocStatusStorage):
             return
         logger.debug(f"Inserting {len(data)} records to {self.namespace}")
         async with self._storage_lock:
+            # Ensure chunks_list field exists for new documents
+            for doc_id, doc_data in data.items():
+                if "chunks_list" not in doc_data:
+                    doc_data["chunks_list"] = []
             self._data.update(data)
             await set_all_update_flags(self.namespace)
 
lightrag/kg/json_kv_impl.py CHANGED
@@ -78,22 +78,49 @@ class JsonKVStorage(BaseKVStorage):
78
  Dictionary containing all stored data
79
  """
80
  async with self._storage_lock:
81
- return dict(self._data)
82
 
83
  async def get_by_id(self, id: str) -> dict[str, Any] | None:
84
  async with self._storage_lock:
85
- return self._data.get(id)
86
 
87
  async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
88
  async with self._storage_lock:
89
- return [
90
- (
91
- {k: v for k, v in self._data[id].items()}
92
- if self._data.get(id, None)
93
- else None
94
- )
95
- for id in ids
96
- ]
97
 
98
  async def filter_keys(self, keys: set[str]) -> set[str]:
99
  async with self._storage_lock:
@@ -107,8 +134,29 @@ class JsonKVStorage(BaseKVStorage):
107
  """
108
  if not data:
109
  return
110
  logger.debug(f"Inserting {len(data)} records to {self.namespace}")
111
  async with self._storage_lock:
112
  self._data.update(data)
113
  await set_all_update_flags(self.namespace)
114
 
 
78
  Dictionary containing all stored data
79
  """
80
  async with self._storage_lock:
81
+ result = {}
82
+ for key, value in self._data.items():
83
+ if value:
84
+ # Create a copy to avoid modifying the original data
85
+ data = dict(value)
86
+ # Ensure time fields are present, provide default values for old data
87
+ data.setdefault("create_time", 0)
88
+ data.setdefault("update_time", 0)
89
+ result[key] = data
90
+ else:
91
+ result[key] = value
92
+ return result
93
 
94
  async def get_by_id(self, id: str) -> dict[str, Any] | None:
95
  async with self._storage_lock:
96
+ result = self._data.get(id)
97
+ if result:
98
+ # Create a copy to avoid modifying the original data
99
+ result = dict(result)
100
+ # Ensure time fields are present, provide default values for old data
101
+ result.setdefault("create_time", 0)
102
+ result.setdefault("update_time", 0)
103
+ # Ensure _id field contains the clean ID
104
+ result["_id"] = id
105
+ return result
106
 
107
  async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
108
  async with self._storage_lock:
109
+ results = []
110
+ for id in ids:
111
+ data = self._data.get(id, None)
112
+ if data:
113
+ # Create a copy to avoid modifying the original data
114
+ result = {k: v for k, v in data.items()}
115
+ # Ensure time fields are present, provide default values for old data
116
+ result.setdefault("create_time", 0)
117
+ result.setdefault("update_time", 0)
118
+ # Ensure _id field contains the clean ID
119
+ result["_id"] = id
120
+ results.append(result)
121
+ else:
122
+ results.append(None)
123
+ return results
124
 
125
  async def filter_keys(self, keys: set[str]) -> set[str]:
126
  async with self._storage_lock:
 
134
  """
135
  if not data:
136
  return
137
+
138
+ import time
139
+
140
+ current_time = int(time.time()) # Get current Unix timestamp
141
+
142
  logger.debug(f"Inserting {len(data)} records to {self.namespace}")
143
  async with self._storage_lock:
144
+ # Add timestamps to data based on whether key exists
145
+ for k, v in data.items():
146
+ # For text_chunks namespace, ensure llm_cache_list field exists
147
+ if "text_chunks" in self.namespace:
148
+ if "llm_cache_list" not in v:
149
+ v["llm_cache_list"] = []
150
+
151
+ # Add timestamps based on whether key exists
152
+ if k in self._data: # Key exists, only update update_time
153
+ v["update_time"] = current_time
154
+ else: # New key, set both create_time and update_time
155
+ v["create_time"] = current_time
156
+ v["update_time"] = current_time
157
+
158
+ v["_id"] = k
159
+
160
  self._data.update(data)
161
  await set_all_update_flags(self.namespace)
162
 
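The JSON KV changes stamp each upserted record with Unix timestamps and backfill defaults on read so that older records without the fields still round-trip. A distilled sketch of that stamping rule, written as standalone functions rather than the storage class itself:

    import time
    from typing import Any

    def stamp_records(existing: dict[str, dict[str, Any]],
                      incoming: dict[str, dict[str, Any]]) -> None:
        """Mirror of the upsert rule: new keys get create_time and update_time,
        existing keys only get a fresh update_time."""
        now = int(time.time())
        for key, value in incoming.items():
            if key in existing:
                value["update_time"] = now
            else:
                value["create_time"] = now
                value["update_time"] = now
            value["_id"] = key

    def with_time_defaults(record: dict[str, Any]) -> dict[str, Any]:
        """Mirror of the read path: copy the record and default missing time fields to 0."""
        out = dict(record)
        out.setdefault("create_time", 0)
        out.setdefault("update_time", 0)
        return out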
lightrag/kg/mongo_impl.py CHANGED
@@ -98,11 +98,21 @@ class MongoKVStorage(BaseKVStorage):
98
 
99
  async def get_by_id(self, id: str) -> dict[str, Any] | None:
100
  # Unified handling for flattened keys
101
- return await self._data.find_one({"_id": id})
102
 
103
  async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
104
  cursor = self._data.find({"_id": {"$in": ids}})
105
- return await cursor.to_list()
106
 
107
  async def filter_keys(self, keys: set[str]) -> set[str]:
108
  cursor = self._data.find({"_id": {"$in": list(keys)}}, {"_id": 1})
@@ -119,6 +129,9 @@ class MongoKVStorage(BaseKVStorage):
119
  result = {}
120
  async for doc in cursor:
121
  doc_id = doc.pop("_id")
 
 
 
122
  result[doc_id] = doc
123
  return result
124
 
@@ -132,9 +145,29 @@ class MongoKVStorage(BaseKVStorage):
132
  from pymongo import UpdateOne
133
 
134
  operations = []
 
 
135
  for k, v in data.items():
 
 
 
 
 
136
  v["_id"] = k # Use flattened key as _id
137
- operations.append(UpdateOne({"_id": k}, {"$set": v}, upsert=True))
138
 
139
  if operations:
140
  await self._data.bulk_write(operations)
@@ -247,6 +280,9 @@ class MongoDocStatusStorage(DocStatusStorage):
247
  return
248
  update_tasks: list[Any] = []
249
  for k, v in data.items():
 
 
 
250
  data[k]["_id"] = k
251
  update_tasks.append(
252
  self._data.update_one({"_id": k}, {"$set": v}, upsert=True)
@@ -279,6 +315,7 @@ class MongoDocStatusStorage(DocStatusStorage):
279
  updated_at=doc.get("updated_at"),
280
  chunks_count=doc.get("chunks_count", -1),
281
  file_path=doc.get("file_path", doc["_id"]),
 
282
  )
283
  for doc in result
284
  }
 
98
 
99
  async def get_by_id(self, id: str) -> dict[str, Any] | None:
100
  # Unified handling for flattened keys
101
+ doc = await self._data.find_one({"_id": id})
102
+ if doc:
103
+ # Ensure time fields are present, provide default values for old data
104
+ doc.setdefault("create_time", 0)
105
+ doc.setdefault("update_time", 0)
106
+ return doc
107
 
108
  async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
109
  cursor = self._data.find({"_id": {"$in": ids}})
110
+ docs = await cursor.to_list()
111
+ # Ensure time fields are present for all documents
112
+ for doc in docs:
113
+ doc.setdefault("create_time", 0)
114
+ doc.setdefault("update_time", 0)
115
+ return docs
116
 
117
  async def filter_keys(self, keys: set[str]) -> set[str]:
118
  cursor = self._data.find({"_id": {"$in": list(keys)}}, {"_id": 1})
 
129
  result = {}
130
  async for doc in cursor:
131
  doc_id = doc.pop("_id")
132
+ # Ensure time fields are present for all documents
133
+ doc.setdefault("create_time", 0)
134
+ doc.setdefault("update_time", 0)
135
  result[doc_id] = doc
136
  return result
137
 
 
145
  from pymongo import UpdateOne
146
 
147
  operations = []
148
+ current_time = int(time.time()) # Get current Unix timestamp
149
+
150
  for k, v in data.items():
151
+ # For text_chunks namespace, ensure llm_cache_list field exists
152
+ if self.namespace.endswith("text_chunks"):
153
+ if "llm_cache_list" not in v:
154
+ v["llm_cache_list"] = []
155
+
156
  v["_id"] = k # Use flattened key as _id
157
+ v["update_time"] = current_time # Always update update_time
158
+
159
+ operations.append(
160
+ UpdateOne(
161
+ {"_id": k},
162
+ {
163
+ "$set": v, # Update all fields including update_time
164
+ "$setOnInsert": {
165
+ "create_time": current_time
166
+ }, # Set create_time only on insert
167
+ },
168
+ upsert=True,
169
+ )
170
+ )
171
 
172
  if operations:
173
  await self._data.bulk_write(operations)
 
280
  return
281
  update_tasks: list[Any] = []
282
  for k, v in data.items():
283
+ # Ensure chunks_list field exists and is an array
284
+ if "chunks_list" not in v:
285
+ v["chunks_list"] = []
286
  data[k]["_id"] = k
287
  update_tasks.append(
288
  self._data.update_one({"_id": k}, {"$set": v}, upsert=True)
 
315
  updated_at=doc.get("updated_at"),
316
  chunks_count=doc.get("chunks_count", -1),
317
  file_path=doc.get("file_path", doc["_id"]),
318
+ chunks_list=doc.get("chunks_list", []),
319
  )
320
  for doc in result
321
  }
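The MongoDB upsert uses $set for the regular fields plus $setOnInsert for create_time, so the creation timestamp is written only when the document is first inserted and is preserved on later updates. A minimal pymongo sketch of that operation; the collection name and sample data are assumptions, the URI follows env.example:

    import time
    from pymongo import MongoClient, UpdateOne

    client = MongoClient("mongodb://root:root@localhost:27017/")  # URI from env.example
    coll = client["LightRAG"]["kv_store_text_chunks"]             # collection name is an assumption

    now = int(time.time())
    ops = [
        UpdateOne(
            {"_id": "chunk-1"},
            {
                "$set": {"content": "...", "update_time": now},   # always refreshed
                "$setOnInsert": {"create_time": now},             # written only on first insert
            },
            upsert=True,
        )
    ]
    coll.bulk_write(ops)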
lightrag/kg/postgres_impl.py CHANGED
@@ -136,6 +136,52 @@ class PostgreSQLDB:
136
  except Exception as e:
137
  logger.warning(f"Failed to add chunk_id column to LIGHTRAG_LLM_CACHE: {e}")
138
 
139
  async def _migrate_timestamp_columns(self):
140
  """Migrate timestamp columns in tables to timezone-aware types, assuming original data is in UTC time"""
141
  # Tables and columns that need migration
@@ -301,15 +347,17 @@ class PostgreSQLDB:
301
  record["mode"], record["original_prompt"]
302
  )
303
 
 
 
 
304
  # Generate new flattened key
305
- cache_type = "extract" # Default type
306
  new_key = f"{record['mode']}:{cache_type}:{new_hash}"
307
 
308
- # Insert new format data
309
  insert_sql = """
310
  INSERT INTO LIGHTRAG_LLM_CACHE
311
- (workspace, id, mode, original_prompt, return_value, chunk_id, create_time, update_time)
312
- VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
313
  ON CONFLICT (workspace, mode, id) DO NOTHING
314
  """
315
 
@@ -322,6 +370,7 @@ class PostgreSQLDB:
322
  "original_prompt": record["original_prompt"],
323
  "return_value": record["return_value"],
324
  "chunk_id": record["chunk_id"],
 
325
  "create_time": record["create_time"],
326
  "update_time": record["update_time"],
327
  },
@@ -357,6 +406,68 @@ class PostgreSQLDB:
357
  logger.error(f"LLM cache migration failed: {e}")
358
  # Don't raise exception, allow system to continue startup
359
 
360
  async def check_tables(self):
361
  # First create all tables
362
  for k, v in TABLES.items():
@@ -408,6 +519,15 @@ class PostgreSQLDB:
408
  logger.error(f"PostgreSQL, Failed to migrate LLM cache chunk_id field: {e}")
409
  # Don't throw an exception, allow the initialization process to continue
410
 
411
  # Finally, attempt to migrate old doc chunks data if needed
412
  try:
413
  await self._migrate_doc_chunks_to_vdb_chunks()
@@ -421,6 +541,22 @@ class PostgreSQLDB:
421
  except Exception as e:
422
  logger.error(f"PostgreSQL, LLM cache migration failed: {e}")
423
 
424
  async def query(
425
  self,
426
  sql: str,
@@ -608,24 +744,36 @@ class PGKVStorage(BaseKVStorage):
608
  if is_namespace(self.namespace, NameSpace.KV_STORE_LLM_RESPONSE_CACHE):
609
  processed_results = {}
610
  for row in results:
611
- # Parse flattened key to extract cache_type
612
- key_parts = row["id"].split(":")
613
- cache_type = key_parts[1] if len(key_parts) >= 3 else "unknown"
614
-
615
  # Map field names and add cache_type for compatibility
616
  processed_row = {
617
  **row,
618
- "return": row.get(
619
- "return_value", ""
620
- ), # Map return_value to return
621
- "cache_type": cache_type, # Add cache_type from key
622
  "original_prompt": row.get("original_prompt", ""),
623
  "chunk_id": row.get("chunk_id"),
624
  "mode": row.get("mode", "default"),
 
 
625
  }
626
  processed_results[row["id"]] = processed_row
627
  return processed_results
628
 
629
  # For other namespaces, return as-is
630
  return {row["id"]: row for row in results}
631
  except Exception as e:
@@ -637,6 +785,35 @@ class PGKVStorage(BaseKVStorage):
637
  sql = SQL_TEMPLATES["get_by_id_" + self.namespace]
638
  params = {"workspace": self.db.workspace, "id": id}
639
  response = await self.db.query(sql, params)
640
  return response if response else None
641
 
642
  # Query by id
@@ -646,13 +823,42 @@ class PGKVStorage(BaseKVStorage):
646
  ids=",".join([f"'{id}'" for id in ids])
647
  )
648
  params = {"workspace": self.db.workspace}
649
- return await self.db.query(sql, params, multirows=True)
650
 
651
- async def get_by_status(self, status: str) -> Union[list[dict[str, Any]], None]:
652
- """Specifically for llm_response_cache."""
653
- SQL = SQL_TEMPLATES["get_by_status_" + self.namespace]
654
- params = {"workspace": self.db.workspace, "status": status}
655
- return await self.db.query(SQL, params, multirows=True)
656
 
657
  async def filter_keys(self, keys: set[str]) -> set[str]:
658
  """Filter out duplicated content"""
@@ -693,6 +899,7 @@ class PGKVStorage(BaseKVStorage):
693
  "full_doc_id": v["full_doc_id"],
694
  "content": v["content"],
695
  "file_path": v["file_path"],
 
696
  "create_time": current_time,
697
  "update_time": current_time,
698
  }
@@ -716,6 +923,9 @@ class PGKVStorage(BaseKVStorage):
716
  "return_value": v["return"],
717
  "mode": v.get("mode", "default"), # Get mode from data
718
  "chunk_id": v.get("chunk_id"),
 
 
 
719
  }
720
 
721
  await self.db.execute(upsert_sql, _data)
@@ -1140,6 +1350,14 @@ class PGDocStatusStorage(DocStatusStorage):
1140
  if result is None or result == []:
1141
  return None
1142
  else:
1143
  return dict(
1144
  content=result[0]["content"],
1145
  content_length=result[0]["content_length"],
@@ -1149,6 +1367,7 @@ class PGDocStatusStorage(DocStatusStorage):
1149
  created_at=result[0]["created_at"],
1150
  updated_at=result[0]["updated_at"],
1151
  file_path=result[0]["file_path"],
 
1152
  )
1153
 
1154
  async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
@@ -1163,19 +1382,32 @@ class PGDocStatusStorage(DocStatusStorage):
1163
 
1164
  if not results:
1165
  return []
1166
- return [
1167
- {
1168
- "content": row["content"],
1169
- "content_length": row["content_length"],
1170
- "content_summary": row["content_summary"],
1171
- "status": row["status"],
1172
- "chunks_count": row["chunks_count"],
1173
- "created_at": row["created_at"],
1174
- "updated_at": row["updated_at"],
1175
- "file_path": row["file_path"],
1176
- }
1177
- for row in results
1178
- ]
1179
 
1180
  async def get_status_counts(self) -> dict[str, int]:
1181
  """Get counts of documents in each status"""
@@ -1196,8 +1428,18 @@ class PGDocStatusStorage(DocStatusStorage):
1196
  sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and status=$2"
1197
  params = {"workspace": self.db.workspace, "status": status.value}
1198
  result = await self.db.query(sql, params, True)
1199
- docs_by_status = {
1200
- element["id"]: DocProcessingStatus(
1201
  content=element["content"],
1202
  content_summary=element["content_summary"],
1203
  content_length=element["content_length"],
@@ -1206,9 +1448,9 @@ class PGDocStatusStorage(DocStatusStorage):
1206
  updated_at=element["updated_at"],
1207
  chunks_count=element["chunks_count"],
1208
  file_path=element["file_path"],
 
1209
  )
1210
- for element in result
1211
- }
1212
  return docs_by_status
1213
 
1214
  async def index_done_callback(self) -> None:
@@ -1272,10 +1514,10 @@ class PGDocStatusStorage(DocStatusStorage):
1272
  logger.warning(f"Unable to parse datetime string: {dt_str}")
1273
  return None
1274
 
1275
- # Modified SQL to include created_at and updated_at in both INSERT and UPDATE operations
1276
- # Both fields are updated from the input data in both INSERT and UPDATE cases
1277
- sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content,content_summary,content_length,chunks_count,status,file_path,created_at,updated_at)
1278
- values($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)
1279
  on conflict(id,workspace) do update set
1280
  content = EXCLUDED.content,
1281
  content_summary = EXCLUDED.content_summary,
@@ -1283,6 +1525,7 @@ class PGDocStatusStorage(DocStatusStorage):
1283
  chunks_count = EXCLUDED.chunks_count,
1284
  status = EXCLUDED.status,
1285
  file_path = EXCLUDED.file_path,
 
1286
  created_at = EXCLUDED.created_at,
1287
  updated_at = EXCLUDED.updated_at"""
1288
  for k, v in data.items():
@@ -1290,7 +1533,7 @@ class PGDocStatusStorage(DocStatusStorage):
1290
  created_at = parse_datetime(v.get("created_at"))
1291
  updated_at = parse_datetime(v.get("updated_at"))
1292
 
1293
- # chunks_count is optional
1294
  await self.db.execute(
1295
  sql,
1296
  {
@@ -1302,6 +1545,7 @@ class PGDocStatusStorage(DocStatusStorage):
1302
  "chunks_count": v["chunks_count"] if "chunks_count" in v else -1,
1303
  "status": v["status"],
1304
  "file_path": v["file_path"],
 
1305
  "created_at": created_at, # Use the converted datetime object
1306
  "updated_at": updated_at, # Use the converted datetime object
1307
  },
@@ -2620,6 +2864,7 @@ TABLES = {
2620
  tokens INTEGER,
2621
  content TEXT,
2622
  file_path VARCHAR(256),
 
2623
  create_time TIMESTAMP(0) WITH TIME ZONE,
2624
  update_time TIMESTAMP(0) WITH TIME ZONE,
2625
  CONSTRAINT LIGHTRAG_DOC_CHUNKS_PK PRIMARY KEY (workspace, id)
@@ -2692,6 +2937,7 @@ TABLES = {
2692
  chunks_count int4 NULL,
2693
  status varchar(64) NULL,
2694
  file_path TEXT NULL,
 
2695
  created_at timestamp with time zone DEFAULT CURRENT_TIMESTAMP NULL,
2696
  updated_at timestamp with time zone DEFAULT CURRENT_TIMESTAMP NULL,
2697
  CONSTRAINT LIGHTRAG_DOC_STATUS_PK PRIMARY KEY (workspace, id)
@@ -2706,24 +2952,30 @@ SQL_TEMPLATES = {
2706
  FROM LIGHTRAG_DOC_FULL WHERE workspace=$1 AND id=$2
2707
  """,
2708
  "get_by_id_text_chunks": """SELECT id, tokens, COALESCE(content, '') as content,
2709
- chunk_order_index, full_doc_id, file_path
 
 
2710
  FROM LIGHTRAG_DOC_CHUNKS WHERE workspace=$1 AND id=$2
2711
  """,
2712
- "get_by_id_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode, chunk_id
 
2713
  FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND id=$2
2714
  """,
2715
- "get_by_mode_id_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode, chunk_id
2716
  FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND mode=$2 AND id=$3
2717
  """,
2718
  "get_by_ids_full_docs": """SELECT id, COALESCE(content, '') as content
2719
  FROM LIGHTRAG_DOC_FULL WHERE workspace=$1 AND id IN ({ids})
2720
  """,
2721
  "get_by_ids_text_chunks": """SELECT id, tokens, COALESCE(content, '') as content,
2722
- chunk_order_index, full_doc_id, file_path
 
 
2723
  FROM LIGHTRAG_DOC_CHUNKS WHERE workspace=$1 AND id IN ({ids})
2724
  """,
2725
- "get_by_ids_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode, chunk_id
2726
- FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND mode= IN ({ids})
 
2727
  """,
2728
  "filter_keys": "SELECT id FROM {table_name} WHERE workspace=$1 AND id IN ({ids})",
2729
  "upsert_doc_full": """INSERT INTO LIGHTRAG_DOC_FULL (id, content, workspace)
@@ -2731,25 +2983,27 @@ SQL_TEMPLATES = {
2731
  ON CONFLICT (workspace,id) DO UPDATE
2732
  SET content = $2, update_time = CURRENT_TIMESTAMP
2733
  """,
2734
- "upsert_llm_response_cache": """INSERT INTO LIGHTRAG_LLM_CACHE(workspace,id,original_prompt,return_value,mode,chunk_id)
2735
- VALUES ($1, $2, $3, $4, $5, $6)
2736
  ON CONFLICT (workspace,mode,id) DO UPDATE
2737
  SET original_prompt = EXCLUDED.original_prompt,
2738
  return_value=EXCLUDED.return_value,
2739
  mode=EXCLUDED.mode,
2740
  chunk_id=EXCLUDED.chunk_id,
 
2741
  update_time = CURRENT_TIMESTAMP
2742
  """,
2743
  "upsert_text_chunk": """INSERT INTO LIGHTRAG_DOC_CHUNKS (workspace, id, tokens,
2744
- chunk_order_index, full_doc_id, content, file_path,
2745
  create_time, update_time)
2746
- VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
2747
  ON CONFLICT (workspace,id) DO UPDATE
2748
  SET tokens=EXCLUDED.tokens,
2749
  chunk_order_index=EXCLUDED.chunk_order_index,
2750
  full_doc_id=EXCLUDED.full_doc_id,
2751
  content = EXCLUDED.content,
2752
  file_path=EXCLUDED.file_path,
 
2753
  update_time = EXCLUDED.update_time
2754
  """,
2755
  # SQL for VectorStorage
 
136
  except Exception as e:
137
  logger.warning(f"Failed to add chunk_id column to LIGHTRAG_LLM_CACHE: {e}")
138
 
139
+ async def _migrate_llm_cache_add_cache_type(self):
140
+ """Add cache_type column to LIGHTRAG_LLM_CACHE table if it doesn't exist"""
141
+ try:
142
+ # Check if cache_type column exists
143
+ check_column_sql = """
144
+ SELECT column_name
145
+ FROM information_schema.columns
146
+ WHERE table_name = 'lightrag_llm_cache'
147
+ AND column_name = 'cache_type'
148
+ """
149
+
150
+ column_info = await self.query(check_column_sql)
151
+ if not column_info:
152
+ logger.info("Adding cache_type column to LIGHTRAG_LLM_CACHE table")
153
+ add_column_sql = """
154
+ ALTER TABLE LIGHTRAG_LLM_CACHE
155
+ ADD COLUMN cache_type VARCHAR(32) NULL
156
+ """
157
+ await self.execute(add_column_sql)
158
+ logger.info(
159
+ "Successfully added cache_type column to LIGHTRAG_LLM_CACHE table"
160
+ )
161
+
162
+ # Migrate existing data: extract cache_type from flattened keys
163
+ logger.info(
164
+ "Migrating existing LLM cache data to populate cache_type field"
165
+ )
166
+ update_sql = """
167
+ UPDATE LIGHTRAG_LLM_CACHE
168
+ SET cache_type = CASE
169
+ WHEN id LIKE '%:%:%' THEN split_part(id, ':', 2)
170
+ ELSE 'extract'
171
+ END
172
+ WHERE cache_type IS NULL
173
+ """
174
+ await self.execute(update_sql)
175
+ logger.info("Successfully migrated existing LLM cache data")
176
+ else:
177
+ logger.info(
178
+ "cache_type column already exists in LIGHTRAG_LLM_CACHE table"
179
+ )
180
+ except Exception as e:
181
+ logger.warning(
182
+ f"Failed to add cache_type column to LIGHTRAG_LLM_CACHE: {e}"
183
+ )
184
+
185
  async def _migrate_timestamp_columns(self):
186
  """Migrate timestamp columns in tables to timezone-aware types, assuming original data is in UTC time"""
187
  # Tables and columns that need migration
 
347
  record["mode"], record["original_prompt"]
348
  )
349
 
350
+ # Determine cache_type based on mode
351
+ cache_type = "extract" if record["mode"] == "default" else "unknown"
352
+
353
  # Generate new flattened key
 
354
  new_key = f"{record['mode']}:{cache_type}:{new_hash}"
355
 
356
+ # Insert new format data with cache_type field
357
  insert_sql = """
358
  INSERT INTO LIGHTRAG_LLM_CACHE
359
+ (workspace, id, mode, original_prompt, return_value, chunk_id, cache_type, create_time, update_time)
360
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
361
  ON CONFLICT (workspace, mode, id) DO NOTHING
362
  """
363
 
 
370
  "original_prompt": record["original_prompt"],
371
  "return_value": record["return_value"],
372
  "chunk_id": record["chunk_id"],
373
+ "cache_type": cache_type, # Add cache_type field
374
  "create_time": record["create_time"],
375
  "update_time": record["update_time"],
376
  },
 
406
  logger.error(f"LLM cache migration failed: {e}")
407
  # Don't raise exception, allow system to continue startup
408
 
409
+ async def _migrate_doc_status_add_chunks_list(self):
410
+ """Add chunks_list column to LIGHTRAG_DOC_STATUS table if it doesn't exist"""
411
+ try:
412
+ # Check if chunks_list column exists
413
+ check_column_sql = """
414
+ SELECT column_name
415
+ FROM information_schema.columns
416
+ WHERE table_name = 'lightrag_doc_status'
417
+ AND column_name = 'chunks_list'
418
+ """
419
+
420
+ column_info = await self.query(check_column_sql)
421
+ if not column_info:
422
+ logger.info("Adding chunks_list column to LIGHTRAG_DOC_STATUS table")
423
+ add_column_sql = """
424
+ ALTER TABLE LIGHTRAG_DOC_STATUS
425
+ ADD COLUMN chunks_list JSONB NULL DEFAULT '[]'::jsonb
426
+ """
427
+ await self.execute(add_column_sql)
428
+ logger.info(
429
+ "Successfully added chunks_list column to LIGHTRAG_DOC_STATUS table"
430
+ )
431
+ else:
432
+ logger.info(
433
+ "chunks_list column already exists in LIGHTRAG_DOC_STATUS table"
434
+ )
435
+ except Exception as e:
436
+ logger.warning(
437
+ f"Failed to add chunks_list column to LIGHTRAG_DOC_STATUS: {e}"
438
+ )
439
+
440
+ async def _migrate_text_chunks_add_llm_cache_list(self):
441
+ """Add llm_cache_list column to LIGHTRAG_DOC_CHUNKS table if it doesn't exist"""
442
+ try:
443
+ # Check if llm_cache_list column exists
444
+ check_column_sql = """
445
+ SELECT column_name
446
+ FROM information_schema.columns
447
+ WHERE table_name = 'lightrag_doc_chunks'
448
+ AND column_name = 'llm_cache_list'
449
+ """
450
+
451
+ column_info = await self.query(check_column_sql)
452
+ if not column_info:
453
+ logger.info("Adding llm_cache_list column to LIGHTRAG_DOC_CHUNKS table")
454
+ add_column_sql = """
455
+ ALTER TABLE LIGHTRAG_DOC_CHUNKS
456
+ ADD COLUMN llm_cache_list JSONB NULL DEFAULT '[]'::jsonb
457
+ """
458
+ await self.execute(add_column_sql)
459
+ logger.info(
460
+ "Successfully added llm_cache_list column to LIGHTRAG_DOC_CHUNKS table"
461
+ )
462
+ else:
463
+ logger.info(
464
+ "llm_cache_list column already exists in LIGHTRAG_DOC_CHUNKS table"
465
+ )
466
+ except Exception as e:
467
+ logger.warning(
468
+ f"Failed to add llm_cache_list column to LIGHTRAG_DOC_CHUNKS: {e}"
469
+ )
470
+
471
  async def check_tables(self):
472
  # First create all tables
473
  for k, v in TABLES.items():
 
519
  logger.error(f"PostgreSQL, Failed to migrate LLM cache chunk_id field: {e}")
520
  # Don't throw an exception, allow the initialization process to continue
521
 
522
+ # Migrate LLM cache table to add cache_type field if needed
523
+ try:
524
+ await self._migrate_llm_cache_add_cache_type()
525
+ except Exception as e:
526
+ logger.error(
527
+ f"PostgreSQL, Failed to migrate LLM cache cache_type field: {e}"
528
+ )
529
+ # Don't throw an exception, allow the initialization process to continue
530
+
531
  # Finally, attempt to migrate old doc chunks data if needed
532
  try:
533
  await self._migrate_doc_chunks_to_vdb_chunks()
 
541
  except Exception as e:
542
  logger.error(f"PostgreSQL, LLM cache migration failed: {e}")
543
 
544
+ # Migrate doc status to add chunks_list field if needed
545
+ try:
546
+ await self._migrate_doc_status_add_chunks_list()
547
+ except Exception as e:
548
+ logger.error(
549
+ f"PostgreSQL, Failed to migrate doc status chunks_list field: {e}"
550
+ )
551
+
552
+ # Migrate text chunks to add llm_cache_list field if needed
553
+ try:
554
+ await self._migrate_text_chunks_add_llm_cache_list()
555
+ except Exception as e:
556
+ logger.error(
557
+ f"PostgreSQL, Failed to migrate text chunks llm_cache_list field: {e}"
558
+ )
559
+
560
  async def query(
561
  self,
562
  sql: str,
 
744
  if is_namespace(self.namespace, NameSpace.KV_STORE_LLM_RESPONSE_CACHE):
745
  processed_results = {}
746
  for row in results:
 
 
 
 
747
  # Map field names and add cache_type for compatibility
748
  processed_row = {
749
  **row,
750
+ "return": row.get("return_value", ""),
751
+ "cache_type": row.get("original_prompt", "unknow"),
 
 
752
  "original_prompt": row.get("original_prompt", ""),
753
  "chunk_id": row.get("chunk_id"),
754
  "mode": row.get("mode", "default"),
755
+ "create_time": row.get("create_time", 0),
756
+ "update_time": row.get("update_time", 0),
757
  }
758
  processed_results[row["id"]] = processed_row
759
  return processed_results
760
 
761
+ # For text_chunks namespace, parse llm_cache_list JSON string back to list
762
+ if is_namespace(self.namespace, NameSpace.KV_STORE_TEXT_CHUNKS):
763
+ processed_results = {}
764
+ for row in results:
765
+ llm_cache_list = row.get("llm_cache_list", [])
766
+ if isinstance(llm_cache_list, str):
767
+ try:
768
+ llm_cache_list = json.loads(llm_cache_list)
769
+ except json.JSONDecodeError:
770
+ llm_cache_list = []
771
+ row["llm_cache_list"] = llm_cache_list
772
+ row["create_time"] = row.get("create_time", 0)
773
+ row["update_time"] = row.get("update_time", 0)
774
+ processed_results[row["id"]] = row
775
+ return processed_results
776
+
777
  # For other namespaces, return as-is
778
  return {row["id"]: row for row in results}
779
  except Exception as e:
 
785
  sql = SQL_TEMPLATES["get_by_id_" + self.namespace]
786
  params = {"workspace": self.db.workspace, "id": id}
787
  response = await self.db.query(sql, params)
788
+
789
+ if response and is_namespace(self.namespace, NameSpace.KV_STORE_TEXT_CHUNKS):
790
+ # Parse llm_cache_list JSON string back to list
791
+ llm_cache_list = response.get("llm_cache_list", [])
792
+ if isinstance(llm_cache_list, str):
793
+ try:
794
+ llm_cache_list = json.loads(llm_cache_list)
795
+ except json.JSONDecodeError:
796
+ llm_cache_list = []
797
+ response["llm_cache_list"] = llm_cache_list
798
+ response["create_time"] = response.get("create_time", 0)
799
+ response["update_time"] = response.get("update_time", 0)
800
+
801
+ # Special handling for LLM cache to ensure compatibility with _get_cached_extraction_results
802
+ if response and is_namespace(
803
+ self.namespace, NameSpace.KV_STORE_LLM_RESPONSE_CACHE
804
+ ):
805
+ # Map field names and add cache_type for compatibility
806
+ response = {
807
+ **response,
808
+ "return": response.get("return_value", ""),
809
+ "cache_type": response.get("cache_type"),
810
+ "original_prompt": response.get("original_prompt", ""),
811
+ "chunk_id": response.get("chunk_id"),
812
+ "mode": response.get("mode", "default"),
813
+ "create_time": response.get("create_time", 0),
814
+ "update_time": response.get("update_time", 0),
815
+ }
816
+
817
  return response if response else None
818
 
819
  # Query by id
 
823
  ids=",".join([f"'{id}'" for id in ids])
824
  )
825
  params = {"workspace": self.db.workspace}
826
+ results = await self.db.query(sql, params, multirows=True)
827
+
828
+ if results and is_namespace(self.namespace, NameSpace.KV_STORE_TEXT_CHUNKS):
829
+ # Parse llm_cache_list JSON string back to list for each result
830
+ for result in results:
831
+ llm_cache_list = result.get("llm_cache_list", [])
832
+ if isinstance(llm_cache_list, str):
833
+ try:
834
+ llm_cache_list = json.loads(llm_cache_list)
835
+ except json.JSONDecodeError:
836
+ llm_cache_list = []
837
+ result["llm_cache_list"] = llm_cache_list
838
+ result["create_time"] = result.get("create_time", 0)
839
+ result["update_time"] = result.get("update_time", 0)
840
+
841
+ # Special handling for LLM cache to ensure compatibility with _get_cached_extraction_results
842
+ if results and is_namespace(
843
+ self.namespace, NameSpace.KV_STORE_LLM_RESPONSE_CACHE
844
+ ):
845
+ processed_results = []
846
+ for row in results:
847
+ # Map field names and add cache_type for compatibility
848
+ processed_row = {
849
+ **row,
850
+ "return": row.get("return_value", ""),
851
+ "cache_type": row.get("cache_type"),
852
+ "original_prompt": row.get("original_prompt", ""),
853
+ "chunk_id": row.get("chunk_id"),
854
+ "mode": row.get("mode", "default"),
855
+ "create_time": row.get("create_time", 0),
856
+ "update_time": row.get("update_time", 0),
857
+ }
858
+ processed_results.append(processed_row)
859
+ return processed_results
860
 
861
+ return results if results else []
 
 
 
 
862
 
863
  async def filter_keys(self, keys: set[str]) -> set[str]:
864
  """Filter out duplicated content"""
 
899
  "full_doc_id": v["full_doc_id"],
900
  "content": v["content"],
901
  "file_path": v["file_path"],
902
+ "llm_cache_list": json.dumps(v.get("llm_cache_list", [])),
903
  "create_time": current_time,
904
  "update_time": current_time,
905
  }
 
923
  "return_value": v["return"],
924
  "mode": v.get("mode", "default"), # Get mode from data
925
  "chunk_id": v.get("chunk_id"),
926
+ "cache_type": v.get(
927
+ "cache_type", "extract"
928
+ ), # Get cache_type from data
929
  }
930
 
931
  await self.db.execute(upsert_sql, _data)
 
1350
  if result is None or result == []:
1351
  return None
1352
  else:
1353
+ # Parse chunks_list JSON string back to list
1354
+ chunks_list = result[0].get("chunks_list", [])
1355
+ if isinstance(chunks_list, str):
1356
+ try:
1357
+ chunks_list = json.loads(chunks_list)
1358
+ except json.JSONDecodeError:
1359
+ chunks_list = []
1360
+
1361
  return dict(
1362
  content=result[0]["content"],
1363
  content_length=result[0]["content_length"],
 
1367
  created_at=result[0]["created_at"],
1368
  updated_at=result[0]["updated_at"],
1369
  file_path=result[0]["file_path"],
1370
+ chunks_list=chunks_list,
1371
  )
1372
 
1373
  async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
 
1382
 
1383
  if not results:
1384
  return []
1385
+
1386
+ processed_results = []
1387
+ for row in results:
1388
+ # Parse chunks_list JSON string back to list
1389
+ chunks_list = row.get("chunks_list", [])
1390
+ if isinstance(chunks_list, str):
1391
+ try:
1392
+ chunks_list = json.loads(chunks_list)
1393
+ except json.JSONDecodeError:
1394
+ chunks_list = []
1395
+
1396
+ processed_results.append(
1397
+ {
1398
+ "content": row["content"],
1399
+ "content_length": row["content_length"],
1400
+ "content_summary": row["content_summary"],
1401
+ "status": row["status"],
1402
+ "chunks_count": row["chunks_count"],
1403
+ "created_at": row["created_at"],
1404
+ "updated_at": row["updated_at"],
1405
+ "file_path": row["file_path"],
1406
+ "chunks_list": chunks_list,
1407
+ }
1408
+ )
1409
+
1410
+ return processed_results
1411
 
1412
  async def get_status_counts(self) -> dict[str, int]:
1413
  """Get counts of documents in each status"""
 
1428
  sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and status=$2"
1429
  params = {"workspace": self.db.workspace, "status": status.value}
1430
  result = await self.db.query(sql, params, True)
1431
+
1432
+ docs_by_status = {}
1433
+ for element in result:
1434
+ # Parse chunks_list JSON string back to list
1435
+ chunks_list = element.get("chunks_list", [])
1436
+ if isinstance(chunks_list, str):
1437
+ try:
1438
+ chunks_list = json.loads(chunks_list)
1439
+ except json.JSONDecodeError:
1440
+ chunks_list = []
1441
+
1442
+ docs_by_status[element["id"]] = DocProcessingStatus(
1443
  content=element["content"],
1444
  content_summary=element["content_summary"],
1445
  content_length=element["content_length"],
 
1448
  updated_at=element["updated_at"],
1449
  chunks_count=element["chunks_count"],
1450
  file_path=element["file_path"],
1451
+ chunks_list=chunks_list,
1452
  )
1453
+
 
1454
  return docs_by_status
1455
 
1456
  async def index_done_callback(self) -> None:
 
1514
  logger.warning(f"Unable to parse datetime string: {dt_str}")
1515
  return None
1516
 
1517
+ # Modified SQL to include created_at, updated_at, and chunks_list in both INSERT and UPDATE operations
1518
+ # All fields are updated from the input data in both INSERT and UPDATE cases
1519
+ sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content,content_summary,content_length,chunks_count,status,file_path,chunks_list,created_at,updated_at)
1520
+ values($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
1521
  on conflict(id,workspace) do update set
1522
  content = EXCLUDED.content,
1523
  content_summary = EXCLUDED.content_summary,
 
1525
  chunks_count = EXCLUDED.chunks_count,
1526
  status = EXCLUDED.status,
1527
  file_path = EXCLUDED.file_path,
1528
+ chunks_list = EXCLUDED.chunks_list,
1529
  created_at = EXCLUDED.created_at,
1530
  updated_at = EXCLUDED.updated_at"""
1531
  for k, v in data.items():
 
1533
  created_at = parse_datetime(v.get("created_at"))
1534
  updated_at = parse_datetime(v.get("updated_at"))
1535
 
1536
+ # chunks_count and chunks_list are optional
1537
  await self.db.execute(
1538
  sql,
1539
  {
 
1545
  "chunks_count": v["chunks_count"] if "chunks_count" in v else -1,
1546
  "status": v["status"],
1547
  "file_path": v["file_path"],
1548
+ "chunks_list": json.dumps(v.get("chunks_list", [])),
1549
  "created_at": created_at, # Use the converted datetime object
1550
  "updated_at": updated_at, # Use the converted datetime object
1551
  },
 
2864
  tokens INTEGER,
2865
  content TEXT,
2866
  file_path VARCHAR(256),
2867
+ llm_cache_list JSONB NULL DEFAULT '[]'::jsonb,
2868
  create_time TIMESTAMP(0) WITH TIME ZONE,
2869
  update_time TIMESTAMP(0) WITH TIME ZONE,
2870
  CONSTRAINT LIGHTRAG_DOC_CHUNKS_PK PRIMARY KEY (workspace, id)
 
2937
  chunks_count int4 NULL,
2938
  status varchar(64) NULL,
2939
  file_path TEXT NULL,
2940
+ chunks_list JSONB NULL DEFAULT '[]'::jsonb,
2941
  created_at timestamp with time zone DEFAULT CURRENT_TIMESTAMP NULL,
2942
  updated_at timestamp with time zone DEFAULT CURRENT_TIMESTAMP NULL,
2943
  CONSTRAINT LIGHTRAG_DOC_STATUS_PK PRIMARY KEY (workspace, id)
 
2952
  FROM LIGHTRAG_DOC_FULL WHERE workspace=$1 AND id=$2
2953
  """,
2954
  "get_by_id_text_chunks": """SELECT id, tokens, COALESCE(content, '') as content,
2955
+ chunk_order_index, full_doc_id, file_path,
2956
+ COALESCE(llm_cache_list, '[]'::jsonb) as llm_cache_list,
2957
+ create_time, update_time
2958
  FROM LIGHTRAG_DOC_CHUNKS WHERE workspace=$1 AND id=$2
2959
  """,
2960
+ "get_by_id_llm_response_cache": """SELECT id, original_prompt, return_value, mode, chunk_id, cache_type,
2961
+ create_time, update_time
2962
  FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND id=$2
2963
  """,
2964
+ "get_by_mode_id_llm_response_cache": """SELECT id, original_prompt, return_value, mode, chunk_id
2965
  FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND mode=$2 AND id=$3
2966
  """,
2967
  "get_by_ids_full_docs": """SELECT id, COALESCE(content, '') as content
2968
  FROM LIGHTRAG_DOC_FULL WHERE workspace=$1 AND id IN ({ids})
2969
  """,
2970
  "get_by_ids_text_chunks": """SELECT id, tokens, COALESCE(content, '') as content,
2971
+ chunk_order_index, full_doc_id, file_path,
2972
+ COALESCE(llm_cache_list, '[]'::jsonb) as llm_cache_list,
2973
+ create_time, update_time
2974
  FROM LIGHTRAG_DOC_CHUNKS WHERE workspace=$1 AND id IN ({ids})
2975
  """,
2976
+ "get_by_ids_llm_response_cache": """SELECT id, original_prompt, return_value, mode, chunk_id, cache_type,
2977
+ create_time, update_time
2978
+ FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND id IN ({ids})
2979
  """,
2980
  "filter_keys": "SELECT id FROM {table_name} WHERE workspace=$1 AND id IN ({ids})",
2981
  "upsert_doc_full": """INSERT INTO LIGHTRAG_DOC_FULL (id, content, workspace)
 
2983
  ON CONFLICT (workspace,id) DO UPDATE
2984
  SET content = $2, update_time = CURRENT_TIMESTAMP
2985
  """,
2986
+ "upsert_llm_response_cache": """INSERT INTO LIGHTRAG_LLM_CACHE(workspace,id,original_prompt,return_value,mode,chunk_id,cache_type)
2987
+ VALUES ($1, $2, $3, $4, $5, $6, $7)
2988
  ON CONFLICT (workspace,mode,id) DO UPDATE
2989
  SET original_prompt = EXCLUDED.original_prompt,
2990
  return_value=EXCLUDED.return_value,
2991
  mode=EXCLUDED.mode,
2992
  chunk_id=EXCLUDED.chunk_id,
2993
+ cache_type=EXCLUDED.cache_type,
2994
  update_time = CURRENT_TIMESTAMP
2995
  """,
2996
  "upsert_text_chunk": """INSERT INTO LIGHTRAG_DOC_CHUNKS (workspace, id, tokens,
2997
+ chunk_order_index, full_doc_id, content, file_path, llm_cache_list,
2998
  create_time, update_time)
2999
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
3000
  ON CONFLICT (workspace,id) DO UPDATE
3001
  SET tokens=EXCLUDED.tokens,
3002
  chunk_order_index=EXCLUDED.chunk_order_index,
3003
  full_doc_id=EXCLUDED.full_doc_id,
3004
  content = EXCLUDED.content,
3005
  file_path=EXCLUDED.file_path,
3006
+ llm_cache_list=EXCLUDED.llm_cache_list,
3007
  update_time = EXCLUDED.update_time
3008
  """,
3009
  # SQL for VectorStorage
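Several of the new PostgreSQL helpers follow the same idempotent migration pattern: query information_schema.columns for the column, run ALTER TABLE ... ADD COLUMN only if it is missing, and log-and-continue on failure so startup is never blocked. A generic sketch of that pattern, parameterised over query/execute callables rather than tied to this class; the helper name and signature are assumptions:

    import logging

    logger = logging.getLogger(__name__)

    async def ensure_column(query, execute, table: str, column: str, ddl_type: str) -> None:
        """Add a column if information_schema says it is missing; never raise."""
        try:
            check_sql = f"""
                SELECT column_name FROM information_schema.columns
                WHERE table_name = '{table.lower()}' AND column_name = '{column.lower()}'
            """
            if not await query(check_sql):
                await execute(f"ALTER TABLE {table} ADD COLUMN {column} {ddl_type}")
                logger.info("Added column %s to %s", column, table)
        except Exception as exc:  # mirror the diff: warn and let startup continue
            logger.warning("Failed to add column %s to %s: %s", column, table, exc)

    # e.g. await ensure_column(db.query, db.execute,
    #                          "LIGHTRAG_DOC_STATUS", "chunks_list", "JSONB NULL DEFAULT '[]'::jsonb")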
lightrag/kg/redis_impl.py CHANGED
@@ -132,7 +132,13 @@ class RedisKVStorage(BaseKVStorage):
132
  async with self._get_redis_connection() as redis:
133
  try:
134
  data = await redis.get(f"{self.namespace}:{id}")
135
- return json.loads(data) if data else None
136
  except json.JSONDecodeError as e:
137
  logger.error(f"JSON decode error for id {id}: {e}")
138
  return None
@@ -144,7 +150,19 @@ class RedisKVStorage(BaseKVStorage):
144
  for id in ids:
145
  pipe.get(f"{self.namespace}:{id}")
146
  results = await pipe.execute()
147
- return [json.loads(result) if result else None for result in results]
148
  except json.JSONDecodeError as e:
149
  logger.error(f"JSON decode error in batch get: {e}")
150
  return [None] * len(ids)
@@ -176,7 +194,11 @@ class RedisKVStorage(BaseKVStorage):
176
  # Extract the ID part (after namespace:)
177
  key_id = key.split(":", 1)[1]
178
  try:
179
- result[key_id] = json.loads(value)
 
 
 
 
180
  except json.JSONDecodeError as e:
181
  logger.error(f"JSON decode error for key {key}: {e}")
182
  continue
@@ -200,15 +222,41 @@ class RedisKVStorage(BaseKVStorage):
200
  async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
201
  if not data:
202
  return
 
 
 
 
 
203
  async with self._get_redis_connection() as redis:
204
  try:
 
205
  pipe = redis.pipeline()
206
  for k, v in data.items():
207
  pipe.set(f"{self.namespace}:{k}", json.dumps(v))
208
  await pipe.execute()
209
 
210
- for k in data:
211
- data[k]["_id"] = k
212
  except json.JSONEncodeError as e:
213
  logger.error(f"JSON encode error during upsert: {e}")
214
  raise
@@ -601,6 +649,11 @@ class RedisDocStatusStorage(DocStatusStorage):
601
  logger.debug(f"Inserting {len(data)} records to {self.namespace}")
602
  async with self._get_redis_connection() as redis:
603
  try:
 
 
 
 
 
604
  pipe = redis.pipeline()
605
  for k, v in data.items():
606
  pipe.set(f"{self.namespace}:{k}", json.dumps(v))
 
132
  async with self._get_redis_connection() as redis:
133
  try:
134
  data = await redis.get(f"{self.namespace}:{id}")
135
+ if data:
136
+ result = json.loads(data)
137
+ # Ensure time fields are present, provide default values for old data
138
+ result.setdefault("create_time", 0)
139
+ result.setdefault("update_time", 0)
140
+ return result
141
+ return None
142
  except json.JSONDecodeError as e:
143
  logger.error(f"JSON decode error for id {id}: {e}")
144
  return None
 
150
  for id in ids:
151
  pipe.get(f"{self.namespace}:{id}")
152
  results = await pipe.execute()
153
+
154
+ processed_results = []
155
+ for result in results:
156
+ if result:
157
+ data = json.loads(result)
158
+ # Ensure time fields are present for all documents
159
+ data.setdefault("create_time", 0)
160
+ data.setdefault("update_time", 0)
161
+ processed_results.append(data)
162
+ else:
163
+ processed_results.append(None)
164
+
165
+ return processed_results
166
  except json.JSONDecodeError as e:
167
  logger.error(f"JSON decode error in batch get: {e}")
168
  return [None] * len(ids)
 
194
  # Extract the ID part (after namespace:)
195
  key_id = key.split(":", 1)[1]
196
  try:
197
+ data = json.loads(value)
198
+ # Ensure time fields are present for all documents
199
+ data.setdefault("create_time", 0)
200
+ data.setdefault("update_time", 0)
201
+ result[key_id] = data
202
  except json.JSONDecodeError as e:
203
  logger.error(f"JSON decode error for key {key}: {e}")
204
  continue
 
222
  async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
223
  if not data:
224
  return
225
+
226
+ import time
227
+
228
+ current_time = int(time.time()) # Get current Unix timestamp
229
+
230
  async with self._get_redis_connection() as redis:
231
  try:
232
+ # Check which keys already exist to determine create vs update
233
+ pipe = redis.pipeline()
234
+ for k in data.keys():
235
+ pipe.exists(f"{self.namespace}:{k}")
236
+ exists_results = await pipe.execute()
237
+
238
+ # Add timestamps to data
239
+ for i, (k, v) in enumerate(data.items()):
240
+ # For text_chunks namespace, ensure llm_cache_list field exists
241
+ if "text_chunks" in self.namespace:
242
+ if "llm_cache_list" not in v:
243
+ v["llm_cache_list"] = []
244
+
245
+ # Add timestamps based on whether key exists
246
+ if exists_results[i]: # Key exists, only update update_time
247
+ v["update_time"] = current_time
248
+ else: # New key, set both create_time and update_time
249
+ v["create_time"] = current_time
250
+ v["update_time"] = current_time
251
+
252
+ v["_id"] = k
253
+
254
+ # Store the data
255
  pipe = redis.pipeline()
256
  for k, v in data.items():
257
  pipe.set(f"{self.namespace}:{k}", json.dumps(v))
258
  await pipe.execute()
259
 
 
 
260
  except json.JSONEncodeError as e:
261
  logger.error(f"JSON encode error during upsert: {e}")
262
  raise
 
649
  logger.debug(f"Inserting {len(data)} records to {self.namespace}")
650
  async with self._get_redis_connection() as redis:
651
  try:
652
+ # Ensure chunks_list field exists for new documents
653
+ for doc_id, doc_data in data.items():
654
+ if "chunks_list" not in doc_data:
655
+ doc_data["chunks_list"] = []
656
+
657
  pipe = redis.pipeline()
658
  for k, v in data.items():
659
  pipe.set(f"{self.namespace}:{k}", json.dumps(v))
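The Redis upsert first pipelines EXISTS checks for every key, then stamps create_time/update_time accordingly before pipelining the SET calls. A minimal redis-py (asyncio) sketch of that two-pass pattern; the key prefix and sample data are placeholders, not repository code:

    import asyncio
    import json
    import time

    from redis.asyncio import Redis

    async def upsert_with_timestamps(redis: Redis, namespace: str, data: dict[str, dict]) -> None:
        now = int(time.time())

        # Pass 1: check which keys already exist.
        pipe = redis.pipeline()
        for key in data:
            pipe.exists(f"{namespace}:{key}")
        exists = await pipe.execute()

        # Pass 2: stamp timestamps and write everything back.
        pipe = redis.pipeline()
        for flag, (key, value) in zip(exists, data.items()):
            if not flag:
                value["create_time"] = now   # only on first insert
            value["update_time"] = now       # always refreshed
            pipe.set(f"{namespace}:{key}", json.dumps(value))
        await pipe.execute()

    # e.g. asyncio.run(upsert_with_timestamps(Redis(), "text_chunks", {"chunk-1": {"content": "..."}}))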
lightrag/kg/tidb_impl.py CHANGED
@@ -520,11 +520,6 @@ class TiDBVectorDBStorage(BaseVectorStorage):
         }
         await self.db.execute(SQL_TEMPLATES["upsert_relationship"], param)
 
-    async def get_by_status(self, status: str) -> Union[list[dict[str, Any]], None]:
-        SQL = SQL_TEMPLATES["get_by_status_" + self.namespace]
-        params = {"workspace": self.db.workspace, "status": status}
-        return await self.db.query(SQL, params, multirows=True)
-
     async def delete(self, ids: list[str]) -> None:
         """Delete vectors with specified IDs from the storage.
 
lightrag/lightrag.py CHANGED
@@ -349,6 +349,7 @@ class LightRAG:
349
 
350
  # Fix global_config now
351
  global_config = asdict(self)
 
352
  _print_config = ",\n ".join([f"{k} = {v}" for k, v in global_config.items()])
353
  logger.debug(f"LightRAG init with param:\n {_print_config}\n")
354
 
@@ -952,6 +953,7 @@ class LightRAG:
952
  **dp,
953
  "full_doc_id": doc_id,
954
  "file_path": file_path, # Add file path to each chunk
 
955
  }
956
  for dp in self.chunking_func(
957
  self.tokenizer,
@@ -963,14 +965,17 @@ class LightRAG:
963
  )
964
  }
965
 
966
- # Process document (text chunks and full docs) in parallel
967
- # Create tasks with references for potential cancellation
968
  doc_status_task = asyncio.create_task(
969
  self.doc_status.upsert(
970
  {
971
  doc_id: {
972
  "status": DocStatus.PROCESSING,
973
  "chunks_count": len(chunks),
 
 
 
974
  "content": status_doc.content,
975
  "content_summary": status_doc.content_summary,
976
  "content_length": status_doc.content_length,
@@ -986,11 +991,6 @@ class LightRAG:
986
  chunks_vdb_task = asyncio.create_task(
987
  self.chunks_vdb.upsert(chunks)
988
  )
989
- entity_relation_task = asyncio.create_task(
990
- self._process_entity_relation_graph(
991
- chunks, pipeline_status, pipeline_status_lock
992
- )
993
- )
994
  full_docs_task = asyncio.create_task(
995
  self.full_docs.upsert(
996
  {doc_id: {"content": status_doc.content}}
@@ -999,14 +999,26 @@ class LightRAG:
999
  text_chunks_task = asyncio.create_task(
1000
  self.text_chunks.upsert(chunks)
1001
  )
1002
- tasks = [
 
 
1003
  doc_status_task,
1004
  chunks_vdb_task,
1005
- entity_relation_task,
1006
  full_docs_task,
1007
  text_chunks_task,
1008
  ]
1009
- await asyncio.gather(*tasks)
1010
  file_extraction_stage_ok = True
1011
 
1012
  except Exception as e:
@@ -1021,14 +1033,14 @@ class LightRAG:
1021
  )
1022
  pipeline_status["history_messages"].append(error_msg)
1023
 
1024
- # Cancel other tasks as they are no longer meaningful
1025
- for task in [
1026
- chunks_vdb_task,
1027
- entity_relation_task,
1028
- full_docs_task,
1029
- text_chunks_task,
1030
- ]:
1031
- if not task.done():
1032
  task.cancel()
1033
 
1034
  # Persistent llm cache
@@ -1078,6 +1090,9 @@ class LightRAG:
1078
  doc_id: {
1079
  "status": DocStatus.PROCESSED,
1080
  "chunks_count": len(chunks),
 
 
 
1081
  "content": status_doc.content,
1082
  "content_summary": status_doc.content_summary,
1083
  "content_length": status_doc.content_length,
@@ -1196,6 +1211,7 @@ class LightRAG:
1196
  pipeline_status=pipeline_status,
1197
  pipeline_status_lock=pipeline_status_lock,
1198
  llm_response_cache=self.llm_response_cache,
 
1199
  )
1200
  return chunk_results
1201
  except Exception as e:
@@ -1726,28 +1742,10 @@ class LightRAG:
1726
  file_path="",
1727
  )
1728
 
1729
- # 2. Get all chunks related to this document
1730
- try:
1731
- all_chunks = await self.text_chunks.get_all()
1732
- related_chunks = {
1733
- chunk_id: chunk_data
1734
- for chunk_id, chunk_data in all_chunks.items()
1735
- if isinstance(chunk_data, dict)
1736
- and chunk_data.get("full_doc_id") == doc_id
1737
- }
1738
-
1739
- # Update pipeline status after getting chunks count
1740
- async with pipeline_status_lock:
1741
- log_message = f"Retrieved {len(related_chunks)} of {len(all_chunks)} related chunks"
1742
- logger.info(log_message)
1743
- pipeline_status["latest_message"] = log_message
1744
- pipeline_status["history_messages"].append(log_message)
1745
-
1746
- except Exception as e:
1747
- logger.error(f"Failed to retrieve chunks for document {doc_id}: {e}")
1748
- raise Exception(f"Failed to retrieve document chunks: {e}") from e
1749
 
1750
- if not related_chunks:
1751
  logger.warning(f"No chunks found for document {doc_id}")
1752
  # Mark that deletion operations have started
1753
  deletion_operations_started = True
@@ -1778,7 +1776,6 @@ class LightRAG:
1778
  file_path=file_path,
1779
  )
1780
 
1781
- chunk_ids = set(related_chunks.keys())
1782
  # Mark that deletion operations have started
1783
  deletion_operations_started = True
1784
 
@@ -1802,26 +1799,12 @@ class LightRAG:
1802
  )
1803
  )
1804
 
1805
- # Update pipeline status after getting affected_nodes
1806
- async with pipeline_status_lock:
1807
- log_message = f"Found {len(affected_nodes)} affected entities"
1808
- logger.info(log_message)
1809
- pipeline_status["latest_message"] = log_message
1810
- pipeline_status["history_messages"].append(log_message)
1811
-
1812
  affected_edges = (
1813
  await self.chunk_entity_relation_graph.get_edges_by_chunk_ids(
1814
  list(chunk_ids)
1815
  )
1816
  )
1817
 
1818
- # Update pipeline status after getting affected_edges
1819
- async with pipeline_status_lock:
1820
- log_message = f"Found {len(affected_edges)} affected relations"
1821
- logger.info(log_message)
1822
- pipeline_status["latest_message"] = log_message
1823
- pipeline_status["history_messages"].append(log_message)
1824
-
1825
  except Exception as e:
1826
  logger.error(f"Failed to analyze affected graph elements: {e}")
1827
  raise Exception(f"Failed to analyze graph dependencies: {e}") from e
@@ -1839,6 +1822,14 @@ class LightRAG:
1839
  elif remaining_sources != sources:
1840
  entities_to_rebuild[node_label] = remaining_sources
1841
 
1842
  # Process relationships
1843
  for edge_data in affected_edges:
1844
  src = edge_data.get("source")
@@ -1860,6 +1851,14 @@ class LightRAG:
1860
  elif remaining_sources != sources:
1861
  relationships_to_rebuild[edge_tuple] = remaining_sources
1862
 
1863
  except Exception as e:
1864
  logger.error(f"Failed to process graph analysis results: {e}")
1865
  raise Exception(f"Failed to process graph dependencies: {e}") from e
@@ -1943,7 +1942,7 @@ class LightRAG:
1943
  knowledge_graph_inst=self.chunk_entity_relation_graph,
1944
  entities_vdb=self.entities_vdb,
1945
  relationships_vdb=self.relationships_vdb,
1946
- text_chunks=self.text_chunks,
1947
  llm_response_cache=self.llm_response_cache,
1948
  global_config=asdict(self),
1949
  pipeline_status=pipeline_status,
 
349
 
350
  # Fix global_config now
351
  global_config = asdict(self)
352
+
353
  _print_config = ",\n ".join([f"{k} = {v}" for k, v in global_config.items()])
354
  logger.debug(f"LightRAG init with param:\n {_print_config}\n")
355
 
 
953
  **dp,
954
  "full_doc_id": doc_id,
955
  "file_path": file_path, # Add file path to each chunk
956
+ "llm_cache_list": [], # Initialize empty LLM cache list for each chunk
957
  }
958
  for dp in self.chunking_func(
959
  self.tokenizer,
 
965
  )
966
  }
967
 
968
+ # Process document in two stages
969
+ # Stage 1: Process text chunks and docs (parallel execution)
970
  doc_status_task = asyncio.create_task(
971
  self.doc_status.upsert(
972
  {
973
  doc_id: {
974
  "status": DocStatus.PROCESSING,
975
  "chunks_count": len(chunks),
976
+ "chunks_list": list(
977
+ chunks.keys()
978
+ ), # Save chunks list
979
  "content": status_doc.content,
980
  "content_summary": status_doc.content_summary,
981
  "content_length": status_doc.content_length,
 
991
  chunks_vdb_task = asyncio.create_task(
992
  self.chunks_vdb.upsert(chunks)
993
  )
 
 
 
 
 
994
  full_docs_task = asyncio.create_task(
995
  self.full_docs.upsert(
996
  {doc_id: {"content": status_doc.content}}
 
999
  text_chunks_task = asyncio.create_task(
1000
  self.text_chunks.upsert(chunks)
1001
  )
1002
+
1003
+ # First stage tasks (parallel execution)
1004
+ first_stage_tasks = [
1005
  doc_status_task,
1006
  chunks_vdb_task,
 
1007
  full_docs_task,
1008
  text_chunks_task,
1009
  ]
1010
+ entity_relation_task = None
1011
+
1012
+ # Execute first stage tasks
1013
+ await asyncio.gather(*first_stage_tasks)
1014
+
1015
+ # Stage 2: Process entity relation graph (after text_chunks are saved)
1016
+ entity_relation_task = asyncio.create_task(
1017
+ self._process_entity_relation_graph(
1018
+ chunks, pipeline_status, pipeline_status_lock
1019
+ )
1020
+ )
1021
+ await entity_relation_task
1022
  file_extraction_stage_ok = True
1023
 
1024
  except Exception as e:
 
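The extraction path above now runs in two stages: the doc-status, chunk-vector, full-doc and text-chunk upserts are gathered in parallel first, and only then is the entity/relation graph built, so graph extraction can rely on chunks that are already persisted; on failure, unfinished tasks are cancelled. A standalone sketch of the same ordering, with placeholder coroutines standing in for the storage calls and the graph step:

import asyncio

async def upsert(name: str) -> str:
    # Stand-in for a storage upsert (doc_status, chunks_vdb, full_docs, text_chunks).
    await asyncio.sleep(0)
    return name

async def build_graph() -> str:
    # Stand-in for _process_entity_relation_graph; must run after chunks are saved.
    await asyncio.sleep(0)
    return "graph"

async def process_document() -> None:
    first_stage = [
        asyncio.create_task(upsert(name))
        for name in ("doc_status", "chunks_vdb", "full_docs", "text_chunks")
    ]
    entity_relation_task = None
    try:
        # Stage 1: independent writes in parallel.
        await asyncio.gather(*first_stage)
        # Stage 2: graph extraction only after stage 1 has completed.
        entity_relation_task = asyncio.create_task(build_graph())
        await entity_relation_task
    except Exception:
        # Mirror the error path above: cancel whatever has not finished yet.
        for task in first_stage + ([entity_relation_task] if entity_relation_task else []):
            if task and not task.done():
                task.cancel()
        raise

asyncio.run(process_document())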
1033
  )
1034
  pipeline_status["history_messages"].append(error_msg)
1035
 
1036
+ # Cancel tasks that are not yet completed
1037
+ all_tasks = first_stage_tasks + (
1038
+ [entity_relation_task]
1039
+ if entity_relation_task
1040
+ else []
1041
+ )
1042
+ for task in all_tasks:
1043
+ if task and not task.done():
1044
  task.cancel()
1045
 
1046
  # Persistent llm cache
 
1090
  doc_id: {
1091
  "status": DocStatus.PROCESSED,
1092
  "chunks_count": len(chunks),
1093
+ "chunks_list": list(
1094
+ chunks.keys()
1095
+ ), # Preserve chunks_list
1096
  "content": status_doc.content,
1097
  "content_summary": status_doc.content_summary,
1098
  "content_length": status_doc.content_length,
 
1211
  pipeline_status=pipeline_status,
1212
  pipeline_status_lock=pipeline_status_lock,
1213
  llm_response_cache=self.llm_response_cache,
1214
+ text_chunks_storage=self.text_chunks,
1215
  )
1216
  return chunk_results
1217
  except Exception as e:
 
1742
  file_path="",
1743
  )
1744
 
1745
+ # 2. Get chunk IDs from document status
1746
+ chunk_ids = set(doc_status_data.get("chunks_list", []))
1747
 
1748
+ if not chunk_ids:
1749
  logger.warning(f"No chunks found for document {doc_id}")
1750
  # Mark that deletion operations have started
1751
  deletion_operations_started = True
 
1776
  file_path=file_path,
1777
  )
1778
 
 
1779
  # Mark that deletion operations have started
1780
  deletion_operations_started = True
1781
 
 
1799
  )
1800
  )
1801
1802
  affected_edges = (
1803
  await self.chunk_entity_relation_graph.get_edges_by_chunk_ids(
1804
  list(chunk_ids)
1805
  )
1806
  )
1807
 
1808
  except Exception as e:
1809
  logger.error(f"Failed to analyze affected graph elements: {e}")
1810
  raise Exception(f"Failed to analyze graph dependencies: {e}") from e
 
1822
  elif remaining_sources != sources:
1823
  entities_to_rebuild[node_label] = remaining_sources
1824
 
1825
+ async with pipeline_status_lock:
1826
+ log_message = (
1827
+ f"Found {len(entities_to_rebuild)} affected entities"
1828
+ )
1829
+ logger.info(log_message)
1830
+ pipeline_status["latest_message"] = log_message
1831
+ pipeline_status["history_messages"].append(log_message)
1832
+
1833
  # Process relationships
1834
  for edge_data in affected_edges:
1835
  src = edge_data.get("source")
 
1851
  elif remaining_sources != sources:
1852
  relationships_to_rebuild[edge_tuple] = remaining_sources
1853
 
1854
+ async with pipeline_status_lock:
1855
+ log_message = (
1856
+ f"Found {len(relationships_to_rebuild)} affected relations"
1857
+ )
1858
+ logger.info(log_message)
1859
+ pipeline_status["latest_message"] = log_message
1860
+ pipeline_status["history_messages"].append(log_message)
1861
+
1862
  except Exception as e:
1863
  logger.error(f"Failed to process graph analysis results: {e}")
1864
  raise Exception(f"Failed to process graph dependencies: {e}") from e
 
1942
  knowledge_graph_inst=self.chunk_entity_relation_graph,
1943
  entities_vdb=self.entities_vdb,
1944
  relationships_vdb=self.relationships_vdb,
1945
+ text_chunks_storage=self.text_chunks,
1946
  llm_response_cache=self.llm_response_cache,
1947
  global_config=asdict(self),
1948
  pipeline_status=pipeline_status,
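In the deletion path above, each affected entity or relation keeps the source chunks that survive the delete: if no sources remain it is removed outright, and if the source set shrank it is queued in entities_to_rebuild / relationships_to_rebuild. A compact sketch of that decision on plain sets (graph lookups and the actual delete/rebuild calls are omitted):

# Classify graph elements when a set of chunks is deleted:
# drop them if every source chunk is gone, rebuild them if only some sources remain.
def classify(sources_by_element: dict[str, set[str]], deleted_chunks: set[str]):
    to_delete: list[str] = []
    to_rebuild: dict[str, set[str]] = {}
    for element, sources in sources_by_element.items():
        remaining = sources - deleted_chunks
        if not remaining:
            to_delete.append(element)          # all source chunks removed
        elif remaining != sources:
            to_rebuild[element] = remaining    # rebuild from the surviving chunks
    return to_delete, to_rebuild

delete, rebuild = classify(
    {"Alice": {"c1"}, "Bob": {"c1", "c2"}, "Carol": {"c3"}},
    deleted_chunks={"c1"},
)
assert delete == ["Alice"]
assert rebuild == {"Bob": {"c2"}}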
lightrag/operate.py CHANGED
@@ -25,6 +25,7 @@ from .utils import (
25
  CacheData,
26
  get_conversation_turns,
27
  use_llm_func_with_cache,
 
28
  )
29
  from .base import (
30
  BaseGraphStorage,
@@ -103,8 +104,6 @@ async def _handle_entity_relation_summary(
103
  entity_or_relation_name: str,
104
  description: str,
105
  global_config: dict,
106
- pipeline_status: dict = None,
107
- pipeline_status_lock=None,
108
  llm_response_cache: BaseKVStorage | None = None,
109
  ) -> str:
110
  """Handle entity relation summary
@@ -247,7 +246,7 @@ async def _rebuild_knowledge_from_chunks(
247
  knowledge_graph_inst: BaseGraphStorage,
248
  entities_vdb: BaseVectorStorage,
249
  relationships_vdb: BaseVectorStorage,
250
- text_chunks: BaseKVStorage,
251
  llm_response_cache: BaseKVStorage,
252
  global_config: dict[str, str],
253
  pipeline_status: dict | None = None,
@@ -261,6 +260,7 @@ async def _rebuild_knowledge_from_chunks(
261
  Args:
262
  entities_to_rebuild: Dict mapping entity_name -> set of remaining chunk_ids
263
  relationships_to_rebuild: Dict mapping (src, tgt) -> set of remaining chunk_ids
 
264
  """
265
  if not entities_to_rebuild and not relationships_to_rebuild:
266
  return
@@ -281,9 +281,12 @@ async def _rebuild_knowledge_from_chunks(
281
  pipeline_status["latest_message"] = status_message
282
  pipeline_status["history_messages"].append(status_message)
283
 
284
- # Get cached extraction results for these chunks
 
285
  cached_results = await _get_cached_extraction_results(
286
- llm_response_cache, all_referenced_chunk_ids
 
 
287
  )
288
 
289
  if not cached_results:
@@ -299,15 +302,37 @@ async def _rebuild_knowledge_from_chunks(
299
  chunk_entities = {} # chunk_id -> {entity_name: [entity_data]}
300
  chunk_relationships = {} # chunk_id -> {(src, tgt): [relationship_data]}
301
 
302
- for chunk_id, extraction_result in cached_results.items():
303
  try:
304
- entities, relationships = await _parse_extraction_result(
305
- text_chunks=text_chunks,
306
- extraction_result=extraction_result,
307
- chunk_id=chunk_id,
308
- )
309
- chunk_entities[chunk_id] = entities
310
- chunk_relationships[chunk_id] = relationships
311
  except Exception as e:
312
  status_message = (
313
  f"Failed to parse cached extraction result for chunk {chunk_id}: {e}"
@@ -387,43 +412,86 @@ async def _rebuild_knowledge_from_chunks(
387
 
388
 
389
  async def _get_cached_extraction_results(
390
- llm_response_cache: BaseKVStorage, chunk_ids: set[str]
391
- ) -> dict[str, str]:
 
 
392
  """Get cached extraction results for specific chunk IDs
393
 
394
  Args:
 
395
  chunk_ids: Set of chunk IDs to get cached results for
 
 
396
 
397
  Returns:
398
- Dict mapping chunk_id -> extraction_result_text
399
  """
400
  cached_results = {}
401
 
402
- # Get all cached data (flattened cache structure)
403
- all_cache = await llm_response_cache.get_all()
404
 
405
- for cache_key, cache_entry in all_cache.items():
406
  if (
407
- isinstance(cache_entry, dict)
 
408
  and cache_entry.get("cache_type") == "extract"
409
  and cache_entry.get("chunk_id") in chunk_ids
410
  ):
411
  chunk_id = cache_entry["chunk_id"]
412
  extraction_result = cache_entry["return"]
413
- cached_results[chunk_id] = extraction_result
414
 
415
- logger.debug(
416
- f"Found {len(cached_results)} cached extraction results for {len(chunk_ids)} chunk IDs"
417
  )
418
  return cached_results
419
 
420
 
421
  async def _parse_extraction_result(
422
- text_chunks: BaseKVStorage, extraction_result: str, chunk_id: str
423
  ) -> tuple[dict, dict]:
424
  """Parse cached extraction result using the same logic as extract_entities
425
 
426
  Args:
 
427
  extraction_result: The cached LLM extraction result
428
  chunk_id: The chunk ID for source tracking
429
 
@@ -431,8 +499,8 @@ async def _parse_extraction_result(
431
  Tuple of (entities_dict, relationships_dict)
432
  """
433
 
434
- # Get chunk data for file_path
435
- chunk_data = await text_chunks.get_by_id(chunk_id)
436
  file_path = (
437
  chunk_data.get("file_path", "unknown_source")
438
  if chunk_data
@@ -805,8 +873,6 @@ async def _merge_nodes_then_upsert(
805
  entity_name,
806
  description,
807
  global_config,
808
- pipeline_status,
809
- pipeline_status_lock,
810
  llm_response_cache,
811
  )
812
  else:
@@ -969,8 +1035,6 @@ async def _merge_edges_then_upsert(
969
  f"({src_id}, {tgt_id})",
970
  description,
971
  global_config,
972
- pipeline_status,
973
- pipeline_status_lock,
974
  llm_response_cache,
975
  )
976
  else:
@@ -1146,6 +1210,7 @@ async def extract_entities(
1146
  pipeline_status: dict = None,
1147
  pipeline_status_lock=None,
1148
  llm_response_cache: BaseKVStorage | None = None,
 
1149
  ) -> list:
1150
  use_llm_func: callable = global_config["llm_model_func"]
1151
  entity_extract_max_gleaning = global_config["entity_extract_max_gleaning"]
@@ -1252,6 +1317,9 @@ async def extract_entities(
1252
  # Get file path from chunk data or use default
1253
  file_path = chunk_dp.get("file_path", "unknown_source")
1254
 
 
 
 
1255
  # Get initial extraction
1256
  hint_prompt = entity_extract_prompt.format(
1257
  **{**context_base, "input_text": content}
@@ -1263,7 +1331,10 @@ async def extract_entities(
1263
  llm_response_cache=llm_response_cache,
1264
  cache_type="extract",
1265
  chunk_id=chunk_key,
 
1266
  )
 
 
1267
  history = pack_user_ass_to_openai_messages(hint_prompt, final_result)
1268
 
1269
  # Process initial extraction with file path
@@ -1280,6 +1351,7 @@ async def extract_entities(
1280
  history_messages=history,
1281
  cache_type="extract",
1282
  chunk_id=chunk_key,
 
1283
  )
1284
 
1285
  history += pack_user_ass_to_openai_messages(continue_prompt, glean_result)
@@ -1310,11 +1382,21 @@ async def extract_entities(
1310
  llm_response_cache=llm_response_cache,
1311
  history_messages=history,
1312
  cache_type="extract",
 
1313
  )
1314
  if_loop_result = if_loop_result.strip().strip('"').strip("'").lower()
1315
  if if_loop_result != "yes":
1316
  break
1317
 
 
1318
  processed_chunks += 1
1319
  entities_count = len(maybe_nodes)
1320
  relations_count = len(maybe_edges)
 
25
  CacheData,
26
  get_conversation_turns,
27
  use_llm_func_with_cache,
28
+ update_chunk_cache_list,
29
  )
30
  from .base import (
31
  BaseGraphStorage,
 
104
  entity_or_relation_name: str,
105
  description: str,
106
  global_config: dict,
 
 
107
  llm_response_cache: BaseKVStorage | None = None,
108
  ) -> str:
109
  """Handle entity relation summary
 
246
  knowledge_graph_inst: BaseGraphStorage,
247
  entities_vdb: BaseVectorStorage,
248
  relationships_vdb: BaseVectorStorage,
249
+ text_chunks_storage: BaseKVStorage,
250
  llm_response_cache: BaseKVStorage,
251
  global_config: dict[str, str],
252
  pipeline_status: dict | None = None,
 
260
  Args:
261
  entities_to_rebuild: Dict mapping entity_name -> set of remaining chunk_ids
262
  relationships_to_rebuild: Dict mapping (src, tgt) -> set of remaining chunk_ids
263
+ text_chunks_storage: Text chunks storage used to load chunk data for rebuilding
264
  """
265
  if not entities_to_rebuild and not relationships_to_rebuild:
266
  return
 
281
  pipeline_status["latest_message"] = status_message
282
  pipeline_status["history_messages"].append(status_message)
283
 
284
+ # Get cached extraction results for these chunks using storage
285
+ # cached_results: chunk_id -> [extraction results from LLM cache, sorted by create_time]
286
  cached_results = await _get_cached_extraction_results(
287
+ llm_response_cache,
288
+ all_referenced_chunk_ids,
289
+ text_chunks_storage=text_chunks_storage,
290
  )
291
 
292
  if not cached_results:
 
302
  chunk_entities = {} # chunk_id -> {entity_name: [entity_data]}
303
  chunk_relationships = {} # chunk_id -> {(src, tgt): [relationship_data]}
304
 
305
+ for chunk_id, extraction_results in cached_results.items():
306
  try:
307
+ # Handle multiple extraction results per chunk
308
+ chunk_entities[chunk_id] = defaultdict(list)
309
+ chunk_relationships[chunk_id] = defaultdict(list)
310
+
311
+ # process multiple LLM extraction results for a single chunk_id
312
+ for extraction_result in extraction_results:
313
+ entities, relationships = await _parse_extraction_result(
314
+ text_chunks_storage=text_chunks_storage,
315
+ extraction_result=extraction_result,
316
+ chunk_id=chunk_id,
317
+ )
318
+
319
+ # Merge entities and relationships from this extraction result
320
+ # Only keep the first occurrence of each entity_name in the same chunk_id
321
+ for entity_name, entity_list in entities.items():
322
+ if (
323
+ entity_name not in chunk_entities[chunk_id]
324
+ or len(chunk_entities[chunk_id][entity_name]) == 0
325
+ ):
326
+ chunk_entities[chunk_id][entity_name].extend(entity_list)
327
+
328
+ # Only keep the first occurrence of each rel_key in the same chunk_id
329
+ for rel_key, rel_list in relationships.items():
330
+ if (
331
+ rel_key not in chunk_relationships[chunk_id]
332
+ or len(chunk_relationships[chunk_id][rel_key]) == 0
333
+ ):
334
+ chunk_relationships[chunk_id][rel_key].extend(rel_list)
335
+
336
  except Exception as e:
337
  status_message = (
338
  f"Failed to parse cached extraction result for chunk {chunk_id}: {e}"
 
412
 
413
 
414
  async def _get_cached_extraction_results(
415
+ llm_response_cache: BaseKVStorage,
416
+ chunk_ids: set[str],
417
+ text_chunks_storage: BaseKVStorage,
418
+ ) -> dict[str, list[str]]:
419
  """Get cached extraction results for specific chunk IDs
420
 
421
  Args:
422
+ llm_response_cache: LLM response cache storage
423
  chunk_ids: Set of chunk IDs to get cached results for
424
+ text_chunks_data: Pre-loaded chunk data (optional, for performance)
425
+ text_chunks_storage: Text chunks storage (fallback if text_chunks_data is None)
426
 
427
  Returns:
428
+ Dict mapping chunk_id -> list of extraction_result_text
429
  """
430
  cached_results = {}
431
 
432
+ # Collect all LLM cache IDs from chunks
433
+ all_cache_ids = set()
434
 
435
+ # Read from storage
436
+ chunk_data_list = await text_chunks_storage.get_by_ids(list(chunk_ids))
437
+ for chunk_id, chunk_data in zip(chunk_ids, chunk_data_list):
438
+ if chunk_data and isinstance(chunk_data, dict):
439
+ llm_cache_list = chunk_data.get("llm_cache_list", [])
440
+ if llm_cache_list:
441
+ all_cache_ids.update(llm_cache_list)
442
+ else:
443
+ logger.warning(
444
+ f"Chunk {chunk_id} data is invalid or None: {type(chunk_data)}"
445
+ )
446
+
447
+ if not all_cache_ids:
448
+ logger.warning(f"No LLM cache IDs found for {len(chunk_ids)} chunk IDs")
449
+ return cached_results
450
+
451
+ # Batch get LLM cache entries
452
+ cache_data_list = await llm_response_cache.get_by_ids(list(all_cache_ids))
453
+
454
+ # Process cache entries and group by chunk_id
455
+ valid_entries = 0
456
+ for cache_id, cache_entry in zip(all_cache_ids, cache_data_list):
457
  if (
458
+ cache_entry is not None
459
+ and isinstance(cache_entry, dict)
460
  and cache_entry.get("cache_type") == "extract"
461
  and cache_entry.get("chunk_id") in chunk_ids
462
  ):
463
  chunk_id = cache_entry["chunk_id"]
464
  extraction_result = cache_entry["return"]
465
+ create_time = cache_entry.get(
466
+ "create_time", 0
467
+ ) # Get creation time, default to 0
468
+ valid_entries += 1
469
+
470
+ # Support multiple LLM caches per chunk
471
+ if chunk_id not in cached_results:
472
+ cached_results[chunk_id] = []
473
+ # Store tuple with extraction result and creation time for sorting
474
+ cached_results[chunk_id].append((extraction_result, create_time))
475
+
476
+ # Sort extraction results by create_time for each chunk
477
+ for chunk_id in cached_results:
478
+ # Sort by create_time (x[1]), then extract only extraction_result (x[0])
479
+ cached_results[chunk_id].sort(key=lambda x: x[1])
480
+ cached_results[chunk_id] = [item[0] for item in cached_results[chunk_id]]
481
 
482
+ logger.info(
483
+ f"Found {valid_entries} valid cache entries, {len(cached_results)} chunks with results"
484
  )
485
  return cached_results
486
 
487
 
488
  async def _parse_extraction_result(
489
+ text_chunks_storage: BaseKVStorage, extraction_result: str, chunk_id: str
490
  ) -> tuple[dict, dict]:
491
  """Parse cached extraction result using the same logic as extract_entities
492
 
493
  Args:
494
+ text_chunks_storage: Text chunks storage to get chunk data
495
  extraction_result: The cached LLM extraction result
496
  chunk_id: The chunk ID for source tracking
497
 
 
499
  Tuple of (entities_dict, relationships_dict)
500
  """
501
 
502
+ # Get chunk data for file_path from storage
503
+ chunk_data = await text_chunks_storage.get_by_id(chunk_id)
504
  file_path = (
505
  chunk_data.get("file_path", "unknown_source")
506
  if chunk_data
 
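_get_cached_extraction_results above now returns every cached extraction result for a chunk, ordered by creation time, instead of a single string. The grouping step can be reduced to plain dict handling; the entry fields (cache_type, chunk_id, return, create_time) mirror this diff, while the batch get_by_ids calls are omitted:

# Group extraction cache entries by chunk_id, oldest first.
def group_extraction_results(
    cache_entries: list[dict], chunk_ids: set[str]
) -> dict[str, list[str]]:
    grouped: dict[str, list[tuple[str, float]]] = {}
    for entry in cache_entries:
        if entry.get("cache_type") == "extract" and entry.get("chunk_id") in chunk_ids:
            grouped.setdefault(entry["chunk_id"], []).append(
                (entry["return"], entry.get("create_time", 0))
            )
    # Sort each chunk's results by create_time, then keep only the texts.
    return {
        cid: [text for text, _ in sorted(items, key=lambda item: item[1])]
        for cid, items in grouped.items()
    }

entries = [
    {"cache_type": "extract", "chunk_id": "c1", "return": "second pass", "create_time": 2},
    {"cache_type": "extract", "chunk_id": "c1", "return": "first pass", "create_time": 1},
    {"cache_type": "query", "chunk_id": "c1", "return": "ignored", "create_time": 3},
]
assert group_extraction_results(entries, {"c1"}) == {"c1": ["first pass", "second pass"]}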
873
  entity_name,
874
  description,
875
  global_config,
 
 
876
  llm_response_cache,
877
  )
878
  else:
 
1035
  f"({src_id}, {tgt_id})",
1036
  description,
1037
  global_config,
 
 
1038
  llm_response_cache,
1039
  )
1040
  else:
 
1210
  pipeline_status: dict = None,
1211
  pipeline_status_lock=None,
1212
  llm_response_cache: BaseKVStorage | None = None,
1213
+ text_chunks_storage: BaseKVStorage | None = None,
1214
  ) -> list:
1215
  use_llm_func: callable = global_config["llm_model_func"]
1216
  entity_extract_max_gleaning = global_config["entity_extract_max_gleaning"]
 
1317
  # Get file path from chunk data or use default
1318
  file_path = chunk_dp.get("file_path", "unknown_source")
1319
 
1320
+ # Create cache keys collector for batch processing
1321
+ cache_keys_collector = []
1322
+
1323
  # Get initial extraction
1324
  hint_prompt = entity_extract_prompt.format(
1325
  **{**context_base, "input_text": content}
 
1331
  llm_response_cache=llm_response_cache,
1332
  cache_type="extract",
1333
  chunk_id=chunk_key,
1334
+ cache_keys_collector=cache_keys_collector,
1335
  )
1336
+
1337
+ # Cache keys collected above are written to the chunk's llm_cache_list after the gleaning loop
1338
  history = pack_user_ass_to_openai_messages(hint_prompt, final_result)
1339
 
1340
  # Process initial extraction with file path
 
1351
  history_messages=history,
1352
  cache_type="extract",
1353
  chunk_id=chunk_key,
1354
+ cache_keys_collector=cache_keys_collector,
1355
  )
1356
 
1357
  history += pack_user_ass_to_openai_messages(continue_prompt, glean_result)
 
1382
  llm_response_cache=llm_response_cache,
1383
  history_messages=history,
1384
  cache_type="extract",
1385
+ cache_keys_collector=cache_keys_collector,
1386
  )
1387
  if_loop_result = if_loop_result.strip().strip('"').strip("'").lower()
1388
  if if_loop_result != "yes":
1389
  break
1390
 
1391
+ # Batch update chunk's llm_cache_list with all collected cache keys
1392
+ if cache_keys_collector and text_chunks_storage:
1393
+ await update_chunk_cache_list(
1394
+ chunk_key,
1395
+ text_chunks_storage,
1396
+ cache_keys_collector,
1397
+ "entity_extraction",
1398
+ )
1399
+
1400
  processed_chunks += 1
1401
  entities_count = len(maybe_nodes)
1402
  relations_count = len(maybe_edges)
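extract_entities above threads a single cache_keys_collector through the initial extraction, every gleaning call and the loop-detection call, then flushes it once per chunk via update_chunk_cache_list. A rough sketch of that collect-then-flush pattern against an in-memory chunk record (the key format and store are illustrative, not the library's actual cache-key scheme, which comes from generate_cache_key):

# Collect cache keys across several LLM calls, then update the chunk record once.
chunk_store: dict[str, dict] = {"chunk-1": {"content": "...", "llm_cache_list": []}}

def fake_llm_call(prompt: str, cache_keys_collector: list[str]) -> str:
    # Illustrative key format only.
    cache_key = f"default:extract:{abs(hash(prompt)) % 10_000:04d}"
    cache_keys_collector.append(cache_key)
    return f"response to {prompt!r}"

def flush_cache_keys(chunk_id: str, cache_keys: list[str]) -> None:
    chunk = chunk_store[chunk_id]
    existing = set(chunk["llm_cache_list"])
    chunk["llm_cache_list"].extend(key for key in cache_keys if key not in existing)

collector: list[str] = []
fake_llm_call("initial extraction", collector)
fake_llm_call("gleaning pass", collector)
flush_cache_keys("chunk-1", collector)
assert len(chunk_store["chunk-1"]["llm_cache_list"]) == 2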
lightrag/utils.py CHANGED
@@ -1423,6 +1423,48 @@ def lazy_external_import(module_name: str, class_name: str) -> Callable[..., Any
1423
  return import_class
1424
 
1425
 
1426
  async def use_llm_func_with_cache(
1427
  input_text: str,
1428
  use_llm_func: callable,
@@ -1431,6 +1473,7 @@ async def use_llm_func_with_cache(
1431
  history_messages: list[dict[str, str]] = None,
1432
  cache_type: str = "extract",
1433
  chunk_id: str | None = None,
 
1434
  ) -> str:
1435
  """Call LLM function with cache support
1436
 
@@ -1445,6 +1488,8 @@ async def use_llm_func_with_cache(
1445
  history_messages: History messages list
1446
  cache_type: Type of cache
1447
  chunk_id: Chunk identifier to store in cache
 
 
1448
 
1449
  Returns:
1450
  LLM response text
@@ -1457,6 +1502,9 @@ async def use_llm_func_with_cache(
1457
  _prompt = input_text
1458
 
1459
  arg_hash = compute_args_hash(_prompt)
 
 
 
1460
  cached_return, _1, _2, _3 = await handle_cache(
1461
  llm_response_cache,
1462
  arg_hash,
@@ -1467,6 +1515,11 @@ async def use_llm_func_with_cache(
1467
  if cached_return:
1468
  logger.debug(f"Found cache for {arg_hash}")
1469
  statistic_data["llm_cache"] += 1
 
 
 
 
 
1470
  return cached_return
1471
  statistic_data["llm_call"] += 1
1472
 
@@ -1491,6 +1544,10 @@ async def use_llm_func_with_cache(
1491
  ),
1492
  )
1493
 
 
 
 
 
1494
  return res
1495
 
1496
  # When cache is disabled, directly call LLM
 
1423
  return import_class
1424
 
1425
 
1426
+ async def update_chunk_cache_list(
1427
+ chunk_id: str,
1428
+ text_chunks_storage: "BaseKVStorage",
1429
+ cache_keys: list[str],
1430
+ cache_scenario: str = "batch_update",
1431
+ ) -> None:
1432
+ """Update chunk's llm_cache_list with the given cache keys
1433
+
1434
+ Args:
1435
+ chunk_id: Chunk identifier
1436
+ text_chunks_storage: Text chunks storage instance
1437
+ cache_keys: List of cache keys to add to the list
1438
+ cache_scenario: Description of the cache scenario for logging
1439
+ """
1440
+ if not cache_keys:
1441
+ return
1442
+
1443
+ try:
1444
+ chunk_data = await text_chunks_storage.get_by_id(chunk_id)
1445
+ if chunk_data:
1446
+ # Ensure llm_cache_list exists
1447
+ if "llm_cache_list" not in chunk_data:
1448
+ chunk_data["llm_cache_list"] = []
1449
+
1450
+ # Add cache keys to the list if not already present
1451
+ existing_keys = set(chunk_data["llm_cache_list"])
1452
+ new_keys = [key for key in cache_keys if key not in existing_keys]
1453
+
1454
+ if new_keys:
1455
+ chunk_data["llm_cache_list"].extend(new_keys)
1456
+
1457
+ # Update the chunk in storage
1458
+ await text_chunks_storage.upsert({chunk_id: chunk_data})
1459
+ logger.debug(
1460
+ f"Updated chunk {chunk_id} with {len(new_keys)} cache keys ({cache_scenario})"
1461
+ )
1462
+ except Exception as e:
1463
+ logger.warning(
1464
+ f"Failed to update chunk {chunk_id} with cache references on {cache_scenario}: {e}"
1465
+ )
1466
+
1467
+
1468
  async def use_llm_func_with_cache(
1469
  input_text: str,
1470
  use_llm_func: callable,
 
1473
  history_messages: list[dict[str, str]] = None,
1474
  cache_type: str = "extract",
1475
  chunk_id: str | None = None,
1476
+ cache_keys_collector: list = None,
1477
  ) -> str:
1478
  """Call LLM function with cache support
1479
 
 
1488
  history_messages: History messages list
1489
  cache_type: Type of cache
1490
  chunk_id: Chunk identifier to store in cache
1491
+ cache_keys_collector: Optional list to collect cache keys for batch processing
1492
+ (the collected keys are later written to the chunk's llm_cache_list)
1493
 
1494
  Returns:
1495
  LLM response text
 
1502
  _prompt = input_text
1503
 
1504
  arg_hash = compute_args_hash(_prompt)
1505
+ # Generate cache key for this LLM call
1506
+ cache_key = generate_cache_key("default", cache_type, arg_hash)
1507
+
1508
  cached_return, _1, _2, _3 = await handle_cache(
1509
  llm_response_cache,
1510
  arg_hash,
 
1515
  if cached_return:
1516
  logger.debug(f"Found cache for {arg_hash}")
1517
  statistic_data["llm_cache"] += 1
1518
+
1519
+ # Add cache key to collector if provided
1520
+ if cache_keys_collector is not None:
1521
+ cache_keys_collector.append(cache_key)
1522
+
1523
  return cached_return
1524
  statistic_data["llm_call"] += 1
1525
 
 
1544
  ),
1545
  )
1546
 
1547
+ # Add cache key to collector if provided
1548
+ if cache_keys_collector is not None:
1549
+ cache_keys_collector.append(cache_key)
1550
+
1551
  return res
1552
 
1553
  # When cache is disabled, directly call LLM
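
Taken together, use_llm_func_with_cache records a cache key for every extraction call (cache hit or miss), update_chunk_cache_list persists those keys on the chunk, and the rebuild path reads them back to fetch only the relevant cache entries by ID rather than calling get_all(). A minimal sketch of that read-back step over dict stand-ins for the KV storages:

# Gather the LLM cache IDs referenced by a set of chunks, ready for a batch get_by_ids().
def collect_cache_ids(chunks: dict[str, dict], chunk_ids: set[str]) -> set[str]:
    cache_ids: set[str] = set()
    for chunk_id in chunk_ids:
        chunk = chunks.get(chunk_id)
        if isinstance(chunk, dict):
            cache_ids.update(chunk.get("llm_cache_list", []))
    return cache_ids

chunks = {
    "c1": {"llm_cache_list": ["key-1", "key-2"]},
    "c2": {"llm_cache_list": ["key-2", "key-3"]},
    "c3": {},  # older chunk written before llm_cache_list existed
}
assert collect_cache_ids(chunks, {"c1", "c2", "c3"}) == {"key-1", "key-2", "key-3"}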