fix: ensure Milvus collections are loaded before operations
Browse files- Resolves "collection not loaded" MilvusException errors
- lightrag/kg/milvus_impl.py +44 -0
lightrag/kg/milvus_impl.py
CHANGED
@@ -539,6 +539,23 @@ class MilvusVectorDBStorage(BaseVectorStorage):
|
|
539 |
)
|
540 |
raise
|
541 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
542 |
def _create_collection_if_not_exist(self):
|
543 |
"""Create collection if not exists and check existing collection compatibility"""
|
544 |
|
@@ -565,6 +582,8 @@ class MilvusVectorDBStorage(BaseVectorStorage):
|
|
565 |
f"Collection '{self.namespace}' confirmed to exist, validating compatibility..."
|
566 |
)
|
567 |
self._validate_collection_compatibility()
|
|
|
|
|
568 |
return
|
569 |
except Exception as describe_error:
|
570 |
logger.warning(
|
@@ -587,6 +606,9 @@ class MilvusVectorDBStorage(BaseVectorStorage):
|
|
587 |
# Then create indexes
|
588 |
self._create_indexes_after_collection()
|
589 |
|
|
|
|
|
|
|
590 |
logger.info(f"Successfully created Milvus collection: {self.namespace}")
|
591 |
|
592 |
except Exception as e:
|
@@ -615,6 +637,10 @@ class MilvusVectorDBStorage(BaseVectorStorage):
|
|
615 |
collection_name=self.namespace, schema=schema
|
616 |
)
|
617 |
self._create_indexes_after_collection()
|
|
|
|
|
|
|
|
|
618 |
logger.info(f"Successfully force-created collection {self.namespace}")
|
619 |
|
620 |
except Exception as create_error:
|
@@ -670,6 +696,9 @@ class MilvusVectorDBStorage(BaseVectorStorage):
|
|
670 |
if not data:
|
671 |
return
|
672 |
|
|
|
|
|
|
|
673 |
import time
|
674 |
|
675 |
current_time = int(time.time())
|
@@ -700,6 +729,9 @@ class MilvusVectorDBStorage(BaseVectorStorage):
|
|
700 |
async def query(
|
701 |
self, query: str, top_k: int, ids: list[str] | None = None
|
702 |
) -> list[dict[str, Any]]:
|
|
|
|
|
|
|
703 |
embedding = await self.embedding_func(
|
704 |
[query], _priority=5
|
705 |
) # higher priority for query
|
@@ -764,6 +796,9 @@ class MilvusVectorDBStorage(BaseVectorStorage):
|
|
764 |
entity_name: The name of the entity whose relations should be deleted
|
765 |
"""
|
766 |
try:
|
|
|
|
|
|
|
767 |
# Search for relations where entity is either source or target
|
768 |
expr = f'src_id == "{entity_name}" or tgt_id == "{entity_name}"'
|
769 |
|
@@ -802,6 +837,9 @@ class MilvusVectorDBStorage(BaseVectorStorage):
|
|
802 |
ids: List of vector IDs to be deleted
|
803 |
"""
|
804 |
try:
|
|
|
|
|
|
|
805 |
# Delete vectors by IDs
|
806 |
result = self._client.delete(collection_name=self.namespace, pks=ids)
|
807 |
|
@@ -825,6 +863,9 @@ class MilvusVectorDBStorage(BaseVectorStorage):
|
|
825 |
The vector data if found, or None if not found
|
826 |
"""
|
827 |
try:
|
|
|
|
|
|
|
828 |
# Include all meta_fields (created_at is now always included) plus id
|
829 |
output_fields = list(self.meta_fields) + ["id"]
|
830 |
|
@@ -856,6 +897,9 @@ class MilvusVectorDBStorage(BaseVectorStorage):
|
|
856 |
return []
|
857 |
|
858 |
try:
|
|
|
|
|
|
|
859 |
# Include all meta_fields (created_at is now always included) plus id
|
860 |
output_fields = list(self.meta_fields) + ["id"]
|
861 |
|
|
|
539 |
)
|
540 |
raise
|
541 |
|
542 |
+
def _ensure_collection_loaded(self):
|
543 |
+
"""Ensure the collection is loaded into memory for search operations"""
|
544 |
+
try:
|
545 |
+
# Check if collection exists first
|
546 |
+
if not self._client.has_collection(self.namespace):
|
547 |
+
logger.error(f"Collection {self.namespace} does not exist")
|
548 |
+
raise ValueError(f"Collection {self.namespace} does not exist")
|
549 |
+
|
550 |
+
# Load the collection if it's not already loaded
|
551 |
+
# In Milvus, collections need to be loaded before they can be searched
|
552 |
+
self._client.load_collection(self.namespace)
|
553 |
+
logger.debug(f"Collection {self.namespace} loaded successfully")
|
554 |
+
|
555 |
+
except Exception as e:
|
556 |
+
logger.error(f"Failed to load collection {self.namespace}: {e}")
|
557 |
+
raise
|
558 |
+
|
559 |
def _create_collection_if_not_exist(self):
|
560 |
"""Create collection if not exists and check existing collection compatibility"""
|
561 |
|
|
|
582 |
f"Collection '{self.namespace}' confirmed to exist, validating compatibility..."
|
583 |
)
|
584 |
self._validate_collection_compatibility()
|
585 |
+
# Ensure the collection is loaded after validation
|
586 |
+
self._ensure_collection_loaded()
|
587 |
return
|
588 |
except Exception as describe_error:
|
589 |
logger.warning(
|
|
|
606 |
# Then create indexes
|
607 |
self._create_indexes_after_collection()
|
608 |
|
609 |
+
# Load the newly created collection
|
610 |
+
self._ensure_collection_loaded()
|
611 |
+
|
612 |
logger.info(f"Successfully created Milvus collection: {self.namespace}")
|
613 |
|
614 |
except Exception as e:
|
|
|
637 |
collection_name=self.namespace, schema=schema
|
638 |
)
|
639 |
self._create_indexes_after_collection()
|
640 |
+
|
641 |
+
# Load the newly created collection
|
642 |
+
self._ensure_collection_loaded()
|
643 |
+
|
644 |
logger.info(f"Successfully force-created collection {self.namespace}")
|
645 |
|
646 |
except Exception as create_error:
|
|
|
696 |
if not data:
|
697 |
return
|
698 |
|
699 |
+
# Ensure collection is loaded before upserting
|
700 |
+
self._ensure_collection_loaded()
|
701 |
+
|
702 |
import time
|
703 |
|
704 |
current_time = int(time.time())
|
|
|
729 |
async def query(
|
730 |
self, query: str, top_k: int, ids: list[str] | None = None
|
731 |
) -> list[dict[str, Any]]:
|
732 |
+
# Ensure collection is loaded before querying
|
733 |
+
self._ensure_collection_loaded()
|
734 |
+
|
735 |
embedding = await self.embedding_func(
|
736 |
[query], _priority=5
|
737 |
) # higher priority for query
|
|
|
796 |
entity_name: The name of the entity whose relations should be deleted
|
797 |
"""
|
798 |
try:
|
799 |
+
# Ensure collection is loaded before querying
|
800 |
+
self._ensure_collection_loaded()
|
801 |
+
|
802 |
# Search for relations where entity is either source or target
|
803 |
expr = f'src_id == "{entity_name}" or tgt_id == "{entity_name}"'
|
804 |
|
|
|
837 |
ids: List of vector IDs to be deleted
|
838 |
"""
|
839 |
try:
|
840 |
+
# Ensure collection is loaded before deleting
|
841 |
+
self._ensure_collection_loaded()
|
842 |
+
|
843 |
# Delete vectors by IDs
|
844 |
result = self._client.delete(collection_name=self.namespace, pks=ids)
|
845 |
|
|
|
863 |
The vector data if found, or None if not found
|
864 |
"""
|
865 |
try:
|
866 |
+
# Ensure collection is loaded before querying
|
867 |
+
self._ensure_collection_loaded()
|
868 |
+
|
869 |
# Include all meta_fields (created_at is now always included) plus id
|
870 |
output_fields = list(self.meta_fields) + ["id"]
|
871 |
|
|
|
897 |
return []
|
898 |
|
899 |
try:
|
900 |
+
# Ensure collection is loaded before querying
|
901 |
+
self._ensure_collection_loaded()
|
902 |
+
|
903 |
# Include all meta_fields (created_at is now always included) plus id
|
904 |
output_fields = list(self.meta_fields) + ["id"]
|
905 |
|