Daniel.y committed on
Commit
183ee5a
·
unverified ·
2 Parent(s): 030c559 6cd2521

Merge pull request #1519 from danielaskdd/fix-json-postgres

Browse files
Files changed (2) hide show
  1. lightrag/kg/postgres_impl.py +68 -23
  2. lightrag/lightrag.py +10 -2
lightrag/kg/postgres_impl.py CHANGED
@@ -1178,7 +1178,7 @@ class PGGraphStorage(BaseGraphStorage):
1178
  with_age=True,
1179
  graph_name=self.graph_name,
1180
  )
1181
- logger.info(f"Successfully executed: {query}")
1182
  except Exception:
1183
  continue
1184
 
@@ -1373,6 +1373,15 @@ class PGGraphStorage(BaseGraphStorage):
1373
  node = record[0]
1374
  node_dict = node["n"]["properties"]
1375
 
 
 
 
 
 
 
 
 
 
1376
  return node_dict
1377
  return None
1378
 
@@ -1421,6 +1430,15 @@ class PGGraphStorage(BaseGraphStorage):
1421
  if record and record[0] and record[0]["edge_properties"]:
1422
  result = record[0]["edge_properties"]
1423
 
 
 
 
 
 
 
 
 
 
1424
  return result
1425
 
1426
  async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None:
@@ -1432,9 +1450,9 @@ class PGGraphStorage(BaseGraphStorage):
1432
 
1433
  query = """SELECT * FROM cypher('%s', $$
1434
  MATCH (n:base {entity_id: "%s"})
1435
- OPTIONAL MATCH (n)-[]-(connected)
1436
- RETURN n, connected
1437
- $$) AS (n agtype, connected agtype)""" % (
1438
  self.graph_name,
1439
  label,
1440
  )
@@ -1442,20 +1460,11 @@ class PGGraphStorage(BaseGraphStorage):
1442
  results = await self._query(query)
1443
  edges = []
1444
  for record in results:
1445
- source_node = record["n"] if record["n"] else None
1446
- connected_node = record["connected"] if record["connected"] else None
1447
 
1448
- if (
1449
- source_node
1450
- and connected_node
1451
- and "properties" in source_node
1452
- and "properties" in connected_node
1453
- ):
1454
- source_label = source_node["properties"].get("entity_id")
1455
- target_label = connected_node["properties"].get("entity_id")
1456
-
1457
- if source_label and target_label:
1458
- edges.append((source_label, target_label))
1459
 
1460
  return edges
1461
 
@@ -1638,6 +1647,18 @@ class PGGraphStorage(BaseGraphStorage):
1638
  for result in results:
1639
  if result["node_id"] and result["n"]:
1640
  node_dict = result["n"]["properties"]
 
 
 
 
 
 
 
 
 
 
 
 
1641
  # Remove the 'base' label if present in a 'labels' property
1642
  if "labels" in node_dict:
1643
  node_dict["labels"] = [
@@ -1789,15 +1810,39 @@ class PGGraphStorage(BaseGraphStorage):
1789
 
1790
  for result in forward_results:
1791
  if result["source"] and result["target"] and result["edge_properties"]:
1792
- edges_dict[(result["source"], result["target"])] = result[
1793
- "edge_properties"
1794
- ]
 
 
 
 
 
 
 
 
 
 
 
 
1795
 
1796
  for result in backward_results:
1797
  if result["source"] and result["target"] and result["edge_properties"]:
1798
- edges_dict[(result["source"], result["target"])] = result[
1799
- "edge_properties"
1800
- ]
 
 
 
 
 
 
 
 
 
 
 
 
1801
 
1802
  return edges_dict
1803
 
 
1178
  with_age=True,
1179
  graph_name=self.graph_name,
1180
  )
1181
+ # logger.info(f"Successfully executed: {query}")
1182
  except Exception:
1183
  continue
1184
 
 
1373
  node = record[0]
1374
  node_dict = node["n"]["properties"]
1375
 
1376
+ # Process string result, parse it to JSON dictionary
1377
+ if isinstance(node_dict, str):
1378
+ try:
1379
+ import json
1380
+
1381
+ node_dict = json.loads(node_dict)
1382
+ except json.JSONDecodeError:
1383
+ logger.warning(f"Failed to parse node string: {node_dict}")
1384
+
1385
  return node_dict
1386
  return None
1387
 
 
1430
  if record and record[0] and record[0]["edge_properties"]:
1431
  result = record[0]["edge_properties"]
1432
 
1433
+ # Process string result, parse it to JSON dictionary
1434
+ if isinstance(result, str):
1435
+ try:
1436
+ import json
1437
+
1438
+ result = json.loads(result)
1439
+ except json.JSONDecodeError:
1440
+ logger.warning(f"Failed to parse edge string: {result}")
1441
+
1442
  return result
1443
 
1444
  async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None:
 
1450
 
1451
  query = """SELECT * FROM cypher('%s', $$
1452
  MATCH (n:base {entity_id: "%s"})
1453
+ OPTIONAL MATCH (n)-[]-(connected:base)
1454
+ RETURN n.entity_id AS source_id, connected.entity_id AS connected_id
1455
+ $$) AS (source_id text, connected_id text)""" % (
1456
  self.graph_name,
1457
  label,
1458
  )
 
1460
  results = await self._query(query)
1461
  edges = []
1462
  for record in results:
1463
+ source_id = record["source_id"]
1464
+ connected_id = record["connected_id"]
1465
 
1466
+ if source_id and connected_id:
1467
+ edges.append((source_id, connected_id))
 
 
 
 
 
 
 
 
 
1468
 
1469
  return edges
1470
 
 
1647
  for result in results:
1648
  if result["node_id"] and result["n"]:
1649
  node_dict = result["n"]["properties"]
1650
+
1651
+ # Process string result, parse it to JSON dictionary
1652
+ if isinstance(node_dict, str):
1653
+ try:
1654
+ import json
1655
+
1656
+ node_dict = json.loads(node_dict)
1657
+ except json.JSONDecodeError:
1658
+ logger.warning(
1659
+ f"Failed to parse node string in batch: {node_dict}"
1660
+ )
1661
+
1662
  # Remove the 'base' label if present in a 'labels' property
1663
  if "labels" in node_dict:
1664
  node_dict["labels"] = [
 
1810
 
1811
  for result in forward_results:
1812
  if result["source"] and result["target"] and result["edge_properties"]:
1813
+ edge_props = result["edge_properties"]
1814
+
1815
+ # Process string result, parse it to JSON dictionary
1816
+ if isinstance(edge_props, str):
1817
+ try:
1818
+ import json
1819
+
1820
+ edge_props = json.loads(edge_props)
1821
+ except json.JSONDecodeError:
1822
+ logger.warning(
1823
+ f"Failed to parse edge properties string: {edge_props}"
1824
+ )
1825
+ continue
1826
+
1827
+ edges_dict[(result["source"], result["target"])] = edge_props
1828
 
1829
  for result in backward_results:
1830
  if result["source"] and result["target"] and result["edge_properties"]:
1831
+ edge_props = result["edge_properties"]
1832
+
1833
+ # Process string result, parse it to JSON dictionary
1834
+ if isinstance(edge_props, str):
1835
+ try:
1836
+ import json
1837
+
1838
+ edge_props = json.loads(edge_props)
1839
+ except json.JSONDecodeError:
1840
+ logger.warning(
1841
+ f"Failed to parse edge properties string: {edge_props}"
1842
+ )
1843
+ continue
1844
+
1845
+ edges_dict[(result["source"], result["target"])] = edge_props
1846
 
1847
  return edges_dict
1848
 
lightrag/lightrag.py CHANGED
@@ -994,10 +994,14 @@ class LightRAG:
994
 
995
  except Exception as e:
996
  # Log error and update pipeline status
997
- error_msg = f"Failed to extrat document {doc_id}: {traceback.format_exc()}"
 
998
  logger.error(error_msg)
999
  async with pipeline_status_lock:
1000
  pipeline_status["latest_message"] = error_msg
 
 
 
1001
  pipeline_status["history_messages"].append(error_msg)
1002
 
1003
  # Cancel other tasks as they are no longer meaningful
@@ -1080,10 +1084,14 @@ class LightRAG:
1080
 
1081
  except Exception as e:
1082
  # Log error and update pipeline status
1083
- error_msg = f"Merging stage failed in document {doc_id}: {traceback.format_exc()}"
 
1084
  logger.error(error_msg)
1085
  async with pipeline_status_lock:
1086
  pipeline_status["latest_message"] = error_msg
 
 
 
1087
  pipeline_status["history_messages"].append(error_msg)
1088
 
1089
  # Persistent llm cache
 
994
 
995
  except Exception as e:
996
  # Log error and update pipeline status
997
+ logger.error(traceback.format_exc())
998
+ error_msg = f"Failed to extrat document {current_file_number}/{total_files}: {file_path}"
999
  logger.error(error_msg)
1000
  async with pipeline_status_lock:
1001
  pipeline_status["latest_message"] = error_msg
1002
+ pipeline_status["history_messages"].append(
1003
+ traceback.format_exc()
1004
+ )
1005
  pipeline_status["history_messages"].append(error_msg)
1006
 
1007
  # Cancel other tasks as they are no longer meaningful
 
1084
 
1085
  except Exception as e:
1086
  # Log error and update pipeline status
1087
+ logger.error(traceback.format_exc())
1088
+ error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
1089
  logger.error(error_msg)
1090
  async with pipeline_status_lock:
1091
  pipeline_status["latest_message"] = error_msg
1092
+ pipeline_status["history_messages"].append(
1093
+ traceback.format_exc()
1094
+ )
1095
  pipeline_status["history_messages"].append(error_msg)
1096
 
1097
  # Persistent llm cache