Commit 3f54db9 · yangdx committed · 2 parent(s): c27f760 4eec805

Merge branch 'main' into fix-neo4j-duplicate-nodes
README.md CHANGED
@@ -176,6 +176,8 @@ class QueryParam:
     """Maximum number of tokens allocated for relationship descriptions in global retrieval."""
     max_token_for_local_context: int = 4000
     """Maximum number of tokens allocated for entity descriptions in local retrieval."""
+    ids: list[str] | None = None  # ONLY SUPPORTED FOR PG VECTOR DBs
+    """List of ids to filter the RAG."""
     ...
 ```
 
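A minimal usage sketch of the new `ids` filter, assuming a `LightRAG` instance already configured with the PostgreSQL vector storage and the usual LLM/embedding functions (configuration is elided here); the query text and document IDs are illustrative:

```python
from lightrag import LightRAG, QueryParam

rag = LightRAG(working_dir="./rag_storage")  # assumes PGVectorStorage + model funcs configured elsewhere

# Restrict retrieval to two previously inserted documents (PG vector DBs only).
response = rag.query(
    "What does the contract say about termination?",
    param=QueryParam(mode="hybrid", ids=["doc-1", "doc-2"]),  # illustrative doc IDs
)
print(response)
```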
lightrag/__init__.py CHANGED
@@ -1,5 +1,5 @@
 from .lightrag import LightRAG as LightRAG, QueryParam as QueryParam
 
-__version__ = "1.2.4"
+__version__ = "1.2.5"
 __author__ = "Zirui Guo"
 __url__ = "https://github.com/HKUDS/LightRAG"
lightrag/base.py CHANGED
@@ -81,6 +81,9 @@ class QueryParam:
     history_turns: int = 3
     """Number of complete conversation turns (user-assistant pairs) to consider in the response context."""
 
+    ids: list[str] | None = None
+    """List of ids to filter the results."""
+
 
 @dataclass
 class StorageNameSpace(ABC):
@@ -107,7 +110,9 @@ class BaseVectorStorage(StorageNameSpace, ABC):
     meta_fields: set[str] = field(default_factory=set)
 
     @abstractmethod
-    async def query(self, query: str, top_k: int) -> list[dict[str, Any]]:
+    async def query(
+        self, query: str, top_k: int, ids: list[str] | None = None
+    ) -> list[dict[str, Any]]:
         """Query the vector storage and retrieve top_k results."""
 
     @abstractmethod
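For third-party storage backends, the widened abstract signature means implementations should at least accept the optional `ids` argument, even if they cannot filter by it. A hedged sketch (the class and the `_similarity_search` helper are hypothetical; only the signature comes from this diff):

```python
from typing import Any

from lightrag.base import BaseVectorStorage


class InMemoryVectorStorage(BaseVectorStorage):
    async def query(
        self, query: str, top_k: int, ids: list[str] | None = None
    ) -> list[dict[str, Any]]:
        # Hypothetical nearest-neighbour helper; real backends do their own search here.
        results = await self._similarity_search(query, top_k * 2)
        if ids is not None:
            # Backends without native id filtering can post-filter (or simply ignore ids).
            results = [r for r in results if r.get("id") in ids]
        return results[:top_k]
```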
lightrag/kg/postgres_impl.py CHANGED
@@ -438,6 +438,8 @@ class PGVectorStorage(BaseVectorStorage):
             "entity_name": item["entity_name"],
             "content": item["content"],
             "content_vector": json.dumps(item["__vector__"].tolist()),
+            "chunk_id": item["source_id"],
+            # TODO: add document_id
         }
         return upsert_sql, data
 
@@ -450,6 +452,8 @@ class PGVectorStorage(BaseVectorStorage):
             "target_id": item["tgt_id"],
             "content": item["content"],
             "content_vector": json.dumps(item["__vector__"].tolist()),
+            "chunk_id": item["source_id"],
+            # TODO: add document_id
         }
         return upsert_sql, data
 
@@ -492,13 +496,20 @@ class PGVectorStorage(BaseVectorStorage):
         await self.db.execute(upsert_sql, data)
 
     #################### query method ###############
-    async def query(self, query: str, top_k: int) -> list[dict[str, Any]]:
+    async def query(
+        self, query: str, top_k: int, ids: list[str] | None = None
+    ) -> list[dict[str, Any]]:
         embeddings = await self.embedding_func([query])
         embedding = embeddings[0]
         embedding_string = ",".join(map(str, embedding))
 
+        if ids:
+            formatted_ids = ",".join(f"'{id}'" for id in ids)
+        else:
+            formatted_ids = "NULL"
+
         sql = SQL_TEMPLATES[self.base_namespace].format(
-            embedding_string=embedding_string
+            embedding_string=embedding_string, doc_ids=formatted_ids
         )
         params = {
             "workspace": self.db.workspace,
@@ -1491,6 +1502,7 @@ TABLES = {
                     content_vector VECTOR,
                     create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                     update_time TIMESTAMP,
+                    chunk_id VARCHAR(255) NULL,
                     CONSTRAINT LIGHTRAG_VDB_ENTITY_PK PRIMARY KEY (workspace, id)
                     )"""
     },
@@ -1504,6 +1516,7 @@ TABLES = {
                    content_vector VECTOR,
                    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    update_time TIMESTAMP,
+                   chunk_id VARCHAR(255) NULL,
                    CONSTRAINT LIGHTRAG_VDB_RELATION_PK PRIMARY KEY (workspace, id)
                    )"""
    },
@@ -1586,8 +1599,9 @@ SQL_TEMPLATES = {
                                       content_vector=EXCLUDED.content_vector,
                                       update_time = CURRENT_TIMESTAMP
                                      """,
-    "upsert_entity": """INSERT INTO LIGHTRAG_VDB_ENTITY (workspace, id, entity_name, content, content_vector)
-                        VALUES ($1, $2, $3, $4, $5)
+    "upsert_entity": """INSERT INTO LIGHTRAG_VDB_ENTITY (workspace, id, entity_name, content,
+                        content_vector, chunk_id)
+                        VALUES ($1, $2, $3, $4, $5, $6)
                         ON CONFLICT (workspace,id) DO UPDATE
                         SET entity_name=EXCLUDED.entity_name,
                             content=EXCLUDED.content,
@@ -1595,8 +1609,8 @@ SQL_TEMPLATES = {
                            update_time=CURRENT_TIMESTAMP
                        """,
    "upsert_relationship": """INSERT INTO LIGHTRAG_VDB_RELATION (workspace, id, source_id,
-                      target_id, content, content_vector)
-                      VALUES ($1, $2, $3, $4, $5, $6)
+                      target_id, content, content_vector, chunk_id)
+                      VALUES ($1, $2, $3, $4, $5, $6, $7)
                       ON CONFLICT (workspace,id) DO UPDATE
                       SET source_id=EXCLUDED.source_id,
                           target_id=EXCLUDED.target_id,
@@ -1604,21 +1618,21 @@ SQL_TEMPLATES = {
                          content_vector=EXCLUDED.content_vector, update_time = CURRENT_TIMESTAMP
                      """,
    # SQL for VectorStorage
-    "entities": """SELECT entity_name FROM
-        (SELECT id, entity_name, 1 - (content_vector <=> '[{embedding_string}]'::vector) as distance
-        FROM LIGHTRAG_VDB_ENTITY where workspace=$1)
-        WHERE distance>$2 ORDER BY distance DESC LIMIT $3
-    """,
-    "relationships": """SELECT source_id as src_id, target_id as tgt_id FROM
-        (SELECT id, source_id,target_id, 1 - (content_vector <=> '[{embedding_string}]'::vector) as distance
-        FROM LIGHTRAG_VDB_RELATION where workspace=$1)
-        WHERE distance>$2 ORDER BY distance DESC LIMIT $3
-    """,
-    "chunks": """SELECT id FROM
-        (SELECT id, 1 - (content_vector <=> '[{embedding_string}]'::vector) as distance
-        FROM LIGHTRAG_DOC_CHUNKS where workspace=$1)
-        WHERE distance>$2 ORDER BY distance DESC LIMIT $3
-    """,
+    # "entities": """SELECT entity_name FROM
+    #     (SELECT id, entity_name, 1 - (content_vector <=> '[{embedding_string}]'::vector) as distance
+    #     FROM LIGHTRAG_VDB_ENTITY where workspace=$1)
+    #     WHERE distance>$2 ORDER BY distance DESC LIMIT $3
+    # """,
+    # "relationships": """SELECT source_id as src_id, target_id as tgt_id FROM
+    #     (SELECT id, source_id,target_id, 1 - (content_vector <=> '[{embedding_string}]'::vector) as distance
+    #     FROM LIGHTRAG_VDB_RELATION where workspace=$1)
+    #     WHERE distance>$2 ORDER BY distance DESC LIMIT $3
+    # """,
+    # "chunks": """SELECT id FROM
+    #     (SELECT id, 1 - (content_vector <=> '[{embedding_string}]'::vector) as distance
+    #     FROM LIGHTRAG_DOC_CHUNKS where workspace=$1)
+    #     WHERE distance>$2 ORDER BY distance DESC LIMIT $3
+    # """,
    # DROP tables
    "drop_all": """
        DROP TABLE IF EXISTS LIGHTRAG_DOC_FULL CASCADE;
@@ -1642,4 +1656,55 @@ SQL_TEMPLATES = {
    "drop_vdb_relation": """
        DROP TABLE IF EXISTS LIGHTRAG_VDB_RELATION CASCADE;
       """,
+    "relationships": """
+    WITH relevant_chunks AS (
+        SELECT id as chunk_id
+        FROM LIGHTRAG_DOC_CHUNKS
+        WHERE {doc_ids} IS NULL OR full_doc_id = ANY(ARRAY[{doc_ids}])
+    )
+    SELECT source_id as src_id, target_id as tgt_id
+    FROM (
+        SELECT r.id, r.source_id, r.target_id, 1 - (r.content_vector <=> '[{embedding_string}]'::vector) as distance
+        FROM LIGHTRAG_VDB_RELATION r
+        WHERE r.workspace=$1
+        AND r.chunk_id IN (SELECT chunk_id FROM relevant_chunks)
+    ) filtered
+    WHERE distance>$2
+    ORDER BY distance DESC
+    LIMIT $3
+    """,
+    "entities": """
+        WITH relevant_chunks AS (
+            SELECT id as chunk_id
+            FROM LIGHTRAG_DOC_CHUNKS
+            WHERE {doc_ids} IS NULL OR full_doc_id = ANY(ARRAY[{doc_ids}])
+        )
+        SELECT entity_name FROM
+            (
+                SELECT id, entity_name, 1 - (content_vector <=> '[{embedding_string}]'::vector) as distance
+                FROM LIGHTRAG_VDB_ENTITY
+                where workspace=$1
+                AND chunk_id IN (SELECT chunk_id FROM relevant_chunks)
+            )
+        WHERE distance>$2
+        ORDER BY distance DESC
+        LIMIT $3
+    """,
+    "chunks": """
+        WITH relevant_chunks AS (
+            SELECT id as chunk_id
+            FROM LIGHTRAG_DOC_CHUNKS
+            WHERE {doc_ids} IS NULL OR full_doc_id = ANY(ARRAY[{doc_ids}])
+        )
+        SELECT id FROM
+            (
+                SELECT id, 1 - (content_vector <=> '[{embedding_string}]'::vector) as distance
+                FROM LIGHTRAG_DOC_CHUNKS
+                where workspace=$1
+                AND id IN (SELECT chunk_id FROM relevant_chunks)
+            )
+        WHERE distance>$2
+        ORDER BY distance DESC
+        LIMIT $3
+    """,
 }
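The new templates are filled with plain string formatting rather than bound parameters, so the shape of `formatted_ids` matters. A small sketch of what the `query()` change above produces (values are illustrative):

```python
ids = ["doc-1", "doc-2"]
formatted_ids = ",".join(f"'{id}'" for id in ids) if ids else "NULL"
print(formatted_ids)  # 'doc-1','doc-2'

fragment = "full_doc_id = ANY(ARRAY[{doc_ids}])"
print(fragment.format(doc_ids=formatted_ids))  # full_doc_id = ANY(ARRAY['doc-1','doc-2'])
# With no ids, doc_ids renders as NULL, so the "{doc_ids} IS NULL" branch of the
# WHERE clause is true and the document filter is effectively disabled.
```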
lightrag/lightrag.py CHANGED
@@ -30,11 +30,10 @@ from .namespace import NameSpace, make_namespace
 from .operate import (
     chunking_by_token_size,
     extract_entities,
-    extract_keywords_only,
     kg_query,
-    kg_query_with_keywords,
     mix_kg_vector_query,
     naive_query,
+    query_with_keywords,
 )
 from .prompt import GRAPH_FIELD_SEP, PROMPTS
 from .utils import (
@@ -45,6 +44,9 @@ from .utils import (
     encode_string_by_tiktoken,
     lazy_external_import,
     limit_async_func_call,
+    get_content_summary,
+    clean_text,
+    check_storage_env_vars,
     logger,
 )
 from .types import KnowledgeGraph
@@ -309,7 +311,7 @@ class LightRAG:
             # Verify storage implementation compatibility
             verify_storage_implementation(storage_type, storage_name)
             # Check environment variables
-            # self.check_storage_env_vars(storage_name)
+            check_storage_env_vars(storage_name)
 
         # Ensure vector_db_storage_cls_kwargs has required fields
         self.vector_db_storage_cls_kwargs = {
@@ -536,11 +538,6 @@ class LightRAG:
         storage_class = lazy_external_import(import_path, storage_name)
         return storage_class
 
-    @staticmethod
-    def clean_text(text: str) -> str:
-        """Clean text by removing null bytes (0x00) and whitespace"""
-        return text.strip().replace("\x00", "")
-
     def insert(
         self,
         input: str | list[str],
@@ -602,8 +599,8 @@ class LightRAG:
         update_storage = False
         try:
             # Clean input texts
-            full_text = self.clean_text(full_text)
-            text_chunks = [self.clean_text(chunk) for chunk in text_chunks]
+            full_text = clean_text(full_text)
+            text_chunks = [clean_text(chunk) for chunk in text_chunks]
 
             # Process cleaned texts
             if doc_id is None:
@@ -682,7 +679,7 @@ class LightRAG:
             contents = {id_: doc for id_, doc in zip(ids, input)}
         else:
             # Clean input text and remove duplicates
-            input = list(set(self.clean_text(doc) for doc in input))
+            input = list(set(clean_text(doc) for doc in input))
             # Generate contents dict of MD5 hash IDs and documents
             contents = {compute_mdhash_id(doc, prefix="doc-"): doc for doc in input}
 
@@ -698,7 +695,7 @@ class LightRAG:
         new_docs: dict[str, Any] = {
             id_: {
                 "content": content,
-                "content_summary": self._get_content_summary(content),
+                "content_summary": get_content_summary(content),
                 "content_length": len(content),
                 "status": DocStatus.PENDING,
                 "created_at": datetime.now().isoformat(),
@@ -1063,7 +1060,7 @@ class LightRAG:
             all_chunks_data: dict[str, dict[str, str]] = {}
             chunk_to_source_map: dict[str, str] = {}
             for chunk_data in custom_kg.get("chunks", []):
-                chunk_content = self.clean_text(chunk_data["content"])
+                chunk_content = clean_text(chunk_data["content"])
                 source_id = chunk_data["source_id"]
                 tokens = len(
                     encode_string_by_tiktoken(
@@ -1296,8 +1293,17 @@ class LightRAG:
         self, query: str, prompt: str, param: QueryParam = QueryParam()
     ):
         """
-        1. Extract keywords from the 'query' using new function in operate.py.
-        2. Then run the standard aquery() flow with the final prompt (formatted_question).
+        Query with separate keyword extraction step.
+
+        This method extracts keywords from the query first, then uses them for the query.
+
+        Args:
+            query: User query
+            prompt: Additional prompt for the query
+            param: Query parameters
+
+        Returns:
+            Query response
         """
         loop = always_get_an_event_loop()
         return loop.run_until_complete(
@@ -1308,66 +1314,29 @@ class LightRAG:
         self, query: str, prompt: str, param: QueryParam = QueryParam()
     ) -> str | AsyncIterator[str]:
         """
-        1. Calls extract_keywords_only to get HL/LL keywords from 'query'.
-        2. Then calls kg_query(...) or naive_query(...), etc. as the main query, while also injecting the newly extracted keywords if needed.
+        Async version of query_with_separate_keyword_extraction.
+
+        Args:
+            query: User query
+            prompt: Additional prompt for the query
+            param: Query parameters
+
+        Returns:
+            Query response or async iterator
         """
-        # ---------------------
-        # STEP 1: Keyword Extraction
-        # ---------------------
-        hl_keywords, ll_keywords = await extract_keywords_only(
-            text=query,
+        response = await query_with_keywords(
+            query=query,
+            prompt=prompt,
             param=param,
+            knowledge_graph_inst=self.chunk_entity_relation_graph,
+            entities_vdb=self.entities_vdb,
+            relationships_vdb=self.relationships_vdb,
+            chunks_vdb=self.chunks_vdb,
+            text_chunks_db=self.text_chunks,
             global_config=asdict(self),
-            hashing_kv=self.llm_response_cache,  # Directly use llm_response_cache
+            hashing_kv=self.llm_response_cache,
         )
 
-        param.hl_keywords = hl_keywords
-        param.ll_keywords = ll_keywords
-
-        # ---------------------
-        # STEP 2: Final Query Logic
-        # ---------------------
-
-        # Create a new string with the prompt and the keywords
-        ll_keywords_str = ", ".join(ll_keywords)
-        hl_keywords_str = ", ".join(hl_keywords)
-        formatted_question = f"{prompt}\n\n### Keywords:\nHigh-level: {hl_keywords_str}\nLow-level: {ll_keywords_str}\n\n### Query:\n{query}"
-
-        if param.mode in ["local", "global", "hybrid"]:
-            response = await kg_query_with_keywords(
-                formatted_question,
-                self.chunk_entity_relation_graph,
-                self.entities_vdb,
-                self.relationships_vdb,
-                self.text_chunks,
-                param,
-                asdict(self),
-                hashing_kv=self.llm_response_cache,  # Directly use llm_response_cache
-            )
-        elif param.mode == "naive":
-            response = await naive_query(
-                formatted_question,
-                self.chunks_vdb,
-                self.text_chunks,
-                param,
-                asdict(self),
-                hashing_kv=self.llm_response_cache,  # Directly use llm_response_cache
-            )
-        elif param.mode == "mix":
-            response = await mix_kg_vector_query(
-                formatted_question,
-                self.chunk_entity_relation_graph,
-                self.entities_vdb,
-                self.relationships_vdb,
-                self.chunks_vdb,
-                self.text_chunks,
-                param,
-                asdict(self),
-                hashing_kv=self.llm_response_cache,  # Directly use llm_response_cache
-            )
-        else:
-            raise ValueError(f"Unknown mode {param.mode}")
-
         await self._query_done()
         return response
 
@@ -1465,21 +1434,6 @@ class LightRAG:
             ]
         )
 
-    def _get_content_summary(self, content: str, max_length: int = 100) -> str:
-        """Get summary of document content
-
-        Args:
-            content: Original document content
-            max_length: Maximum length of summary
-
-        Returns:
-            Truncated content with ellipsis if needed
-        """
-        content = content.strip()
-        if len(content) <= max_length:
-            return content
-        return content[:max_length] + "..."
-
     async def get_processing_status(self) -> dict[str, int]:
         """Get current document processing status counts
 
@@ -2622,6 +2576,12 @@ class LightRAG:
 
             # 9. Delete source entities
             for entity_name in source_entities:
+                if entity_name == target_entity:
+                    logger.info(
+                        f"Skipping deletion of '{entity_name}' as it's also the target entity"
+                    )
+                    continue
+
                 # Delete entity node from knowledge graph
                 await self.chunk_entity_relation_graph.delete_node(entity_name)
 
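A hedged usage sketch of the refactored keyword-extraction path; `LightRAG` construction (LLM and embedding functions) is assumed to be configured elsewhere and the texts are illustrative:

```python
from lightrag import LightRAG, QueryParam

rag = LightRAG(working_dir="./rag_storage")  # model/embedding setup omitted

answer = rag.query_with_separate_keyword_extraction(
    query="How are payment terms defined?",
    prompt="Answer concisely and cite the relevant clauses.",
    param=QueryParam(mode="hybrid"),
)
print(answer)
```

Internally this now delegates to `operate.query_with_keywords`, which performs the keyword extraction and mode dispatch that previously lived inline in `aquery_with_separate_keyword_extraction`.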
lightrag/llm/azure_openai.py CHANGED
@@ -55,6 +55,7 @@ async def azure_openai_complete_if_cache(
 
     openai_async_client = AsyncAzureOpenAI(
         azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
+        azure_deployment=model,
         api_key=os.getenv("AZURE_OPENAI_API_KEY"),
         api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
     )
@@ -136,6 +137,7 @@ async def azure_openai_embed(
 
     openai_async_client = AsyncAzureOpenAI(
         azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
+        azure_deployment=model,
         api_key=os.getenv("AZURE_OPENAI_API_KEY"),
         api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
     )
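With `azure_deployment=model`, the deployment name now comes from the `model` argument passed to these helpers, while the endpoint, key, and API version still come from the environment. A sketch of the expected environment (all values are placeholders, and the API version shown is only an example):

```python
import os

os.environ["AZURE_OPENAI_ENDPOINT"] = "https://<your-resource>.openai.azure.com"
os.environ["AZURE_OPENAI_API_KEY"] = "<your-key>"
os.environ["AZURE_OPENAI_API_VERSION"] = "2024-02-15-preview"  # example value
```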
lightrag/operate.py CHANGED
@@ -141,18 +141,36 @@ async def _handle_single_entity_extraction(
 ):
     if len(record_attributes) < 4 or record_attributes[0] != '"entity"':
         return None
-    # add this record as a node in the G
+
+    # Clean and validate entity name
     entity_name = clean_str(record_attributes[1]).strip('"')
     if not entity_name.strip():
+        logger.warning(
+            f"Entity extraction error: empty entity name in: {record_attributes}"
+        )
         return None
+
+    # Clean and validate entity type
     entity_type = clean_str(record_attributes[2]).strip('"')
+    if not entity_type.strip() or entity_type.startswith('("'):
+        logger.warning(
+            f"Entity extraction error: invalid entity type in: {record_attributes}"
+        )
+        return None
+
+    # Clean and validate description
     entity_description = clean_str(record_attributes[3]).strip('"')
-    entity_source_id = chunk_key
+    if not entity_description.strip():
+        logger.warning(
+            f"Entity extraction error: empty description for entity '{entity_name}' of type '{entity_type}'"
+        )
+        return None
+
     return dict(
         entity_name=entity_name,
         entity_type=entity_type,
         description=entity_description,
-        source_id=entity_source_id,
+        source_id=chunk_key,
         metadata={"created_at": time.time()},
     )
 
@@ -438,30 +456,91 @@ async def extract_entities(
         else:
             return await use_llm_func(input_text)
 
+    async def _process_extraction_result(result: str, chunk_key: str):
+        """Process a single extraction result (either initial or gleaning)
+        Args:
+            result (str): The extraction result to process
+            chunk_key (str): The chunk key for source tracking
+        Returns:
+            tuple: (nodes_dict, edges_dict) containing the extracted entities and relationships
+        """
+        maybe_nodes = defaultdict(list)
+        maybe_edges = defaultdict(list)
+
+        records = split_string_by_multi_markers(
+            result,
+            [context_base["record_delimiter"], context_base["completion_delimiter"]],
+        )
+
+        for record in records:
+            record = re.search(r"\((.*)\)", record)
+            if record is None:
+                continue
+            record = record.group(1)
+            record_attributes = split_string_by_multi_markers(
+                record, [context_base["tuple_delimiter"]]
+            )
+
+            if_entities = await _handle_single_entity_extraction(
+                record_attributes, chunk_key
+            )
+            if if_entities is not None:
+                maybe_nodes[if_entities["entity_name"]].append(if_entities)
+                continue
+
+            if_relation = await _handle_single_relationship_extraction(
+                record_attributes, chunk_key
+            )
+            if if_relation is not None:
+                maybe_edges[(if_relation["src_id"], if_relation["tgt_id"])].append(
+                    if_relation
+                )
+
+        return maybe_nodes, maybe_edges
+
     async def _process_single_content(chunk_key_dp: tuple[str, TextChunkSchema]):
-        """ "Prpocess a single chunk
+        """Process a single chunk
         Args:
             chunk_key_dp (tuple[str, TextChunkSchema]):
-                ("chunck-xxxxxx", {"tokens": int, "content": str, "full_doc_id": str, "chunk_order_index": int})
+                ("chunk-xxxxxx", {"tokens": int, "content": str, "full_doc_id": str, "chunk_order_index": int})
         """
         nonlocal processed_chunks
         chunk_key = chunk_key_dp[0]
         chunk_dp = chunk_key_dp[1]
         content = chunk_dp["content"]
-        # hint_prompt = entity_extract_prompt.format(**context_base, input_text=content)
+
+        # Get initial extraction
         hint_prompt = entity_extract_prompt.format(
             **context_base, input_text="{input_text}"
         ).format(**context_base, input_text=content)
 
         final_result = await _user_llm_func_with_cache(hint_prompt)
         history = pack_user_ass_to_openai_messages(hint_prompt, final_result)
+
+        # Process initial extraction
+        maybe_nodes, maybe_edges = await _process_extraction_result(
+            final_result, chunk_key
+        )
+
+        # Process additional gleaning results
        for now_glean_index in range(entity_extract_max_gleaning):
            glean_result = await _user_llm_func_with_cache(
                continue_prompt, history_messages=history
            )
 
            history += pack_user_ass_to_openai_messages(continue_prompt, glean_result)
-            final_result += glean_result
+
+            # Process gleaning result separately
+            glean_nodes, glean_edges = await _process_extraction_result(
+                glean_result, chunk_key
+            )
+
+            # Merge results
+            for entity_name, entities in glean_nodes.items():
+                maybe_nodes[entity_name].extend(entities)
+            for edge_key, edges in glean_edges.items():
+                maybe_edges[edge_key].extend(edges)
+
            if now_glean_index == entity_extract_max_gleaning - 1:
                break
 
@@ -472,35 +551,6 @@ async def extract_entities(
            if if_loop_result != "yes":
                break
 
-        records = split_string_by_multi_markers(
-            final_result,
-            [context_base["record_delimiter"], context_base["completion_delimiter"]],
-        )
-
-        maybe_nodes = defaultdict(list)
-        maybe_edges = defaultdict(list)
-        for record in records:
-            record = re.search(r"\((.*)\)", record)
-            if record is None:
-                continue
-            record = record.group(1)
-            record_attributes = split_string_by_multi_markers(
-                record, [context_base["tuple_delimiter"]]
-            )
-            if_entities = await _handle_single_entity_extraction(
-                record_attributes, chunk_key
-            )
-            if if_entities is not None:
-                maybe_nodes[if_entities["entity_name"]].append(if_entities)
-                continue
-
-            if_relation = await _handle_single_relationship_extraction(
-                record_attributes, chunk_key
-            )
-            if if_relation is not None:
-                maybe_edges[(if_relation["src_id"], if_relation["tgt_id"])].append(
-                    if_relation
-                )
-
        processed_chunks += 1
        entities_count = len(maybe_nodes)
        relations_count = len(maybe_edges)
@@ -912,7 +962,10 @@ async def mix_kg_vector_query(
    try:
        # Reduce top_k for vector search in hybrid mode since we have structured information from KG
        mix_topk = min(10, query_param.top_k)
-        results = await chunks_vdb.query(augmented_query, top_k=mix_topk)
+        # TODO: add ids to the query
+        results = await chunks_vdb.query(
+            augmented_query, top_k=mix_topk, ids=query_param.ids
+        )
        if not results:
            return None
 
@@ -1121,7 +1174,11 @@ async def _get_node_data(
    logger.info(
        f"Query nodes: {query}, top_k: {query_param.top_k}, cosine: {entities_vdb.cosine_better_than_threshold}"
    )
-    results = await entities_vdb.query(query, top_k=query_param.top_k)
+
+    results = await entities_vdb.query(
+        query, top_k=query_param.top_k, ids=query_param.ids
+    )
+
    if not len(results):
        return "", "", ""
    # get entity information
@@ -1374,7 +1431,10 @@ async def _get_edge_data(
    logger.info(
        f"Query edges: {keywords}, top_k: {query_param.top_k}, cosine: {relationships_vdb.cosine_better_than_threshold}"
    )
-    results = await relationships_vdb.query(keywords, top_k=query_param.top_k)
+
+    results = await relationships_vdb.query(
+        keywords, top_k=query_param.top_k, ids=query_param.ids
+    )
 
    if not len(results):
        return "", "", ""
@@ -1623,7 +1683,9 @@ async def naive_query(
    if cached_response is not None:
        return cached_response
 
-    results = await chunks_vdb.query(query, top_k=query_param.top_k)
+    results = await chunks_vdb.query(
+        query, top_k=query_param.top_k, ids=query_param.ids
+    )
    if not len(results):
        return PROMPTS["fail_response"]
 
@@ -1854,3 +1916,90 @@ async def kg_query_with_keywords(
    )
 
    return response
+
+
+async def query_with_keywords(
+    query: str,
+    prompt: str,
+    param: QueryParam,
+    knowledge_graph_inst: BaseGraphStorage,
+    entities_vdb: BaseVectorStorage,
+    relationships_vdb: BaseVectorStorage,
+    chunks_vdb: BaseVectorStorage,
+    text_chunks_db: BaseKVStorage,
+    global_config: dict[str, str],
+    hashing_kv: BaseKVStorage | None = None,
+) -> str | AsyncIterator[str]:
+    """
+    Extract keywords from the query and then use them for retrieving information.
+
+    1. Extracts high-level and low-level keywords from the query
+    2. Formats the query with the extracted keywords and prompt
+    3. Uses the appropriate query method based on param.mode
+
+    Args:
+        query: The user's query
+        prompt: Additional prompt to prepend to the query
+        param: Query parameters
+        knowledge_graph_inst: Knowledge graph storage
+        entities_vdb: Entities vector database
+        relationships_vdb: Relationships vector database
+        chunks_vdb: Document chunks vector database
+        text_chunks_db: Text chunks storage
+        global_config: Global configuration
+        hashing_kv: Cache storage
+
+    Returns:
+        Query response or async iterator
+    """
+    # Extract keywords
+    hl_keywords, ll_keywords = await extract_keywords_only(
+        text=query,
+        param=param,
+        global_config=global_config,
+        hashing_kv=hashing_kv,
+    )
+
+    param.hl_keywords = hl_keywords
+    param.ll_keywords = ll_keywords
+
+    # Create a new string with the prompt and the keywords
+    ll_keywords_str = ", ".join(ll_keywords)
+    hl_keywords_str = ", ".join(hl_keywords)
+    formatted_question = f"{prompt}\n\n### Keywords:\nHigh-level: {hl_keywords_str}\nLow-level: {ll_keywords_str}\n\n### Query:\n{query}"
+
+    # Use appropriate query method based on mode
+    if param.mode in ["local", "global", "hybrid"]:
+        return await kg_query_with_keywords(
+            formatted_question,
+            knowledge_graph_inst,
+            entities_vdb,
+            relationships_vdb,
+            text_chunks_db,
+            param,
+            global_config,
+            hashing_kv=hashing_kv,
+        )
+    elif param.mode == "naive":
+        return await naive_query(
+            formatted_question,
+            chunks_vdb,
+            text_chunks_db,
+            param,
+            global_config,
+            hashing_kv=hashing_kv,
+        )
+    elif param.mode == "mix":
+        return await mix_kg_vector_query(
+            formatted_question,
+            knowledge_graph_inst,
+            entities_vdb,
+            relationships_vdb,
+            chunks_vdb,
+            text_chunks_db,
+            param,
+            global_config,
+            hashing_kv=hashing_kv,
+        )
+    else:
+        raise ValueError(f"Unknown mode {param.mode}")
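The new `_process_extraction_result` helper expects the delimiter-separated record format the extraction prompt asks the LLM to emit. A self-contained sketch of that parsing, using assumed delimiter values ("##", "<|>", "<|COMPLETE|>") in place of the runtime `context_base` entries:

```python
import re

sample = '("entity"<|>"ACME Corp"<|>"organization"<|>"A fictional supplier.")##<|COMPLETE|>'

# Split on the (assumed) record and completion delimiters, then unpack each record.
records = [r for r in re.split(r"##|<\|COMPLETE\|>", sample) if r.strip()]
for record in records:
    match = re.search(r"\((.*)\)", record)
    if match:
        attributes = match.group(1).split("<|>")
        print(attributes)
        # ['"entity"', '"ACME Corp"', '"organization"', '"A fictional supplier."']
```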
lightrag/prompt.py CHANGED
@@ -236,7 +236,7 @@ Given the query and conversation history, list both high-level and low-level key
 ---Instructions---
 
 - Consider both the current query and relevant conversation history when extracting keywords
-- Output the keywords in JSON format
+- Output the keywords in JSON format, it will be parsed by a JSON parser, do not add any extra content in output
 - The JSON should have two keys:
   - "high_level_keywords" for overarching concepts or themes
   - "low_level_keywords" for specific entities or details
lightrag/utils.py CHANGED
@@ -890,3 +890,52 @@ def lazy_external_import(module_name: str, class_name: str) -> Callable[..., Any
         return cls(*args, **kwargs)
 
     return import_class
+
+
+def get_content_summary(content: str, max_length: int = 100) -> str:
+    """Get summary of document content
+
+    Args:
+        content: Original document content
+        max_length: Maximum length of summary
+
+    Returns:
+        Truncated content with ellipsis if needed
+    """
+    content = content.strip()
+    if len(content) <= max_length:
+        return content
+    return content[:max_length] + "..."
+
+
+def clean_text(text: str) -> str:
+    """Clean text by removing null bytes (0x00) and whitespace
+
+    Args:
+        text: Input text to clean
+
+    Returns:
+        Cleaned text
+    """
+    return text.strip().replace("\x00", "")
+
+
+def check_storage_env_vars(storage_name: str) -> None:
+    """Check if all required environment variables for storage implementation exist
+
+    Args:
+        storage_name: Storage implementation name
+
+    Raises:
+        ValueError: If required environment variables are missing
+    """
+    from lightrag.kg import STORAGE_ENV_REQUIREMENTS
+
+    required_vars = STORAGE_ENV_REQUIREMENTS.get(storage_name, [])
+    missing_vars = [var for var in required_vars if var not in os.environ]
+
+    if missing_vars:
+        raise ValueError(
+            f"Storage implementation '{storage_name}' requires the following "
+            f"environment variables: {', '.join(missing_vars)}"
+        )
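A quick usage sketch of the helpers that moved into `lightrag.utils`; the storage name passed to `check_storage_env_vars` and the missing-variable outcome depend on `STORAGE_ENV_REQUIREMENTS` and are only illustrative here:

```python
from lightrag.utils import clean_text, get_content_summary, check_storage_env_vars

print(clean_text("  contract text with a stray null byte\x00  "))  # stripped and de-nulled
print(get_content_summary("x" * 250))  # first 100 characters followed by "..."

# Raises ValueError when the backend's required environment variables are missing.
check_storage_env_vars("PGVectorStorage")
```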