gzdaniel committed
Commit c298a71 · 1 Parent(s): a8003f1

Implemented storage types: PostgreSQL and MongoDB

env.example CHANGED
@@ -114,15 +114,6 @@ EMBEDDING_BINDING_HOST=http://localhost:11434
 # LIGHTRAG_DOC_STATUS_STORAGE=PGDocStatusStorage
 # LIGHTRAG_GRAPH_STORAGE=Neo4JStorage
 
-### TiDB Configuration (Deprecated)
-# TIDB_HOST=localhost
-# TIDB_PORT=4000
-# TIDB_USER=your_username
-# TIDB_PASSWORD='your_password'
-# TIDB_DATABASE=your_database
-### separating all data from difference Lightrag instances(deprecating)
-# TIDB_WORKSPACE=default
-
 ### PostgreSQL Configuration
 POSTGRES_HOST=localhost
 POSTGRES_PORT=5432
@@ -130,7 +121,7 @@ POSTGRES_USER=your_username
 POSTGRES_PASSWORD='your_password'
 POSTGRES_DATABASE=your_database
 POSTGRES_MAX_CONNECTIONS=12
-### separating all data from difference Lightrag instances(deprecating)
+### separating all data from different LightRAG instances
 # POSTGRES_WORKSPACE=default
 
 ### Neo4j Configuration
@@ -146,14 +137,15 @@ NEO4J_PASSWORD='your_password'
 # AGE_POSTGRES_PORT=8529
 
 # AGE Graph Name(apply to PostgreSQL and independent AGM)
-### AGE_GRAPH_NAME is precated
+### AGE_GRAPH_NAME is deprecated
 # AGE_GRAPH_NAME=lightrag
 
 ### MongoDB Configuration
 MONGO_URI=mongodb://root:root@localhost:27017/
 MONGO_DATABASE=LightRAG
 ### separating all data from difference Lightrag instances(deprecating)
-# MONGODB_GRAPH=false
+### separating all data from different LightRAG instances
+# MONGODB_WORKSPACE=default
 
 ### Milvus Configuration
 MILVUS_URI=http://localhost:19530
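The workspace variables touched above (POSTGRES_WORKSPACE, MONGODB_WORKSPACE) are what keeps data from different LightRAG instances separate when they share the same database servers. A minimal sketch of isolating two deployments, assuming the variables are simply read from the environment at startup (names come from the diff, values are illustrative):

import os

# Hypothetical setup: both deployments use the same PostgreSQL/MongoDB servers,
# but each gets its own workspace so their records never mix.
os.environ["POSTGRES_WORKSPACE"] = "tenant_a"   # instance A
os.environ["MONGODB_WORKSPACE"] = "tenant_a"
# A second deployment would export tenant_b for both variables in its own
# environment before its storage backends initialize.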
lightrag/kg/mongo_impl.py CHANGED
@@ -133,6 +133,11 @@ class MongoKVStorage(BaseKVStorage):
 
         operations = []
         for k, v in data.items():
+            # For text_chunks namespace, ensure llm_cache_list field exists
+            if self.namespace.endswith("text_chunks"):
+                if "llm_cache_list" not in v:
+                    v["llm_cache_list"] = []
+
             v["_id"] = k  # Use flattened key as _id
             operations.append(UpdateOne({"_id": k}, {"$set": v}, upsert=True))
 
@@ -247,6 +252,9 @@ class MongoDocStatusStorage(DocStatusStorage):
             return
         update_tasks: list[Any] = []
         for k, v in data.items():
+            # Ensure chunks_list field exists and is an array
+            if "chunks_list" not in v:
+                v["chunks_list"] = []
             data[k]["_id"] = k
             update_tasks.append(
                 self._data.update_one({"_id": k}, {"$set": v}, upsert=True)
@@ -279,6 +287,7 @@ class MongoDocStatusStorage(DocStatusStorage):
                 updated_at=doc.get("updated_at"),
                 chunks_count=doc.get("chunks_count", -1),
                 file_path=doc.get("file_path", doc["_id"]),
+                chunks_list=doc.get("chunks_list", []),
             )
             for doc in result
         }
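The MongoDB changes follow one pattern: default the new array fields (llm_cache_list on text chunks, chunks_list on document status) before every upsert, so older documents gain the field on their next write. A minimal synchronous sketch of that pattern with plain pymongo (the real storage classes are async; the URI and collection name here are illustrative):

from pymongo import MongoClient, UpdateOne

client = MongoClient("mongodb://root:root@localhost:27017/")
chunks = client["LightRAG"]["text_chunks"]

data = {"chunk-1": {"content": "some chunk text", "full_doc_id": "doc-1"}}

operations = []
for k, v in data.items():
    v.setdefault("llm_cache_list", [])  # make sure the array field always exists
    v["_id"] = k                        # the flattened key doubles as the Mongo _id
    operations.append(UpdateOne({"_id": k}, {"$set": v}, upsert=True))

if operations:
    chunks.bulk_write(operations)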
lightrag/kg/postgres_impl.py CHANGED
@@ -136,6 +136,48 @@ class PostgreSQLDB:
         except Exception as e:
             logger.warning(f"Failed to add chunk_id column to LIGHTRAG_LLM_CACHE: {e}")
 
+    async def _migrate_llm_cache_add_cache_type(self):
+        """Add cache_type column to LIGHTRAG_LLM_CACHE table if it doesn't exist"""
+        try:
+            # Check if cache_type column exists
+            check_column_sql = """
+            SELECT column_name
+            FROM information_schema.columns
+            WHERE table_name = 'lightrag_llm_cache'
+            AND column_name = 'cache_type'
+            """
+
+            column_info = await self.query(check_column_sql)
+            if not column_info:
+                logger.info("Adding cache_type column to LIGHTRAG_LLM_CACHE table")
+                add_column_sql = """
+                ALTER TABLE LIGHTRAG_LLM_CACHE
+                ADD COLUMN cache_type VARCHAR(32) NULL
+                """
+                await self.execute(add_column_sql)
+                logger.info(
+                    "Successfully added cache_type column to LIGHTRAG_LLM_CACHE table"
+                )
+
+                # Migrate existing data: extract cache_type from flattened keys
+                logger.info("Migrating existing LLM cache data to populate cache_type field")
+                update_sql = """
+                UPDATE LIGHTRAG_LLM_CACHE
+                SET cache_type = CASE
+                    WHEN id LIKE '%:%:%' THEN split_part(id, ':', 2)
+                    ELSE 'extract'
+                END
+                WHERE cache_type IS NULL
+                """
+                await self.execute(update_sql)
+                logger.info("Successfully migrated existing LLM cache data")
+            else:
+                logger.info(
+                    "cache_type column already exists in LIGHTRAG_LLM_CACHE table"
+                )
+        except Exception as e:
+            logger.warning(f"Failed to add cache_type column to LIGHTRAG_LLM_CACHE: {e}")
+
     async def _migrate_timestamp_columns(self):
         """Migrate timestamp columns in tables to timezone-aware types, assuming original data is in UTC time"""
         # Tables and columns that need migration
@@ -301,15 +343,17 @@ class PostgreSQLDB:
                     record["mode"], record["original_prompt"]
                 )
 
+                # Determine cache_type based on mode
+                cache_type = "extract" if record["mode"] == "default" else "unknown"
+
                 # Generate new flattened key
-                cache_type = "extract"  # Default type
                 new_key = f"{record['mode']}:{cache_type}:{new_hash}"
 
-                # Insert new format data
+                # Insert new format data with cache_type field
                 insert_sql = """
                 INSERT INTO LIGHTRAG_LLM_CACHE
-                (workspace, id, mode, original_prompt, return_value, chunk_id, create_time, update_time)
-                VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+                (workspace, id, mode, original_prompt, return_value, chunk_id, cache_type, create_time, update_time)
+                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
                 ON CONFLICT (workspace, mode, id) DO NOTHING
                 """
 
@@ -322,6 +366,7 @@ class PostgreSQLDB:
                         "original_prompt": record["original_prompt"],
                         "return_value": record["return_value"],
                         "chunk_id": record["chunk_id"],
+                        "cache_type": cache_type,  # Add cache_type field
                         "create_time": record["create_time"],
                         "update_time": record["update_time"],
                     },
@@ -357,6 +402,68 @@ class PostgreSQLDB:
             logger.error(f"LLM cache migration failed: {e}")
             # Don't raise exception, allow system to continue startup
 
+    async def _migrate_doc_status_add_chunks_list(self):
+        """Add chunks_list column to LIGHTRAG_DOC_STATUS table if it doesn't exist"""
+        try:
+            # Check if chunks_list column exists
+            check_column_sql = """
+            SELECT column_name
+            FROM information_schema.columns
+            WHERE table_name = 'lightrag_doc_status'
+            AND column_name = 'chunks_list'
+            """
+
+            column_info = await self.query(check_column_sql)
+            if not column_info:
+                logger.info("Adding chunks_list column to LIGHTRAG_DOC_STATUS table")
+                add_column_sql = """
+                ALTER TABLE LIGHTRAG_DOC_STATUS
+                ADD COLUMN chunks_list JSONB NULL DEFAULT '[]'::jsonb
+                """
+                await self.execute(add_column_sql)
+                logger.info(
+                    "Successfully added chunks_list column to LIGHTRAG_DOC_STATUS table"
+                )
+            else:
+                logger.info(
+                    "chunks_list column already exists in LIGHTRAG_DOC_STATUS table"
+                )
+        except Exception as e:
+            logger.warning(
+                f"Failed to add chunks_list column to LIGHTRAG_DOC_STATUS: {e}"
+            )
+
+    async def _migrate_text_chunks_add_llm_cache_list(self):
+        """Add llm_cache_list column to LIGHTRAG_DOC_CHUNKS table if it doesn't exist"""
+        try:
+            # Check if llm_cache_list column exists
+            check_column_sql = """
+            SELECT column_name
+            FROM information_schema.columns
+            WHERE table_name = 'lightrag_doc_chunks'
+            AND column_name = 'llm_cache_list'
+            """
+
+            column_info = await self.query(check_column_sql)
+            if not column_info:
+                logger.info("Adding llm_cache_list column to LIGHTRAG_DOC_CHUNKS table")
+                add_column_sql = """
+                ALTER TABLE LIGHTRAG_DOC_CHUNKS
+                ADD COLUMN llm_cache_list JSONB NULL DEFAULT '[]'::jsonb
+                """
+                await self.execute(add_column_sql)
+                logger.info(
+                    "Successfully added llm_cache_list column to LIGHTRAG_DOC_CHUNKS table"
+                )
+            else:
+                logger.info(
+                    "llm_cache_list column already exists in LIGHTRAG_DOC_CHUNKS table"
+                )
+        except Exception as e:
+            logger.warning(
+                f"Failed to add llm_cache_list column to LIGHTRAG_DOC_CHUNKS: {e}"
+            )
+
     async def check_tables(self):
         # First create all tables
         for k, v in TABLES.items():
@@ -408,6 +515,13 @@ class PostgreSQLDB:
             logger.error(f"PostgreSQL, Failed to migrate LLM cache chunk_id field: {e}")
             # Don't throw an exception, allow the initialization process to continue
 
+        # Migrate LLM cache table to add cache_type field if needed
+        try:
+            await self._migrate_llm_cache_add_cache_type()
+        except Exception as e:
+            logger.error(f"PostgreSQL, Failed to migrate LLM cache cache_type field: {e}")
+            # Don't throw an exception, allow the initialization process to continue
+
         # Finally, attempt to migrate old doc chunks data if needed
         try:
             await self._migrate_doc_chunks_to_vdb_chunks()
@@ -421,6 +535,22 @@ class PostgreSQLDB:
         except Exception as e:
             logger.error(f"PostgreSQL, LLM cache migration failed: {e}")
 
+        # Migrate doc status to add chunks_list field if needed
+        try:
+            await self._migrate_doc_status_add_chunks_list()
+        except Exception as e:
+            logger.error(
+                f"PostgreSQL, Failed to migrate doc status chunks_list field: {e}"
+            )
+
+        # Migrate text chunks to add llm_cache_list field if needed
+        try:
+            await self._migrate_text_chunks_add_llm_cache_list()
+        except Exception as e:
+            logger.error(
+                f"PostgreSQL, Failed to migrate text chunks llm_cache_list field: {e}"
+            )
+
     async def query(
         self,
         sql: str,
@@ -608,17 +738,11 @@ class PGKVStorage(BaseKVStorage):
             if is_namespace(self.namespace, NameSpace.KV_STORE_LLM_RESPONSE_CACHE):
                 processed_results = {}
                 for row in results:
-                    # Parse flattened key to extract cache_type
-                    key_parts = row["id"].split(":")
-                    cache_type = key_parts[1] if len(key_parts) >= 3 else "unknown"
-
                     # Map field names and add cache_type for compatibility
                     processed_row = {
                         **row,
-                        "return": row.get(
-                            "return_value", ""
-                        ),  # Map return_value to return
-                        "cache_type": cache_type,  # Add cache_type from key
+                        "return": row.get("return_value", ""),
+                        "cache_type": row.get("cache_type", "unknown"),
                         "original_prompt": row.get("original_prompt", ""),
                         "chunk_id": row.get("chunk_id"),
                         "mode": row.get("mode", "default"),
@@ -626,6 +750,20 @@ class PGKVStorage(BaseKVStorage):
                     processed_results[row["id"]] = processed_row
                 return processed_results
 
+            # For text_chunks namespace, parse llm_cache_list JSON string back to list
+            if is_namespace(self.namespace, NameSpace.KV_STORE_TEXT_CHUNKS):
+                processed_results = {}
+                for row in results:
+                    llm_cache_list = row.get("llm_cache_list", [])
+                    if isinstance(llm_cache_list, str):
+                        try:
+                            llm_cache_list = json.loads(llm_cache_list)
+                        except json.JSONDecodeError:
+                            llm_cache_list = []
+                    row["llm_cache_list"] = llm_cache_list
+                    processed_results[row["id"]] = row
+                return processed_results
+
             # For other namespaces, return as-is
             return {row["id"]: row for row in results}
         except Exception as e:
@@ -637,6 +775,29 @@ class PGKVStorage(BaseKVStorage):
         sql = SQL_TEMPLATES["get_by_id_" + self.namespace]
         params = {"workspace": self.db.workspace, "id": id}
         response = await self.db.query(sql, params)
+
+        if response and is_namespace(self.namespace, NameSpace.KV_STORE_TEXT_CHUNKS):
+            # Parse llm_cache_list JSON string back to list
+            llm_cache_list = response.get("llm_cache_list", [])
+            if isinstance(llm_cache_list, str):
+                try:
+                    llm_cache_list = json.loads(llm_cache_list)
+                except json.JSONDecodeError:
+                    llm_cache_list = []
+            response["llm_cache_list"] = llm_cache_list
+
+        # Special handling for LLM cache to ensure compatibility with _get_cached_extraction_results
+        if response and is_namespace(self.namespace, NameSpace.KV_STORE_LLM_RESPONSE_CACHE):
+            # Map field names and add cache_type for compatibility
+            response = {
+                **response,
+                "return": response.get("return_value", ""),
+                "cache_type": response.get("cache_type"),
+                "original_prompt": response.get("original_prompt", ""),
+                "chunk_id": response.get("chunk_id"),
+                "mode": response.get("mode", "default"),
+            }
+
         return response if response else None
 
     # Query by id
@@ -646,13 +807,36 @@ class PGKVStorage(BaseKVStorage):
             ids=",".join([f"'{id}'" for id in ids])
         )
         params = {"workspace": self.db.workspace}
-        return await self.db.query(sql, params, multirows=True)
+        results = await self.db.query(sql, params, multirows=True)
 
-    async def get_by_status(self, status: str) -> Union[list[dict[str, Any]], None]:
-        """Specifically for llm_response_cache."""
-        SQL = SQL_TEMPLATES["get_by_status_" + self.namespace]
-        params = {"workspace": self.db.workspace, "status": status}
-        return await self.db.query(SQL, params, multirows=True)
+        if results and is_namespace(self.namespace, NameSpace.KV_STORE_TEXT_CHUNKS):
+            # Parse llm_cache_list JSON string back to list for each result
+            for result in results:
+                llm_cache_list = result.get("llm_cache_list", [])
+                if isinstance(llm_cache_list, str):
+                    try:
+                        llm_cache_list = json.loads(llm_cache_list)
+                    except json.JSONDecodeError:
+                        llm_cache_list = []
+                result["llm_cache_list"] = llm_cache_list
+
+        # Special handling for LLM cache to ensure compatibility with _get_cached_extraction_results
+        if results and is_namespace(self.namespace, NameSpace.KV_STORE_LLM_RESPONSE_CACHE):
+            processed_results = []
+            for row in results:
+                # Map field names and add cache_type for compatibility
+                processed_row = {
+                    **row,
+                    "return": row.get("return_value", ""),
+                    "cache_type": row.get("cache_type"),
+                    "original_prompt": row.get("original_prompt", ""),
+                    "chunk_id": row.get("chunk_id"),
+                    "mode": row.get("mode", "default"),
+                }
+                processed_results.append(processed_row)
+            return processed_results
+
+        return results if results else []
 
     async def filter_keys(self, keys: set[str]) -> set[str]:
         """Filter out duplicated content"""
@@ -693,6 +877,7 @@ class PGKVStorage(BaseKVStorage):
                     "full_doc_id": v["full_doc_id"],
                     "content": v["content"],
                     "file_path": v["file_path"],
+                    "llm_cache_list": json.dumps(v.get("llm_cache_list", [])),
                     "create_time": current_time,
                     "update_time": current_time,
                 }
@@ -716,6 +901,7 @@ class PGKVStorage(BaseKVStorage):
                     "return_value": v["return"],
                     "mode": v.get("mode", "default"),  # Get mode from data
                     "chunk_id": v.get("chunk_id"),
+                    "cache_type": v.get("cache_type", "extract"),  # Get cache_type from data
                 }
 
                 await self.db.execute(upsert_sql, _data)
@@ -1140,6 +1326,14 @@ class PGDocStatusStorage(DocStatusStorage):
         if result is None or result == []:
            return None
         else:
+            # Parse chunks_list JSON string back to list
+            chunks_list = result[0].get("chunks_list", [])
+            if isinstance(chunks_list, str):
+                try:
+                    chunks_list = json.loads(chunks_list)
+                except json.JSONDecodeError:
+                    chunks_list = []
+
            return dict(
                content=result[0]["content"],
                content_length=result[0]["content_length"],
@@ -1149,6 +1343,7 @@ class PGDocStatusStorage(DocStatusStorage):
                created_at=result[0]["created_at"],
                updated_at=result[0]["updated_at"],
                file_path=result[0]["file_path"],
+                chunks_list=chunks_list,
            )
 
     async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
@@ -1163,19 +1358,32 @@ class PGDocStatusStorage(DocStatusStorage):
 
        if not results:
            return []
-        return [
-            {
-                "content": row["content"],
-                "content_length": row["content_length"],
-                "content_summary": row["content_summary"],
-                "status": row["status"],
-                "chunks_count": row["chunks_count"],
-                "created_at": row["created_at"],
-                "updated_at": row["updated_at"],
-                "file_path": row["file_path"],
-            }
-            for row in results
-        ]
+
+        processed_results = []
+        for row in results:
+            # Parse chunks_list JSON string back to list
+            chunks_list = row.get("chunks_list", [])
+            if isinstance(chunks_list, str):
+                try:
+                    chunks_list = json.loads(chunks_list)
+                except json.JSONDecodeError:
+                    chunks_list = []
+
+            processed_results.append(
+                {
+                    "content": row["content"],
+                    "content_length": row["content_length"],
+                    "content_summary": row["content_summary"],
+                    "status": row["status"],
+                    "chunks_count": row["chunks_count"],
+                    "created_at": row["created_at"],
+                    "updated_at": row["updated_at"],
+                    "file_path": row["file_path"],
+                    "chunks_list": chunks_list,
+                }
+            )
+
+        return processed_results
 
     async def get_status_counts(self) -> dict[str, int]:
         """Get counts of documents in each status"""
@@ -1196,8 +1404,18 @@ class PGDocStatusStorage(DocStatusStorage):
        sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and status=$2"
        params = {"workspace": self.db.workspace, "status": status.value}
        result = await self.db.query(sql, params, True)
-        docs_by_status = {
-            element["id"]: DocProcessingStatus(
+
+        docs_by_status = {}
+        for element in result:
+            # Parse chunks_list JSON string back to list
+            chunks_list = element.get("chunks_list", [])
+            if isinstance(chunks_list, str):
+                try:
+                    chunks_list = json.loads(chunks_list)
+                except json.JSONDecodeError:
+                    chunks_list = []
+
+            docs_by_status[element["id"]] = DocProcessingStatus(
                content=element["content"],
                content_summary=element["content_summary"],
                content_length=element["content_length"],
@@ -1206,9 +1424,9 @@ class PGDocStatusStorage(DocStatusStorage):
                updated_at=element["updated_at"],
                chunks_count=element["chunks_count"],
                file_path=element["file_path"],
+                chunks_list=chunks_list,
            )
-            for element in result
-        }
+
        return docs_by_status
 
     async def index_done_callback(self) -> None:
@@ -1272,10 +1490,10 @@ class PGDocStatusStorage(DocStatusStorage):
                logger.warning(f"Unable to parse datetime string: {dt_str}")
                return None
 
-        # Modified SQL to include created_at and updated_at in both INSERT and UPDATE operations
-        # Both fields are updated from the input data in both INSERT and UPDATE cases
-        sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content,content_summary,content_length,chunks_count,status,file_path,created_at,updated_at)
-                 values($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)
+        # Modified SQL to include created_at, updated_at, and chunks_list in both INSERT and UPDATE operations
+        # All fields are updated from the input data in both INSERT and UPDATE cases
+        sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content,content_summary,content_length,chunks_count,status,file_path,chunks_list,created_at,updated_at)
+                 values($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
                  on conflict(id,workspace) do update set
                  content = EXCLUDED.content,
                  content_summary = EXCLUDED.content_summary,
@@ -1283,6 +1501,7 @@ class PGDocStatusStorage(DocStatusStorage):
                  chunks_count = EXCLUDED.chunks_count,
                  status = EXCLUDED.status,
                  file_path = EXCLUDED.file_path,
+                 chunks_list = EXCLUDED.chunks_list,
                  created_at = EXCLUDED.created_at,
                  updated_at = EXCLUDED.updated_at"""
        for k, v in data.items():
@@ -1290,7 +1509,7 @@ class PGDocStatusStorage(DocStatusStorage):
            created_at = parse_datetime(v.get("created_at"))
            updated_at = parse_datetime(v.get("updated_at"))
 
-            # chunks_count is optional
+            # chunks_count and chunks_list are optional
            await self.db.execute(
                sql,
                {
@@ -1302,6 +1521,7 @@ class PGDocStatusStorage(DocStatusStorage):
                    "chunks_count": v["chunks_count"] if "chunks_count" in v else -1,
                    "status": v["status"],
                    "file_path": v["file_path"],
+                    "chunks_list": json.dumps(v.get("chunks_list", [])),
                    "created_at": created_at,  # Use the converted datetime object
                    "updated_at": updated_at,  # Use the converted datetime object
                },
@@ -2620,6 +2840,7 @@ TABLES = {
                    tokens INTEGER,
                    content TEXT,
                    file_path VARCHAR(256),
+                    llm_cache_list JSONB NULL DEFAULT '[]'::jsonb,
                    create_time TIMESTAMP(0) WITH TIME ZONE,
                    update_time TIMESTAMP(0) WITH TIME ZONE,
                    CONSTRAINT LIGHTRAG_DOC_CHUNKS_PK PRIMARY KEY (workspace, id)
@@ -2692,6 +2913,7 @@ TABLES = {
                    chunks_count int4 NULL,
                    status varchar(64) NULL,
                    file_path TEXT NULL,
+                    chunks_list JSONB NULL DEFAULT '[]'::jsonb,
                    created_at timestamp with time zone DEFAULT CURRENT_TIMESTAMP NULL,
                    updated_at timestamp with time zone DEFAULT CURRENT_TIMESTAMP NULL,
                    CONSTRAINT LIGHTRAG_DOC_STATUS_PK PRIMARY KEY (workspace, id)
@@ -2706,24 +2928,26 @@ SQL_TEMPLATES = {
        FROM LIGHTRAG_DOC_FULL WHERE workspace=$1 AND id=$2
        """,
    "get_by_id_text_chunks": """SELECT id, tokens, COALESCE(content, '') as content,
-        chunk_order_index, full_doc_id, file_path
+        chunk_order_index, full_doc_id, file_path,
+        COALESCE(llm_cache_list, '[]'::jsonb) as llm_cache_list
        FROM LIGHTRAG_DOC_CHUNKS WHERE workspace=$1 AND id=$2
        """,
-    "get_by_id_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode, chunk_id
+    "get_by_id_llm_response_cache": """SELECT id, original_prompt, return_value, mode, chunk_id, cache_type
        FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND id=$2
        """,
-    "get_by_mode_id_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode, chunk_id
+    "get_by_mode_id_llm_response_cache": """SELECT id, original_prompt, return_value, mode, chunk_id
        FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND mode=$2 AND id=$3
        """,
    "get_by_ids_full_docs": """SELECT id, COALESCE(content, '') as content
        FROM LIGHTRAG_DOC_FULL WHERE workspace=$1 AND id IN ({ids})
        """,
    "get_by_ids_text_chunks": """SELECT id, tokens, COALESCE(content, '') as content,
-        chunk_order_index, full_doc_id, file_path
+        chunk_order_index, full_doc_id, file_path,
+        COALESCE(llm_cache_list, '[]'::jsonb) as llm_cache_list
        FROM LIGHTRAG_DOC_CHUNKS WHERE workspace=$1 AND id IN ({ids})
        """,
-    "get_by_ids_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode, chunk_id
-        FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND mode= IN ({ids})
+    "get_by_ids_llm_response_cache": """SELECT id, original_prompt, return_value, mode, chunk_id, cache_type
+        FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND id IN ({ids})
        """,
    "filter_keys": "SELECT id FROM {table_name} WHERE workspace=$1 AND id IN ({ids})",
    "upsert_doc_full": """INSERT INTO LIGHTRAG_DOC_FULL (id, content, workspace)
@@ -2731,25 +2955,27 @@ SQL_TEMPLATES = {
        ON CONFLICT (workspace,id) DO UPDATE
        SET content = $2, update_time = CURRENT_TIMESTAMP
        """,
-    "upsert_llm_response_cache": """INSERT INTO LIGHTRAG_LLM_CACHE(workspace,id,original_prompt,return_value,mode,chunk_id)
-        VALUES ($1, $2, $3, $4, $5, $6)
+    "upsert_llm_response_cache": """INSERT INTO LIGHTRAG_LLM_CACHE(workspace,id,original_prompt,return_value,mode,chunk_id,cache_type)
+        VALUES ($1, $2, $3, $4, $5, $6, $7)
        ON CONFLICT (workspace,mode,id) DO UPDATE
        SET original_prompt = EXCLUDED.original_prompt,
        return_value=EXCLUDED.return_value,
        mode=EXCLUDED.mode,
        chunk_id=EXCLUDED.chunk_id,
+        cache_type=EXCLUDED.cache_type,
        update_time = CURRENT_TIMESTAMP
        """,
    "upsert_text_chunk": """INSERT INTO LIGHTRAG_DOC_CHUNKS (workspace, id, tokens,
-        chunk_order_index, full_doc_id, content, file_path,
+        chunk_order_index, full_doc_id, content, file_path, llm_cache_list,
        create_time, update_time)
-        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
+        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
        ON CONFLICT (workspace,id) DO UPDATE
        SET tokens=EXCLUDED.tokens,
        chunk_order_index=EXCLUDED.chunk_order_index,
        full_doc_id=EXCLUDED.full_doc_id,
        content = EXCLUDED.content,
        file_path=EXCLUDED.file_path,
+        llm_cache_list=EXCLUDED.llm_cache_list,
        update_time = EXCLUDED.update_time
        """,
    # SQL for VectorStorage
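On the PostgreSQL side, the new chunks_list and llm_cache_list columns are JSONB: the upsert paths serialize Python lists with json.dumps, and every read path defensively parses the value back, since the driver may hand JSONB back as a string depending on codec settings. A small self-contained sketch of that round trip (helper names here are illustrative, not part of the codebase):

import json

def dump_json_list(value):
    # Serialize a Python list for a JSONB parameter, as the upsert SQL above expects.
    return json.dumps(value if value is not None else [])

def load_json_list(value):
    # JSONB may arrive as a str; fall back to [] on malformed data, mirroring the
    # try/except json.JSONDecodeError pattern used in get_by_id/get_by_ids above.
    if isinstance(value, str):
        try:
            return json.loads(value)
        except json.JSONDecodeError:
            return []
    return value if isinstance(value, list) else []

row = {"id": "doc-1", "chunks_list": '["chunk-1", "chunk-2"]'}
row["chunks_list"] = load_json_list(row["chunks_list"])
assert row["chunks_list"] == ["chunk-1", "chunk-2"]
print(dump_json_list(row["chunks_list"]))  # '["chunk-1", "chunk-2"]'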
lightrag/kg/tidb_impl.py CHANGED
@@ -520,11 +520,6 @@ class TiDBVectorDBStorage(BaseVectorStorage):
         }
         await self.db.execute(SQL_TEMPLATES["upsert_relationship"], param)
 
-    async def get_by_status(self, status: str) -> Union[list[dict[str, Any]], None]:
-        SQL = SQL_TEMPLATES["get_by_status_" + self.namespace]
-        params = {"workspace": self.db.workspace, "status": status}
-        return await self.db.query(SQL, params, multirows=True)
-
     async def delete(self, ids: list[str]) -> None:
         """Delete vectors with specified IDs from the storage.