gzdaniel committed
Commit 9e71115 · Parent(s): 7c251fd

Feat: Add WORKSPACE support to all storage types

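In short: every storage backend gains a workspace field for data isolation. File-based backends (JsonKVStorage, JsonDocStatusStorage, NetworkXStorage, NanoVectorDBStorage, FaissVectorDBStorage) nest their files in a workspace subdirectory of working_dir; MongoDB and Milvus prefix their collection namespace with the workspace; Neo4j uses the workspace as the node label in place of the legacy `base` label; per-backend variables (POSTGRES_WORKSPACE, MONGODB_WORKSPACE, MILVUS_WORKSPACE, QDRANT_WORKSPACE, REDIS_WORKSPACE) can force an override. A minimal usage sketch (names illustrative; the LLM and embedding arguments the constructor also requires are omitted):

    from lightrag import LightRAG

    # Two instances share one working_dir but stay isolated by workspace:
    # with file-based storage, data lands under ./rag_storage/project_a/
    # and ./rag_storage/project_b/. Server deployments get the same effect
    # via WORKSPACE in .env or the new --workspace flag.
    rag_a = LightRAG(working_dir="./rag_storage", workspace="project_a")
    rag_b = LightRAG(working_dir="./rag_storage", workspace="project_b")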
env.example CHANGED
@@ -111,25 +111,37 @@ EMBEDDING_BINDING_HOST=http://localhost:11434
 ###########################
 ### Data storage selection
 ###########################
+### In-memory database with data persistence to local files
+# LIGHTRAG_KV_STORAGE=JsonKVStorage
+# LIGHTRAG_DOC_STATUS_STORAGE=JsonDocStatusStorage
+# LIGHTRAG_GRAPH_STORAGE=NetworkXStorage
+# LIGHTRAG_VECTOR_STORAGE=NanoVectorDBStorage
+# LIGHTRAG_VECTOR_STORAGE=FaissVectorDBStorage
 ### PostgreSQL
 # LIGHTRAG_KV_STORAGE=PGKVStorage
 # LIGHTRAG_DOC_STATUS_STORAGE=PGDocStatusStorage
-# LIGHTRAG_VECTOR_STORAGE=PGVectorStorage
 # LIGHTRAG_GRAPH_STORAGE=PGGraphStorage
-### MongoDB
+# LIGHTRAG_VECTOR_STORAGE=PGVectorStorage
+### MongoDB (recommended for production deploy)
 # LIGHTRAG_KV_STORAGE=MongoKVStorage
 # LIGHTRAG_DOC_STATUS_STORAGE=MongoDocStatusStorage
-# LIGHTRAG_VECTOR_STORAGE=MongoVectorDBStorage
 # LIGHTRAG_GRAPH_STORAGE=MongoGraphStorage
-### KV Storage
+# LIGHTRAG_VECTOR_STORAGE=MongoVectorDBStorage
+### Redis Storage (recommended for production deploy)
 # LIGHTRAG_KV_STORAGE=RedisKVStorage
 # LIGHTRAG_DOC_STATUS_STORAGE=RedisDocStatusStorage
-### Vector Storage
-# LIGHTRAG_VECTOR_STORAGE=FaissVectorDBStorage
+### Vector Storage (recommended for production deploy)
 # LIGHTRAG_VECTOR_STORAGE=MilvusVectorDBStorage
-### Graph Storage
+# LIGHTRAG_VECTOR_STORAGE=QdrantVectorDBStorage
+### Graph Storage (recommended for production deploy)
 # LIGHTRAG_GRAPH_STORAGE=Neo4JStorage
-# LIGHTRAG_GRAPH_STORAGE=MemgraphStorage
+
+####################################################################
+### Default workspace for all storage types
+### For the purpose of isolation of data for each LightRAG instance
+### Valid characters: a-z, A-Z, 0-9, and _
+####################################################################
+# WORKSPACE=doc
 
 ### PostgreSQL Configuration
 POSTGRES_HOST=localhost
@@ -138,31 +150,18 @@ POSTGRES_USER=your_username
 POSTGRES_PASSWORD='your_password'
 POSTGRES_DATABASE=your_database
 POSTGRES_MAX_CONNECTIONS=12
-### separating all data from difference Lightrag instances
-# POSTGRES_WORKSPACE=default
+# POSTGRES_WORKSPACE=forced_workspace_name
 
 ### Neo4j Configuration
 NEO4J_URI=neo4j+s://xxxxxxxx.databases.neo4j.io
 NEO4J_USERNAME=neo4j
 NEO4J_PASSWORD='your_password'
 
-### Independent AGM Configuration(not for AMG embedded in PostreSQL)
-# AGE_POSTGRES_DB=
-# AGE_POSTGRES_USER=
-# AGE_POSTGRES_PASSWORD=
-# AGE_POSTGRES_HOST=
-# AGE_POSTGRES_PORT=8529
-
-# AGE Graph Name(apply to PostgreSQL and independent AGM)
-### AGE_GRAPH_NAME is deprecated
-# AGE_GRAPH_NAME=lightrag
-
 ### MongoDB Configuration
 MONGO_URI=mongodb://root:root@localhost:27017/
+#MONGO_URI=mongodb+srv://root:[email protected]/?retryWrites=true&w=majority&appName=Cluster0
 MONGO_DATABASE=LightRAG
-### separating all data from difference Lightrag instances(deprecating)
-### separating all data from difference Lightrag instances
-# MONGODB_WORKSPACE=default
+# MONGODB_WORKSPACE=forced_workspace_name
 
 ### Milvus Configuration
 MILVUS_URI=http://localhost:19530
@@ -170,10 +169,13 @@ MILVUS_DB_NAME=lightrag
 # MILVUS_USER=root
 # MILVUS_PASSWORD=your_password
 # MILVUS_TOKEN=your_token
+# MILVUS_WORKSPACE=forced_workspace_name
 
 ### Qdrant
-QDRANT_URL=http://localhost:16333
+QDRANT_URL=http://localhost:6333
 # QDRANT_API_KEY=your-api-key
+# QDRANT_WORKSPACE=forced_workspace_name
 
 ### Redis
 REDIS_URI=redis://localhost:6379
+# REDIS_WORKSPACE=forced_workspace_name
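How a workspace name materializes differs per backend family; a sketch of the mapping implemented in the storage changes below (values illustrative):

    # For WORKSPACE=project_a and a storage namespace such as "chunks":
    workspace, namespace = "project_a", "chunks"

    json_file = f"<working_dir>/{workspace}/kv_store_{namespace}.json"  # Json*/NanoVectorDB/Faiss/NetworkX: subdirectory
    collection = f"{workspace}_{namespace}"                             # MongoDB/Milvus: prefixed collection name
    neo4j_label = workspace or "base"                                   # Neo4j: node label, 'base' when unset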
lightrag/api/config.py CHANGED
@@ -184,10 +184,10 @@ def parse_args() -> argparse.Namespace:
 
     # Namespace
     parser.add_argument(
-        "--namespace-prefix",
+        "--workspace",
         type=str,
-        default=get_env_value("NAMESPACE_PREFIX", ""),
-        help="Prefix of the namespace",
+        default=get_env_value("WORKSPACE", ""),
+        help="Default workspace for all storage",
     )
 
     parser.add_argument(
lightrag/api/lightrag_server.py CHANGED
@@ -112,8 +112,8 @@ def create_app(args):
     # Check if API key is provided either through env var or args
     api_key = os.getenv("LIGHTRAG_API_KEY") or args.key
 
-    # Initialize document manager
-    doc_manager = DocumentManager(args.input_dir)
+    # Initialize document manager with workspace support for data isolation
+    doc_manager = DocumentManager(args.input_dir, workspace=args.workspace)
 
     @asynccontextmanager
     async def lifespan(app: FastAPI):
@@ -295,6 +295,7 @@ def create_app(args):
     if args.llm_binding in ["lollms", "ollama", "openai"]:
         rag = LightRAG(
             working_dir=args.working_dir,
+            workspace=args.workspace,
            llm_model_func=lollms_model_complete
            if args.llm_binding == "lollms"
            else ollama_model_complete
@@ -330,6 +331,7 @@ def create_app(args):
     else:  # azure_openai
         rag = LightRAG(
             working_dir=args.working_dir,
+            workspace=args.workspace,
             llm_model_func=azure_openai_model_complete,
             chunk_token_size=int(args.chunk_size),
             chunk_overlap_token_size=int(args.chunk_overlap_size),
lightrag/api/routers/document_routes.py CHANGED
@@ -475,6 +475,7 @@ class DocumentManager:
     def __init__(
         self,
         input_dir: str,
+        workspace: str = "",  # New parameter for workspace isolation
         supported_extensions: tuple = (
             ".txt",
             ".md",
@@ -515,10 +516,19 @@
             ".less",  # LESS CSS
         ),
     ):
-        self.input_dir = Path(input_dir)
+        # Store the base input directory and workspace
+        self.base_input_dir = Path(input_dir)
+        self.workspace = workspace
         self.supported_extensions = supported_extensions
         self.indexed_files = set()
 
+        # Create workspace-specific input directory
+        # If workspace is provided, create a subdirectory for data isolation
+        if workspace:
+            self.input_dir = self.base_input_dir / workspace
+        else:
+            self.input_dir = self.base_input_dir
+
         # Create input directory if it doesn't exist
         self.input_dir.mkdir(parents=True, exist_ok=True)
 
@@ -716,7 +726,9 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool:
         if content:
             # Check if content contains only whitespace characters
             if not content.strip():
-                logger.warning(f"File contains only whitespace characters. file_paths={file_path.name}")
+                logger.warning(
+                    f"File contains only whitespace characters. file_paths={file_path.name}"
+                )
 
         await rag.apipeline_enqueue_documents(content, file_paths=file_path.name)
         logger.info(f"Successfully fetched and enqueued file: {file_path.name}")
lightrag/base.py CHANGED
@@ -103,6 +103,7 @@ class QueryParam:
 @dataclass
 class StorageNameSpace(ABC):
     namespace: str
+    workspace: str
     global_config: dict[str, Any]
 
     async def initialize(self):
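Note that `workspace` lands between `namespace` and `global_config`, so the positional construction order of every storage dataclass changes; this is likely why the subclass constructors below pass it by keyword. A minimal illustration (sketch, not repository code):

    from abc import ABC
    from dataclasses import dataclass
    from typing import Any

    @dataclass
    class StorageNameSpace(ABC):
        namespace: str
        workspace: str  # new required field, positionally second
        global_config: dict[str, Any]

    # Positional callers must now supply workspace in second place:
    s = StorageNameSpace("chunks", "project_a", {})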
lightrag/kg/faiss_impl.py CHANGED
@@ -38,9 +38,19 @@ class FaissVectorDBStorage(BaseVectorStorage):
         self.cosine_better_than_threshold = cosine_threshold
 
         # Where to save index file if you want persistent storage
-        self._faiss_index_file = os.path.join(
-            self.global_config["working_dir"], f"faiss_index_{self.namespace}.index"
-        )
+        working_dir = self.global_config["working_dir"]
+        if self.workspace:
+            # Include workspace in the file path for data isolation
+            workspace_dir = os.path.join(working_dir, self.workspace)
+            os.makedirs(workspace_dir, exist_ok=True)
+            self._faiss_index_file = os.path.join(
+                workspace_dir, f"faiss_index_{self.namespace}.index"
+            )
+        else:
+            # Default behavior when workspace is empty
+            self._faiss_index_file = os.path.join(
+                working_dir, f"faiss_index_{self.namespace}.index"
+            )
         self._meta_file = self._faiss_index_file + ".meta.json"
 
         self._max_batch_size = self.global_config["embedding_batch_num"]
lightrag/kg/json_doc_status_impl.py CHANGED
@@ -30,7 +30,18 @@ class JsonDocStatusStorage(DocStatusStorage):
 
     def __post_init__(self):
         working_dir = self.global_config["working_dir"]
-        self._file_name = os.path.join(working_dir, f"kv_store_{self.namespace}.json")
+        if self.workspace:
+            # Include workspace in the file path for data isolation
+            workspace_dir = os.path.join(working_dir, self.workspace)
+            os.makedirs(workspace_dir, exist_ok=True)
+            self._file_name = os.path.join(
+                workspace_dir, f"kv_store_{self.namespace}.json"
+            )
+        else:
+            # Default behavior when workspace is empty
+            self._file_name = os.path.join(
+                working_dir, f"kv_store_{self.namespace}.json"
+            )
         self._data = None
         self._storage_lock = None
         self.storage_updated = None
lightrag/kg/json_kv_impl.py CHANGED
@@ -26,7 +26,18 @@ from .shared_storage import (
 class JsonKVStorage(BaseKVStorage):
     def __post_init__(self):
         working_dir = self.global_config["working_dir"]
-        self._file_name = os.path.join(working_dir, f"kv_store_{self.namespace}.json")
+        if self.workspace:
+            # Include workspace in the file path for data isolation
+            workspace_dir = os.path.join(working_dir, self.workspace)
+            os.makedirs(workspace_dir, exist_ok=True)
+            self._file_name = os.path.join(
+                workspace_dir, f"kv_store_{self.namespace}.json"
+            )
+        else:
+            # Default behavior when workspace is empty
+            self._file_name = os.path.join(
+                working_dir, f"kv_store_{self.namespace}.json"
+            )
         self._data = None
         self._storage_lock = None
         self.storage_updated = None
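The same if/else block now appears in faiss_impl.py, json_doc_status_impl.py, json_kv_impl.py, nano_vector_db_impl.py, and networkx_impl.py; a follow-up could factor it into a shared helper along these lines (hypothetical, not part of this commit):

    import os

    def workspace_file_path(working_dir: str, workspace: str, file_name: str) -> str:
        """Return the storage file path, nested under a workspace subdirectory when one is set."""
        if workspace:
            workspace_dir = os.path.join(working_dir, workspace)
            os.makedirs(workspace_dir, exist_ok=True)
            return os.path.join(workspace_dir, file_name)
        # Empty workspace keeps the legacy flat layout.
        return os.path.join(working_dir, file_name)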
lightrag/kg/milvus_impl.py CHANGED
@@ -7,10 +7,6 @@ from lightrag.utils import logger, compute_mdhash_id
 from ..base import BaseVectorStorage
 import pipmaster as pm
 
-
-if not pm.is_installed("configparser"):
-    pm.install("configparser")
-
 if not pm.is_installed("pymilvus"):
     pm.install("pymilvus")
 
@@ -660,6 +656,29 @@ class MilvusVectorDBStorage(BaseVectorStorage):
             raise
 
     def __post_init__(self):
+        # Check for MILVUS_WORKSPACE environment variable first (higher priority)
+        # This allows administrators to force a specific workspace for all Milvus storage instances
+        milvus_workspace = os.environ.get("MILVUS_WORKSPACE")
+        if milvus_workspace and milvus_workspace.strip():
+            # Use environment variable value, overriding the passed workspace parameter
+            effective_workspace = milvus_workspace.strip()
+            logger.info(
+                f"Using MILVUS_WORKSPACE environment variable: '{effective_workspace}' (overriding passed workspace: '{self.workspace}')"
+            )
+        else:
+            # Use the workspace parameter passed during initialization
+            effective_workspace = self.workspace
+            if effective_workspace:
+                logger.debug(
+                    f"Using passed workspace parameter: '{effective_workspace}'"
+                )
+
+        # Build namespace with workspace prefix for data isolation
+        if effective_workspace:
+            self.namespace = f"{effective_workspace}_{self.namespace}"
+            logger.debug(f"Final namespace with workspace prefix: '{self.namespace}'")
+        # When workspace is empty, keep the original namespace unchanged
+
         kwargs = self.global_config.get("vector_db_storage_cls_kwargs", {})
         cosine_threshold = kwargs.get("cosine_better_than_threshold")
         if cosine_threshold is None:
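The MILVUS_WORKSPACE resolution above is repeated (as MONGODB_WORKSPACE) four more times in mongo_impl.py below; the rule it implements boils down to this sketch (hypothetical helper name, not in the diff):

    import os

    def resolve_workspace(env_var: str, passed_workspace: str) -> str:
        # A backend-specific override such as MILVUS_WORKSPACE beats the
        # workspace passed down from LightRAG; empty means no isolation prefix.
        forced = (os.environ.get(env_var) or "").strip()
        return forced or passed_workspace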
lightrag/kg/mongo_impl.py CHANGED
@@ -25,6 +25,7 @@ if not pm.is_installed("pymongo"):
     pm.install("pymongo")
 
 from pymongo import AsyncMongoClient  # type: ignore
+from pymongo import UpdateOne  # type: ignore
 from pymongo.asynchronous.database import AsyncDatabase  # type: ignore
 from pymongo.asynchronous.collection import AsyncCollection  # type: ignore
 from pymongo.operations import SearchIndexModel  # type: ignore
@@ -81,7 +82,39 @@ class MongoKVStorage(BaseKVStorage):
     db: AsyncDatabase = field(default=None)
     _data: AsyncCollection = field(default=None)
 
+    def __init__(self, namespace, global_config, embedding_func, workspace=None):
+        super().__init__(
+            namespace=namespace,
+            workspace=workspace or "",
+            global_config=global_config,
+            embedding_func=embedding_func,
+        )
+        self.__post_init__()
+
     def __post_init__(self):
+        # Check for MONGODB_WORKSPACE environment variable first (higher priority)
+        # This allows administrators to force a specific workspace for all MongoDB storage instances
+        mongodb_workspace = os.environ.get("MONGODB_WORKSPACE")
+        if mongodb_workspace and mongodb_workspace.strip():
+            # Use environment variable value, overriding the passed workspace parameter
+            effective_workspace = mongodb_workspace.strip()
+            logger.info(
+                f"Using MONGODB_WORKSPACE environment variable: '{effective_workspace}' (overriding passed workspace: '{self.workspace}')"
+            )
+        else:
+            # Use the workspace parameter passed during initialization
+            effective_workspace = self.workspace
+            if effective_workspace:
+                logger.debug(
+                    f"Using passed workspace parameter: '{effective_workspace}'"
+                )
+
+        # Build namespace with workspace prefix for data isolation
+        if effective_workspace:
+            self.namespace = f"{effective_workspace}_{self.namespace}"
+            logger.debug(f"Final namespace with workspace prefix: '{self.namespace}'")
+        # When workspace is empty, keep the original namespace unchanged
+
         self._collection_name = self.namespace
 
     async def initialize(self):
@@ -142,7 +175,6 @@ class MongoKVStorage(BaseKVStorage):
 
         # Unified handling for all namespaces with flattened keys
         # Use bulk_write for better performance
-        from pymongo import UpdateOne
 
         operations = []
         current_time = int(time.time())  # Get current Unix timestamp
@@ -252,7 +284,39 @@ class MongoDocStatusStorage(DocStatusStorage):
     db: AsyncDatabase = field(default=None)
     _data: AsyncCollection = field(default=None)
 
+    def __init__(self, namespace, global_config, embedding_func, workspace=None):
+        super().__init__(
+            namespace=namespace,
+            workspace=workspace or "",
+            global_config=global_config,
+            embedding_func=embedding_func,
+        )
+        self.__post_init__()
+
     def __post_init__(self):
+        # Check for MONGODB_WORKSPACE environment variable first (higher priority)
+        # This allows administrators to force a specific workspace for all MongoDB storage instances
+        mongodb_workspace = os.environ.get("MONGODB_WORKSPACE")
+        if mongodb_workspace and mongodb_workspace.strip():
+            # Use environment variable value, overriding the passed workspace parameter
+            effective_workspace = mongodb_workspace.strip()
+            logger.info(
+                f"Using MONGODB_WORKSPACE environment variable: '{effective_workspace}' (overriding passed workspace: '{self.workspace}')"
+            )
+        else:
+            # Use the workspace parameter passed during initialization
+            effective_workspace = self.workspace
+            if effective_workspace:
+                logger.debug(
+                    f"Using passed workspace parameter: '{effective_workspace}'"
+                )
+
+        # Build namespace with workspace prefix for data isolation
+        if effective_workspace:
+            self.namespace = f"{effective_workspace}_{self.namespace}"
+            logger.debug(f"Final namespace with workspace prefix: '{self.namespace}'")
+        # When workspace is empty, keep the original namespace unchanged
+
         self._collection_name = self.namespace
 
     async def initialize(self):
@@ -367,12 +431,36 @@ class MongoGraphStorage(BaseGraphStorage):
     # edge collection storing source_node_id, target_node_id, and edge_properties
     edgeCollection: AsyncCollection = field(default=None)
 
-    def __init__(self, namespace, global_config, embedding_func):
+    def __init__(self, namespace, global_config, embedding_func, workspace=None):
         super().__init__(
             namespace=namespace,
+            workspace=workspace or "",
             global_config=global_config,
             embedding_func=embedding_func,
         )
+        # Check for MONGODB_WORKSPACE environment variable first (higher priority)
+        # This allows administrators to force a specific workspace for all MongoDB storage instances
+        mongodb_workspace = os.environ.get("MONGODB_WORKSPACE")
+        if mongodb_workspace and mongodb_workspace.strip():
+            # Use environment variable value, overriding the passed workspace parameter
+            effective_workspace = mongodb_workspace.strip()
+            logger.info(
+                f"Using MONGODB_WORKSPACE environment variable: '{effective_workspace}' (overriding passed workspace: '{self.workspace}')"
+            )
+        else:
+            # Use the workspace parameter passed during initialization
+            effective_workspace = self.workspace
+            if effective_workspace:
+                logger.debug(
+                    f"Using passed workspace parameter: '{effective_workspace}'"
+                )
+
+        # Build namespace with workspace prefix for data isolation
+        if effective_workspace:
+            self.namespace = f"{effective_workspace}_{self.namespace}"
+            logger.debug(f"Final namespace with workspace prefix: '{self.namespace}'")
+        # When workspace is empty, keep the original namespace unchanged
+
         self._collection_name = self.namespace
         self._edge_collection_name = f"{self._collection_name}_edges"
 
@@ -1231,7 +1319,42 @@ class MongoVectorDBStorage(BaseVectorStorage):
     db: AsyncDatabase | None = field(default=None)
     _data: AsyncCollection | None = field(default=None)
 
+    def __init__(
+        self, namespace, global_config, embedding_func, workspace=None, meta_fields=None
+    ):
+        super().__init__(
+            namespace=namespace,
+            workspace=workspace or "",
+            global_config=global_config,
+            embedding_func=embedding_func,
+            meta_fields=meta_fields or set(),
+        )
+        self.__post_init__()
+
     def __post_init__(self):
+        # Check for MONGODB_WORKSPACE environment variable first (higher priority)
+        # This allows administrators to force a specific workspace for all MongoDB storage instances
+        mongodb_workspace = os.environ.get("MONGODB_WORKSPACE")
+        if mongodb_workspace and mongodb_workspace.strip():
+            # Use environment variable value, overriding the passed workspace parameter
+            effective_workspace = mongodb_workspace.strip()
+            logger.info(
+                f"Using MONGODB_WORKSPACE environment variable: '{effective_workspace}' (overriding passed workspace: '{self.workspace}')"
+            )
+        else:
+            # Use the workspace parameter passed during initialization
+            effective_workspace = self.workspace
+            if effective_workspace:
+                logger.debug(
+                    f"Using passed workspace parameter: '{effective_workspace}'"
+                )
+
+        # Build namespace with workspace prefix for data isolation
+        if effective_workspace:
+            self.namespace = f"{effective_workspace}_{self.namespace}"
+            logger.debug(f"Final namespace with workspace prefix: '{self.namespace}'")
+        # When workspace is empty, keep the original namespace unchanged
+
         kwargs = self.global_config.get("vector_db_storage_cls_kwargs", {})
         cosine_threshold = kwargs.get("cosine_better_than_threshold")
         if cosine_threshold is None:
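One subtlety in the MongoDB classes above: `__post_init__` is only invoked by a dataclass-generated `__init__`, and only when the class being decorated (or a base) already defines one at decoration time. Here the subclasses supply a hand-written `__init__` plus a `__post_init__` the base dataclass knows nothing about, which is likely why the diff calls `self.__post_init__()` explicitly. A stripped-down illustration (sketch, not repository code):

    from dataclasses import dataclass

    @dataclass
    class Base:
        namespace: str
        workspace: str
        # No __post_init__ here, so Base's generated __init__ never calls one.

    class MongoLike(Base):
        def __init__(self, namespace, workspace=None):
            super().__init__(namespace=namespace, workspace=workspace or "")
            self.__post_init__()  # would otherwise never run

        def __post_init__(self):
            self.collection = (
                f"{self.workspace}_{self.namespace}" if self.workspace else self.namespace
            )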
lightrag/kg/nano_vector_db_impl.py CHANGED
@@ -41,9 +41,19 @@ class NanoVectorDBStorage(BaseVectorStorage):
         )
         self.cosine_better_than_threshold = cosine_threshold
 
-        self._client_file_name = os.path.join(
-            self.global_config["working_dir"], f"vdb_{self.namespace}.json"
-        )
+        working_dir = self.global_config["working_dir"]
+        if self.workspace:
+            # Include workspace in the file path for data isolation
+            workspace_dir = os.path.join(working_dir, self.workspace)
+            os.makedirs(workspace_dir, exist_ok=True)
+            self._client_file_name = os.path.join(
+                workspace_dir, f"vdb_{self.namespace}.json"
+            )
+        else:
+            # Default behavior when workspace is empty
+            self._client_file_name = os.path.join(
+                working_dir, f"vdb_{self.namespace}.json"
+            )
         self._max_batch_size = self.global_config["embedding_batch_num"]
 
         self._client = NanoVectorDB(
lightrag/kg/neo4j_impl.py CHANGED
@@ -50,14 +50,20 @@ logging.getLogger("neo4j").setLevel(logging.ERROR)
50
  @final
51
  @dataclass
52
  class Neo4JStorage(BaseGraphStorage):
53
- def __init__(self, namespace, global_config, embedding_func):
54
  super().__init__(
55
  namespace=namespace,
 
56
  global_config=global_config,
57
  embedding_func=embedding_func,
58
  )
59
  self._driver = None
60
 
 
 
 
 
 
61
  async def initialize(self):
62
  URI = os.environ.get("NEO4J_URI", config.get("neo4j", "uri", fallback=None))
63
  USERNAME = os.environ.get(
@@ -153,13 +159,14 @@ class Neo4JStorage(BaseGraphStorage):
153
  raise e
154
 
155
  if connected:
156
- # Create index for base nodes on entity_id if it doesn't exist
 
157
  try:
158
  async with self._driver.session(database=database) as session:
159
  # Check if index exists first
160
- check_query = """
161
  CALL db.indexes() YIELD name, labelsOrTypes, properties
162
- WHERE labelsOrTypes = ['base'] AND properties = ['entity_id']
163
  RETURN count(*) > 0 AS exists
164
  """
165
  try:
@@ -172,16 +179,16 @@ class Neo4JStorage(BaseGraphStorage):
172
  if not index_exists:
173
  # Create index only if it doesn't exist
174
  result = await session.run(
175
- "CREATE INDEX FOR (n:base) ON (n.entity_id)"
176
  )
177
  await result.consume()
178
  logger.info(
179
- f"Created index for base nodes on entity_id in {database}"
180
  )
181
  except Exception:
182
  # Fallback if db.indexes() is not supported in this Neo4j version
183
  result = await session.run(
184
- "CREATE INDEX IF NOT EXISTS FOR (n:base) ON (n.entity_id)"
185
  )
186
  await result.consume()
187
  except Exception as e:
@@ -216,11 +223,12 @@ class Neo4JStorage(BaseGraphStorage):
216
  ValueError: If node_id is invalid
217
  Exception: If there is an error executing the query
218
  """
 
219
  async with self._driver.session(
220
  database=self._DATABASE, default_access_mode="READ"
221
  ) as session:
222
  try:
223
- query = "MATCH (n:base {entity_id: $entity_id}) RETURN count(n) > 0 AS node_exists"
224
  result = await session.run(query, entity_id=node_id)
225
  single_result = await result.single()
226
  await result.consume() # Ensure result is fully consumed
@@ -245,12 +253,13 @@ class Neo4JStorage(BaseGraphStorage):
245
  ValueError: If either node_id is invalid
246
  Exception: If there is an error executing the query
247
  """
 
248
  async with self._driver.session(
249
  database=self._DATABASE, default_access_mode="READ"
250
  ) as session:
251
  try:
252
  query = (
253
- "MATCH (a:base {entity_id: $source_entity_id})-[r]-(b:base {entity_id: $target_entity_id}) "
254
  "RETURN COUNT(r) > 0 AS edgeExists"
255
  )
256
  result = await session.run(
@@ -282,11 +291,14 @@ class Neo4JStorage(BaseGraphStorage):
282
  ValueError: If node_id is invalid
283
  Exception: If there is an error executing the query
284
  """
 
285
  async with self._driver.session(
286
  database=self._DATABASE, default_access_mode="READ"
287
  ) as session:
288
  try:
289
- query = "MATCH (n:base {entity_id: $entity_id}) RETURN n"
 
 
290
  result = await session.run(query, entity_id=node_id)
291
  try:
292
  records = await result.fetch(
@@ -300,12 +312,12 @@ class Neo4JStorage(BaseGraphStorage):
300
  if records:
301
  node = records[0]["n"]
302
  node_dict = dict(node)
303
- # Remove base label from labels list if it exists
304
  if "labels" in node_dict:
305
  node_dict["labels"] = [
306
  label
307
  for label in node_dict["labels"]
308
- if label != "base"
309
  ]
310
  # logger.debug(f"Neo4j query node {query} return: {node_dict}")
311
  return node_dict
@@ -326,12 +338,13 @@ class Neo4JStorage(BaseGraphStorage):
326
  Returns:
327
  A dictionary mapping each node_id to its node data (or None if not found).
328
  """
 
329
  async with self._driver.session(
330
  database=self._DATABASE, default_access_mode="READ"
331
  ) as session:
332
- query = """
333
  UNWIND $node_ids AS id
334
- MATCH (n:base {entity_id: id})
335
  RETURN n.entity_id AS entity_id, n
336
  """
337
  result = await session.run(query, node_ids=node_ids)
@@ -340,10 +353,12 @@ class Neo4JStorage(BaseGraphStorage):
340
  entity_id = record["entity_id"]
341
  node = record["n"]
342
  node_dict = dict(node)
343
- # Remove the 'base' label if present in a 'labels' property
344
  if "labels" in node_dict:
345
  node_dict["labels"] = [
346
- label for label in node_dict["labels"] if label != "base"
 
 
347
  ]
348
  nodes[entity_id] = node_dict
349
  await result.consume() # Make sure to consume the result fully
@@ -364,12 +379,13 @@ class Neo4JStorage(BaseGraphStorage):
364
  ValueError: If node_id is invalid
365
  Exception: If there is an error executing the query
366
  """
 
367
  async with self._driver.session(
368
  database=self._DATABASE, default_access_mode="READ"
369
  ) as session:
370
  try:
371
- query = """
372
- MATCH (n:base {entity_id: $entity_id})
373
  OPTIONAL MATCH (n)-[r]-()
374
  RETURN COUNT(r) AS degree
375
  """
@@ -403,13 +419,14 @@ class Neo4JStorage(BaseGraphStorage):
403
  A dictionary mapping each node_id to its degree (number of relationships).
404
  If a node is not found, its degree will be set to 0.
405
  """
 
406
  async with self._driver.session(
407
  database=self._DATABASE, default_access_mode="READ"
408
  ) as session:
409
- query = """
410
  UNWIND $node_ids AS id
411
- MATCH (n:base {entity_id: id})
412
- RETURN n.entity_id AS entity_id, count { (n)--() } AS degree;
413
  """
414
  result = await session.run(query, node_ids=node_ids)
415
  degrees = {}
@@ -489,12 +506,13 @@ class Neo4JStorage(BaseGraphStorage):
489
  ValueError: If either node_id is invalid
490
  Exception: If there is an error executing the query
491
  """
 
492
  try:
493
  async with self._driver.session(
494
  database=self._DATABASE, default_access_mode="READ"
495
  ) as session:
496
- query = """
497
- MATCH (start:base {entity_id: $source_entity_id})-[r]-(end:base {entity_id: $target_entity_id})
498
  RETURN properties(r) as edge_properties
499
  """
500
  result = await session.run(
@@ -571,12 +589,13 @@ class Neo4JStorage(BaseGraphStorage):
571
  Returns:
572
  A dictionary mapping (src, tgt) tuples to their edge properties.
573
  """
 
574
  async with self._driver.session(
575
  database=self._DATABASE, default_access_mode="READ"
576
  ) as session:
577
- query = """
578
  UNWIND $pairs AS pair
579
- MATCH (start:base {entity_id: pair.src})-[r:DIRECTED]-(end:base {entity_id: pair.tgt})
580
  RETURN pair.src AS src_id, pair.tgt AS tgt_id, collect(properties(r)) AS edges
581
  """
582
  result = await session.run(query, pairs=pairs)
@@ -627,8 +646,9 @@ class Neo4JStorage(BaseGraphStorage):
627
  database=self._DATABASE, default_access_mode="READ"
628
  ) as session:
629
  try:
630
- query = """MATCH (n:base {entity_id: $entity_id})
631
- OPTIONAL MATCH (n)-[r]-(connected:base)
 
632
  WHERE connected.entity_id IS NOT NULL
633
  RETURN n, r, connected"""
634
  results = await session.run(query, entity_id=source_node_id)
@@ -689,10 +709,11 @@ class Neo4JStorage(BaseGraphStorage):
689
  database=self._DATABASE, default_access_mode="READ"
690
  ) as session:
691
  # Query to get both outgoing and incoming edges
692
- query = """
 
693
  UNWIND $node_ids AS id
694
- MATCH (n:base {entity_id: id})
695
- OPTIONAL MATCH (n)-[r]-(connected:base)
696
  RETURN id AS queried_id, n.entity_id AS node_entity_id,
697
  connected.entity_id AS connected_entity_id,
698
  startNode(r).entity_id AS start_entity_id
@@ -727,12 +748,13 @@ class Neo4JStorage(BaseGraphStorage):
727
  return edges_dict
728
 
729
  async def get_nodes_by_chunk_ids(self, chunk_ids: list[str]) -> list[dict]:
 
730
  async with self._driver.session(
731
  database=self._DATABASE, default_access_mode="READ"
732
  ) as session:
733
- query = """
734
  UNWIND $chunk_ids AS chunk_id
735
- MATCH (n:base)
736
  WHERE n.source_id IS NOT NULL AND chunk_id IN split(n.source_id, $sep)
737
  RETURN DISTINCT n
738
  """
@@ -748,12 +770,13 @@ class Neo4JStorage(BaseGraphStorage):
748
  return nodes
749
 
750
  async def get_edges_by_chunk_ids(self, chunk_ids: list[str]) -> list[dict]:
 
751
  async with self._driver.session(
752
  database=self._DATABASE, default_access_mode="READ"
753
  ) as session:
754
- query = """
755
  UNWIND $chunk_ids AS chunk_id
756
- MATCH (a:base)-[r]-(b:base)
757
  WHERE r.source_id IS NOT NULL AND chunk_id IN split(r.source_id, $sep)
758
  RETURN DISTINCT a.entity_id AS source, b.entity_id AS target, properties(r) AS properties
759
  """
@@ -787,6 +810,7 @@ class Neo4JStorage(BaseGraphStorage):
787
  node_id: The unique identifier for the node (used as label)
788
  node_data: Dictionary of node properties
789
  """
 
790
  properties = node_data
791
  entity_type = properties["entity_type"]
792
  if "entity_id" not in properties:
@@ -796,14 +820,11 @@ class Neo4JStorage(BaseGraphStorage):
796
  async with self._driver.session(database=self._DATABASE) as session:
797
 
798
  async def execute_upsert(tx: AsyncManagedTransaction):
799
- query = (
800
- """
801
- MERGE (n:base {entity_id: $entity_id})
802
  SET n += $properties
803
- SET n:`%s`
804
  """
805
- % entity_type
806
- )
807
  result = await tx.run(
808
  query, entity_id=node_id, properties=properties
809
  )
@@ -847,10 +868,11 @@ class Neo4JStorage(BaseGraphStorage):
847
  async with self._driver.session(database=self._DATABASE) as session:
848
 
849
  async def execute_upsert(tx: AsyncManagedTransaction):
850
- query = """
851
- MATCH (source:base {entity_id: $source_entity_id})
 
852
  WITH source
853
- MATCH (target:base {entity_id: $target_entity_id})
854
  MERGE (source)-[r:DIRECTED]-(target)
855
  SET r += $properties
856
  RETURN r, source, target
@@ -889,6 +911,7 @@ class Neo4JStorage(BaseGraphStorage):
889
  KnowledgeGraph object containing nodes and edges, with an is_truncated flag
890
  indicating whether the graph was truncated due to max_nodes limit
891
  """
 
892
  result = KnowledgeGraph()
893
  seen_nodes = set()
894
  seen_edges = set()
@@ -899,7 +922,9 @@ class Neo4JStorage(BaseGraphStorage):
899
  try:
900
  if node_label == "*":
901
  # First check total node count to determine if graph is truncated
902
- count_query = "MATCH (n) RETURN count(n) as total"
 
 
903
  count_result = None
904
  try:
905
  count_result = await session.run(count_query)
@@ -915,13 +940,13 @@ class Neo4JStorage(BaseGraphStorage):
915
  await count_result.consume()
916
 
917
  # Run main query to get nodes with highest degree
918
- main_query = """
919
- MATCH (n)
920
  OPTIONAL MATCH (n)-[r]-()
921
  WITH n, COALESCE(count(r), 0) AS degree
922
  ORDER BY degree DESC
923
  LIMIT $max_nodes
924
- WITH collect({node: n}) AS filtered_nodes
925
  UNWIND filtered_nodes AS node_info
926
  WITH collect(node_info.node) AS kept_nodes, filtered_nodes
927
  OPTIONAL MATCH (a)-[r]-(b)
@@ -943,20 +968,21 @@ class Neo4JStorage(BaseGraphStorage):
943
  else:
944
  # return await self._robust_fallback(node_label, max_depth, max_nodes)
945
  # First try without limit to check if we need to truncate
946
- full_query = """
947
- MATCH (start)
948
  WHERE start.entity_id = $entity_id
949
  WITH start
950
- CALL apoc.path.subgraphAll(start, {
951
  relationshipFilter: '',
 
952
  minLevel: 0,
953
  maxLevel: $max_depth,
954
  bfs: true
955
- })
956
  YIELD nodes, relationships
957
  WITH nodes, relationships, size(nodes) AS total_nodes
958
  UNWIND nodes AS node
959
- WITH collect({node: node}) AS node_info, relationships, total_nodes
960
  RETURN node_info, relationships, total_nodes
961
  """
962
 
@@ -994,20 +1020,21 @@ class Neo4JStorage(BaseGraphStorage):
994
  )
995
 
996
  # Run limited query
997
- limited_query = """
998
- MATCH (start)
999
  WHERE start.entity_id = $entity_id
1000
  WITH start
1001
- CALL apoc.path.subgraphAll(start, {
1002
  relationshipFilter: '',
 
1003
  minLevel: 0,
1004
  maxLevel: $max_depth,
1005
  limit: $max_nodes,
1006
  bfs: true
1007
- })
1008
  YIELD nodes, relationships
1009
  UNWIND nodes AS node
1010
- WITH collect({node: node}) AS node_info, relationships
1011
  RETURN node_info, relationships
1012
  """
1013
  result_set = None
@@ -1094,11 +1121,12 @@ class Neo4JStorage(BaseGraphStorage):
1094
  visited_edge_pairs = set()
1095
 
1096
  # Get the starting node's data
 
1097
  async with self._driver.session(
1098
  database=self._DATABASE, default_access_mode="READ"
1099
  ) as session:
1100
- query = """
1101
- MATCH (n:base {entity_id: $entity_id})
1102
  RETURN id(n) as node_id, n
1103
  """
1104
  node_result = await session.run(query, entity_id=node_label)
@@ -1156,8 +1184,9 @@ class Neo4JStorage(BaseGraphStorage):
1156
  async with self._driver.session(
1157
  database=self._DATABASE, default_access_mode="READ"
1158
  ) as session:
1159
- query = """
1160
- MATCH (a:base {entity_id: $entity_id})-[r]-(b)
 
1161
  WITH r, b, id(r) as edge_id, id(b) as target_id
1162
  RETURN r, b, edge_id, target_id
1163
  """
@@ -1241,6 +1270,7 @@ class Neo4JStorage(BaseGraphStorage):
1241
  Returns:
1242
  ["Person", "Company", ...] # Alphabetically sorted label list
1243
  """
 
1244
  async with self._driver.session(
1245
  database=self._DATABASE, default_access_mode="READ"
1246
  ) as session:
@@ -1248,8 +1278,8 @@ class Neo4JStorage(BaseGraphStorage):
1248
  # query = "CALL db.labels() YIELD label RETURN label"
1249
 
1250
  # Method 2: Query compatible with older versions
1251
- query = """
1252
- MATCH (n:base)
1253
  WHERE n.entity_id IS NOT NULL
1254
  RETURN DISTINCT n.entity_id AS label
1255
  ORDER BY label
@@ -1285,8 +1315,9 @@ class Neo4JStorage(BaseGraphStorage):
1285
  """
1286
 
1287
  async def _do_delete(tx: AsyncManagedTransaction):
1288
- query = """
1289
- MATCH (n:base {entity_id: $entity_id})
 
1290
  DETACH DELETE n
1291
  """
1292
  result = await tx.run(query, entity_id=node_id)
@@ -1342,8 +1373,9 @@ class Neo4JStorage(BaseGraphStorage):
1342
  for source, target in edges:
1343
 
1344
  async def _do_delete_edge(tx: AsyncManagedTransaction):
1345
- query = """
1346
- MATCH (source:base {entity_id: $source_entity_id})-[r]-(target:base {entity_id: $target_entity_id})
 
1347
  DELETE r
1348
  """
1349
  result = await tx.run(
@@ -1360,26 +1392,32 @@ class Neo4JStorage(BaseGraphStorage):
1360
  raise
1361
 
1362
  async def drop(self) -> dict[str, str]:
1363
- """Drop all data from storage and clean up resources
1364
 
1365
- This method will delete all nodes and relationships in the Neo4j database.
1366
 
1367
  Returns:
1368
  dict[str, str]: Operation status and message
1369
- - On success: {"status": "success", "message": "data dropped"}
1370
  - On failure: {"status": "error", "message": "<error details>"}
1371
  """
 
1372
  try:
1373
  async with self._driver.session(database=self._DATABASE) as session:
1374
- # Delete all nodes and relationships
1375
- query = "MATCH (n) DETACH DELETE n"
1376
  result = await session.run(query)
1377
  await result.consume() # Ensure result is fully consumed
1378
 
1379
  logger.info(
1380
- f"Process {os.getpid()} drop Neo4j database {self._DATABASE}"
1381
  )
1382
- return {"status": "success", "message": "data dropped"}
 
 
 
1383
  except Exception as e:
1384
- logger.error(f"Error dropping Neo4j database {self._DATABASE}: {e}")
 
 
1385
  return {"status": "error", "message": str(e)}
 
50
  @final
51
  @dataclass
52
  class Neo4JStorage(BaseGraphStorage):
53
+ def __init__(self, namespace, global_config, embedding_func, workspace=None):
54
  super().__init__(
55
  namespace=namespace,
56
+ workspace=workspace or "",
57
  global_config=global_config,
58
  embedding_func=embedding_func,
59
  )
60
  self._driver = None
61
 
62
+ def _get_workspace_label(self) -> str:
63
+ """Get workspace label, return 'base' for compatibility when workspace is empty"""
64
+ workspace = getattr(self, "workspace", None)
65
+ return workspace if workspace else "base"
66
+
67
  async def initialize(self):
68
  URI = os.environ.get("NEO4J_URI", config.get("neo4j", "uri", fallback=None))
69
  USERNAME = os.environ.get(
 
159
  raise e
160
 
161
  if connected:
162
+ # Create index for workspace nodes on entity_id if it doesn't exist
163
+ workspace_label = self._get_workspace_label()
164
  try:
165
  async with self._driver.session(database=database) as session:
166
  # Check if index exists first
167
+ check_query = f"""
168
  CALL db.indexes() YIELD name, labelsOrTypes, properties
169
+ WHERE labelsOrTypes = ['{workspace_label}'] AND properties = ['entity_id']
170
  RETURN count(*) > 0 AS exists
171
  """
172
  try:
 
179
  if not index_exists:
180
  # Create index only if it doesn't exist
181
  result = await session.run(
182
+ f"CREATE INDEX FOR (n:`{workspace_label}`) ON (n.entity_id)"
183
  )
184
  await result.consume()
185
  logger.info(
186
+ f"Created index for {workspace_label} nodes on entity_id in {database}"
187
  )
188
  except Exception:
189
  # Fallback if db.indexes() is not supported in this Neo4j version
190
  result = await session.run(
191
+ f"CREATE INDEX IF NOT EXISTS FOR (n:`{workspace_label}`) ON (n.entity_id)"
192
  )
193
  await result.consume()
194
  except Exception as e:
 
223
  ValueError: If node_id is invalid
224
  Exception: If there is an error executing the query
225
  """
226
+ workspace_label = self._get_workspace_label()
227
  async with self._driver.session(
228
  database=self._DATABASE, default_access_mode="READ"
229
  ) as session:
230
  try:
231
+ query = f"MATCH (n:`{workspace_label}` {{entity_id: $entity_id}}) RETURN count(n) > 0 AS node_exists"
232
  result = await session.run(query, entity_id=node_id)
233
  single_result = await result.single()
234
  await result.consume() # Ensure result is fully consumed
 
253
  ValueError: If either node_id is invalid
254
  Exception: If there is an error executing the query
255
  """
256
+ workspace_label = self._get_workspace_label()
257
  async with self._driver.session(
258
  database=self._DATABASE, default_access_mode="READ"
259
  ) as session:
260
  try:
261
  query = (
262
+ f"MATCH (a:`{workspace_label}` {{entity_id: $source_entity_id}})-[r]-(b:`{workspace_label}` {{entity_id: $target_entity_id}}) "
263
  "RETURN COUNT(r) > 0 AS edgeExists"
264
  )
265
  result = await session.run(
 
291
  ValueError: If node_id is invalid
292
  Exception: If there is an error executing the query
293
  """
294
+ workspace_label = self._get_workspace_label()
295
  async with self._driver.session(
296
  database=self._DATABASE, default_access_mode="READ"
297
  ) as session:
298
  try:
299
+ query = (
300
+ f"MATCH (n:`{workspace_label}` {{entity_id: $entity_id}}) RETURN n"
301
+ )
302
  result = await session.run(query, entity_id=node_id)
303
  try:
304
  records = await result.fetch(
 
312
  if records:
313
  node = records[0]["n"]
314
  node_dict = dict(node)
315
+ # Remove workspace label from labels list if it exists
316
  if "labels" in node_dict:
317
  node_dict["labels"] = [
318
  label
319
  for label in node_dict["labels"]
320
+ if label != workspace_label
321
  ]
322
  # logger.debug(f"Neo4j query node {query} return: {node_dict}")
323
  return node_dict
 
338
  Returns:
339
  A dictionary mapping each node_id to its node data (or None if not found).
340
  """
341
+ workspace_label = self._get_workspace_label()
342
  async with self._driver.session(
343
  database=self._DATABASE, default_access_mode="READ"
344
  ) as session:
345
+ query = f"""
346
  UNWIND $node_ids AS id
347
+ MATCH (n:`{workspace_label}` {{entity_id: id}})
348
  RETURN n.entity_id AS entity_id, n
349
  """
350
  result = await session.run(query, node_ids=node_ids)
 
353
  entity_id = record["entity_id"]
354
  node = record["n"]
355
  node_dict = dict(node)
356
+ # Remove the workspace label if present in a 'labels' property
357
  if "labels" in node_dict:
358
  node_dict["labels"] = [
359
+ label
360
+ for label in node_dict["labels"]
361
+ if label != workspace_label
362
  ]
363
  nodes[entity_id] = node_dict
364
  await result.consume() # Make sure to consume the result fully
 
379
  ValueError: If node_id is invalid
380
  Exception: If there is an error executing the query
381
  """
382
+ workspace_label = self._get_workspace_label()
383
  async with self._driver.session(
384
  database=self._DATABASE, default_access_mode="READ"
385
  ) as session:
386
  try:
387
+ query = f"""
388
+ MATCH (n:`{workspace_label}` {{entity_id: $entity_id}})
389
  OPTIONAL MATCH (n)-[r]-()
390
  RETURN COUNT(r) AS degree
391
  """
 
419
  A dictionary mapping each node_id to its degree (number of relationships).
420
  If a node is not found, its degree will be set to 0.
421
  """
422
+ workspace_label = self._get_workspace_label()
423
  async with self._driver.session(
424
  database=self._DATABASE, default_access_mode="READ"
425
  ) as session:
426
+ query = f"""
427
  UNWIND $node_ids AS id
428
+ MATCH (n:`{workspace_label}` {{entity_id: id}})
429
+ RETURN n.entity_id AS entity_id, count {{ (n)--() }} AS degree;
430
  """
431
  result = await session.run(query, node_ids=node_ids)
432
  degrees = {}
 
506
  ValueError: If either node_id is invalid
507
  Exception: If there is an error executing the query
508
  """
509
+ workspace_label = self._get_workspace_label()
510
  try:
511
  async with self._driver.session(
512
  database=self._DATABASE, default_access_mode="READ"
513
  ) as session:
514
+ query = f"""
515
+ MATCH (start:`{workspace_label}` {{entity_id: $source_entity_id}})-[r]-(end:`{workspace_label}` {{entity_id: $target_entity_id}})
516
  RETURN properties(r) as edge_properties
517
  """
518
  result = await session.run(
 
589
  Returns:
590
  A dictionary mapping (src, tgt) tuples to their edge properties.
591
  """
592
+ workspace_label = self._get_workspace_label()
593
  async with self._driver.session(
594
  database=self._DATABASE, default_access_mode="READ"
595
  ) as session:
596
+ query = f"""
597
  UNWIND $pairs AS pair
598
+ MATCH (start:`{workspace_label}` {{entity_id: pair.src}})-[r:DIRECTED]-(end:`{workspace_label}` {{entity_id: pair.tgt}})
599
  RETURN pair.src AS src_id, pair.tgt AS tgt_id, collect(properties(r)) AS edges
600
  """
601
  result = await session.run(query, pairs=pairs)
 
646
  database=self._DATABASE, default_access_mode="READ"
647
  ) as session:
648
  try:
649
+ workspace_label = self._get_workspace_label()
650
+ query = f"""MATCH (n:`{workspace_label}` {{entity_id: $entity_id}})
651
+ OPTIONAL MATCH (n)-[r]-(connected:`{workspace_label}`)
652
  WHERE connected.entity_id IS NOT NULL
653
  RETURN n, r, connected"""
654
  results = await session.run(query, entity_id=source_node_id)
 
709
  database=self._DATABASE, default_access_mode="READ"
710
  ) as session:
711
  # Query to get both outgoing and incoming edges
712
+ workspace_label = self._get_workspace_label()
713
+ query = f"""
714
  UNWIND $node_ids AS id
715
+ MATCH (n:`{workspace_label}` {{entity_id: id}})
716
+ OPTIONAL MATCH (n)-[r]-(connected:`{workspace_label}`)
717
  RETURN id AS queried_id, n.entity_id AS node_entity_id,
718
  connected.entity_id AS connected_entity_id,
719
  startNode(r).entity_id AS start_entity_id
 
748
  return edges_dict
749
 
750
  async def get_nodes_by_chunk_ids(self, chunk_ids: list[str]) -> list[dict]:
751
+ workspace_label = self._get_workspace_label()
752
  async with self._driver.session(
753
  database=self._DATABASE, default_access_mode="READ"
754
  ) as session:
755
+ query = f"""
756
  UNWIND $chunk_ids AS chunk_id
757
+ MATCH (n:`{workspace_label}`)
758
  WHERE n.source_id IS NOT NULL AND chunk_id IN split(n.source_id, $sep)
759
  RETURN DISTINCT n
760
  """
 
770
  return nodes
771
 
772
  async def get_edges_by_chunk_ids(self, chunk_ids: list[str]) -> list[dict]:
773
+ workspace_label = self._get_workspace_label()
774
  async with self._driver.session(
775
  database=self._DATABASE, default_access_mode="READ"
776
  ) as session:
777
+ query = f"""
778
  UNWIND $chunk_ids AS chunk_id
779
+ MATCH (a:`{workspace_label}`)-[r]-(b:`{workspace_label}`)
780
  WHERE r.source_id IS NOT NULL AND chunk_id IN split(r.source_id, $sep)
781
  RETURN DISTINCT a.entity_id AS source, b.entity_id AS target, properties(r) AS properties
782
  """
 
810
  node_id: The unique identifier for the node (used as label)
811
  node_data: Dictionary of node properties
812
  """
813
+ workspace_label = self._get_workspace_label()
814
  properties = node_data
815
  entity_type = properties["entity_type"]
816
  if "entity_id" not in properties:
 
820
  async with self._driver.session(database=self._DATABASE) as session:
821
 
822
  async def execute_upsert(tx: AsyncManagedTransaction):
823
+ query = f"""
824
+ MERGE (n:`{workspace_label}` {{entity_id: $entity_id}})
 
825
  SET n += $properties
826
+ SET n:`{entity_type}`
827
  """
 
 
828
  result = await tx.run(
829
  query, entity_id=node_id, properties=properties
830
  )
 
868
  async with self._driver.session(database=self._DATABASE) as session:
869
 
870
  async def execute_upsert(tx: AsyncManagedTransaction):
871
+ workspace_label = self._get_workspace_label()
872
+ query = f"""
873
+ MATCH (source:`{workspace_label}` {{entity_id: $source_entity_id}})
874
  WITH source
875
+ MATCH (target:`{workspace_label}` {{entity_id: $target_entity_id}})
876
  MERGE (source)-[r:DIRECTED]-(target)
877
  SET r += $properties
878
  RETURN r, source, target
 
911
  KnowledgeGraph object containing nodes and edges, with an is_truncated flag
912
  indicating whether the graph was truncated due to max_nodes limit
913
  """
914
+ workspace_label = self._get_workspace_label()
915
  result = KnowledgeGraph()
916
  seen_nodes = set()
917
  seen_edges = set()
 
922
  try:
923
  if node_label == "*":
924
  # First check total node count to determine if graph is truncated
925
+ count_query = (
926
+ f"MATCH (n:`{workspace_label}`) RETURN count(n) as total"
927
+ )
928
  count_result = None
929
  try:
930
  count_result = await session.run(count_query)
 
940
  await count_result.consume()
941
 
942
  # Run main query to get nodes with highest degree
943
+ main_query = f"""
944
+ MATCH (n:`{workspace_label}`)
945
  OPTIONAL MATCH (n)-[r]-()
946
  WITH n, COALESCE(count(r), 0) AS degree
947
  ORDER BY degree DESC
  LIMIT $max_nodes
+ WITH collect({{node: n}}) AS filtered_nodes
  UNWIND filtered_nodes AS node_info
  WITH collect(node_info.node) AS kept_nodes, filtered_nodes
  OPTIONAL MATCH (a)-[r]-(b)

  else:
      # return await self._robust_fallback(node_label, max_depth, max_nodes)
      # First try without limit to check if we need to truncate
+     full_query = f"""
+     MATCH (start:`{workspace_label}`)
      WHERE start.entity_id = $entity_id
      WITH start
+     CALL apoc.path.subgraphAll(start, {{
          relationshipFilter: '',
+         labelFilter: '{workspace_label}',
          minLevel: 0,
          maxLevel: $max_depth,
          bfs: true
+     }})
      YIELD nodes, relationships
      WITH nodes, relationships, size(nodes) AS total_nodes
      UNWIND nodes AS node
+     WITH collect({{node: node}}) AS node_info, relationships, total_nodes
      RETURN node_info, relationships, total_nodes
      """

      )

      # Run limited query
+     limited_query = f"""
+     MATCH (start:`{workspace_label}`)
      WHERE start.entity_id = $entity_id
      WITH start
+     CALL apoc.path.subgraphAll(start, {{
          relationshipFilter: '',
+         labelFilter: '{workspace_label}',
          minLevel: 0,
          maxLevel: $max_depth,
          limit: $max_nodes,
          bfs: true
+     }})
      YIELD nodes, relationships
      UNWIND nodes AS node
+     WITH collect({{node: node}}) AS node_info, relationships
      RETURN node_info, relationships
      """
      result_set = None

  visited_edge_pairs = set()

  # Get the starting node's data
+ workspace_label = self._get_workspace_label()
  async with self._driver.session(
      database=self._DATABASE, default_access_mode="READ"
  ) as session:
+     query = f"""
+     MATCH (n:`{workspace_label}` {{entity_id: $entity_id}})
      RETURN id(n) as node_id, n
      """
      node_result = await session.run(query, entity_id=node_label)

  async with self._driver.session(
      database=self._DATABASE, default_access_mode="READ"
  ) as session:
+     workspace_label = self._get_workspace_label()
+     query = f"""
+     MATCH (a:`{workspace_label}` {{entity_id: $entity_id}})-[r]-(b)
      WITH r, b, id(r) as edge_id, id(b) as target_id
      RETURN r, b, edge_id, target_id
      """

  Returns:
      ["Person", "Company", ...]  # Alphabetically sorted label list
  """
+ workspace_label = self._get_workspace_label()
  async with self._driver.session(
      database=self._DATABASE, default_access_mode="READ"
  ) as session:

      # query = "CALL db.labels() YIELD label RETURN label"

      # Method 2: Query compatible with older versions
+     query = f"""
+     MATCH (n:`{workspace_label}`)
      WHERE n.entity_id IS NOT NULL
      RETURN DISTINCT n.entity_id AS label
      ORDER BY label

  """

  async def _do_delete(tx: AsyncManagedTransaction):
+     workspace_label = self._get_workspace_label()
+     query = f"""
+     MATCH (n:`{workspace_label}` {{entity_id: $entity_id}})
      DETACH DELETE n
      """
      result = await tx.run(query, entity_id=node_id)

  for source, target in edges:

  async def _do_delete_edge(tx: AsyncManagedTransaction):
+     workspace_label = self._get_workspace_label()
+     query = f"""
+     MATCH (source:`{workspace_label}` {{entity_id: $source_entity_id}})-[r]-(target:`{workspace_label}` {{entity_id: $target_entity_id}})
      DELETE r
      """
      result = await tx.run(

  raise

  async def drop(self) -> dict[str, str]:
+     """Drop all data from current workspace storage and clean up resources

+     This method will delete all nodes and relationships in the current workspace only.

      Returns:
          dict[str, str]: Operation status and message
+         - On success: {"status": "success", "message": "workspace data dropped"}
          - On failure: {"status": "error", "message": "<error details>"}
      """
+     workspace_label = self._get_workspace_label()
      try:
          async with self._driver.session(database=self._DATABASE) as session:
+             # Delete all nodes and relationships in current workspace only
+             query = f"MATCH (n:`{workspace_label}`) DETACH DELETE n"
              result = await session.run(query)
              await result.consume()  # Ensure result is fully consumed

              logger.info(
+                 f"Process {os.getpid()} drop Neo4j workspace '{workspace_label}' in database {self._DATABASE}"
              )
+             return {
+                 "status": "success",
+                 "message": f"workspace '{workspace_label}' data dropped",
+             }
      except Exception as e:
+         logger.error(
+             f"Error dropping Neo4j workspace '{workspace_label}' in database {self._DATABASE}: {e}"
+         )
          return {"status": "error", "message": str(e)}
lightrag/kg/networkx_impl.py CHANGED
@@ -46,9 +46,19 @@ class NetworkXStorage(BaseGraphStorage):
          nx.write_graphml(graph, file_name)

      def __post_init__(self):
-         self._graphml_xml_file = os.path.join(
-             self.global_config["working_dir"], f"graph_{self.namespace}.graphml"
-         )
+         working_dir = self.global_config["working_dir"]
+         if self.workspace:
+             # Include workspace in the file path for data isolation
+             workspace_dir = os.path.join(working_dir, self.workspace)
+             os.makedirs(workspace_dir, exist_ok=True)
+             self._graphml_xml_file = os.path.join(
+                 workspace_dir, f"graph_{self.namespace}.graphml"
+             )
+         else:
+             # Default behavior when workspace is empty
+             self._graphml_xml_file = os.path.join(
+                 working_dir, f"graph_{self.namespace}.graphml"
+             )
          self._storage_lock = None
          self.storage_updated = None
          self._graph = None
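
The change above means each workspace gets its own subdirectory under working_dir, so two NetworkX instances sharing a working directory never write the same GraphML file. A small sketch of the resulting paths (the namespace value is hypothetical):

    import os

    def graphml_path(working_dir: str, workspace: str, namespace: str) -> str:
        # Mirrors the branch above: a workspace adds one directory level.
        base = os.path.join(working_dir, workspace) if workspace else working_dir
        return os.path.join(base, f"graph_{namespace}.graphml")

    print(graphml_path("./rag_storage", "", "chunk_entity_relation"))
    # ./rag_storage/graph_chunk_entity_relation.graphml
    print(graphml_path("./rag_storage", "tenant_a", "chunk_entity_relation"))
    # ./rag_storage/tenant_a/graph_chunk_entity_relation.graphml
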
lightrag/kg/postgres_impl.py CHANGED
@@ -1,6 +1,7 @@
  import asyncio
  import json
  import os
+ import re
  import datetime
  from datetime import timezone
  from dataclasses import dataclass, field
@@ -319,7 +320,7 @@ class PostgreSQLDB:
          # Get all old format data
          old_data_sql = """
              SELECT id, mode, original_prompt, return_value, chunk_id,
-                    create_time, update_time
+                    workspace, create_time, update_time
              FROM LIGHTRAG_LLM_CACHE
              WHERE id NOT LIKE '%:%'
          """
@@ -364,7 +365,9 @@ class PostgreSQLDB:
          await self.execute(
              insert_sql,
              {
-                 "workspace": self.workspace,
+                 "workspace": record[
+                     "workspace"
+                 ],  # Use original record's workspace
                  "id": new_key,
                  "mode": record["mode"],
                  "original_prompt": record["original_prompt"],
@@ -384,7 +387,9 @@ class PostgreSQLDB:
          await self.execute(
              delete_sql,
              {
-                 "workspace": self.workspace,
+                 "workspace": record[
+                     "workspace"
+                 ],  # Use original record's workspace
                  "mode": record["mode"],
                  "id": record["id"],  # Old id
              },
@@ -670,7 +675,7 @@ class ClientManager:
              ),
              "workspace": os.environ.get(
                  "POSTGRES_WORKSPACE",
-                 config.get("postgres", "workspace", fallback="default"),
+                 config.get("postgres", "workspace", fallback=None),
              ),
              "max_connections": os.environ.get(
                  "POSTGRES_MAX_CONNECTIONS",
@@ -716,6 +721,18 @@ class PGKVStorage(BaseKVStorage):
      async def initialize(self):
          if self.db is None:
              self.db = await ClientManager.get_client()
+             # Implement workspace priority: PostgreSQLDB.workspace > self.workspace > "default"
+             if self.db.workspace:
+                 # Use PostgreSQLDB's workspace (highest priority)
+                 final_workspace = self.db.workspace
+             elif hasattr(self, "workspace") and self.workspace:
+                 # Use storage class's workspace (medium priority)
+                 final_workspace = self.workspace
+                 self.db.workspace = final_workspace
+             else:
+                 # Use "default" for compatibility (lowest priority)
+                 final_workspace = "default"
+                 self.db.workspace = final_workspace

      async def finalize(self):
          if self.db is not None:
@@ -1047,6 +1064,18 @@ class PGVectorStorage(BaseVectorStorage):
      async def initialize(self):
          if self.db is None:
              self.db = await ClientManager.get_client()
+             # Implement workspace priority: PostgreSQLDB.workspace > self.workspace > "default"
+             if self.db.workspace:
+                 # Use PostgreSQLDB's workspace (highest priority)
+                 final_workspace = self.db.workspace
+             elif hasattr(self, "workspace") and self.workspace:
+                 # Use storage class's workspace (medium priority)
+                 final_workspace = self.workspace
+                 self.db.workspace = final_workspace
+             else:
+                 # Use "default" for compatibility (lowest priority)
+                 final_workspace = "default"
+                 self.db.workspace = final_workspace

      async def finalize(self):
          if self.db is not None:
@@ -1328,6 +1357,18 @@ class PGDocStatusStorage(DocStatusStorage):
      async def initialize(self):
          if self.db is None:
              self.db = await ClientManager.get_client()
+             # Implement workspace priority: PostgreSQLDB.workspace > self.workspace > "default"
+             if self.db.workspace:
+                 # Use PostgreSQLDB's workspace (highest priority)
+                 final_workspace = self.db.workspace
+             elif hasattr(self, "workspace") and self.workspace:
+                 # Use storage class's workspace (medium priority)
+                 final_workspace = self.workspace
+                 self.db.workspace = final_workspace
+             else:
+                 # Use "default" for compatibility (lowest priority)
+                 final_workspace = "default"
+                 self.db.workspace = final_workspace

      async def finalize(self):
          if self.db is not None:
@@ -1606,9 +1647,34 @@ class PGGraphQueryException(Exception):
  @dataclass
  class PGGraphStorage(BaseGraphStorage):
      def __post_init__(self):
-         self.graph_name = self.namespace or os.environ.get("AGE_GRAPH_NAME", "lightrag")
+         # Graph name will be dynamically generated in initialize() based on workspace
          self.db: PostgreSQLDB | None = None

+     def _get_workspace_graph_name(self) -> str:
+         """
+         Generate graph name based on workspace and namespace for data isolation.
+         Rules:
+         - If workspace is empty: graph_name = namespace
+         - If workspace has value: graph_name = workspace_namespace
+
+         Args:
+             None
+
+         Returns:
+             str: The graph name for the current workspace
+         """
+         workspace = getattr(self, "workspace", None)
+         namespace = self.namespace or os.environ.get("AGE_GRAPH_NAME", "lightrag")
+
+         if workspace and workspace.strip():
+             # Ensure names comply with PostgreSQL identifier specifications
+             safe_workspace = re.sub(r"[^a-zA-Z0-9_]", "_", workspace.strip())
+             safe_namespace = re.sub(r"[^a-zA-Z0-9_]", "_", namespace)
+             return f"{safe_workspace}_{safe_namespace}"
+         else:
+             # When workspace is empty, use namespace directly
+             return re.sub(r"[^a-zA-Z0-9_]", "_", namespace)
+
      @staticmethod
      def _normalize_node_id(node_id: str) -> str:
          """
@@ -1629,6 +1695,27 @@ class PGGraphStorage(BaseGraphStorage):
      async def initialize(self):
          if self.db is None:
              self.db = await ClientManager.get_client()
+             # Implement workspace priority: PostgreSQLDB.workspace > self.workspace > None
+             if self.db.workspace:
+                 # Use PostgreSQLDB's workspace (highest priority)
+                 final_workspace = self.db.workspace
+             elif hasattr(self, "workspace") and self.workspace:
+                 # Use storage class's workspace (medium priority)
+                 final_workspace = self.workspace
+                 self.db.workspace = final_workspace
+             else:
+                 # Use None for compatibility (lowest priority)
+                 final_workspace = None
+                 self.db.workspace = final_workspace
+
+             # Dynamically generate graph name based on workspace
+             self.workspace = self.db.workspace
+             self.graph_name = self._get_workspace_graph_name()
+
+             # Log the graph initialization for debugging
+             logger.info(
+                 f"PostgreSQL Graph initialized: workspace='{self.workspace}', graph_name='{self.graph_name}'"
+             )

          # Execute each statement separately and ignore errors
          queries = [
@@ -2833,7 +2920,10 @@
          $$) AS (result agtype)"""

          await self._query(drop_query, readonly=False)
-         return {"status": "success", "message": "graph data dropped"}
+         return {
+             "status": "success",
+             "message": f"workspace '{self.workspace}' graph data dropped",
+         }
      except Exception as e:
          logger.error(f"Error dropping graph: {e}")
          return {"status": "error", "message": str(e)}
lightrag/kg/qdrant_impl.py CHANGED
@@ -50,6 +50,18 @@ def compute_mdhash_id_for_qdrant(
  @final
  @dataclass
  class QdrantVectorDBStorage(BaseVectorStorage):
+     def __init__(
+         self, namespace, global_config, embedding_func, workspace=None, meta_fields=None
+     ):
+         super().__init__(
+             namespace=namespace,
+             workspace=workspace or "",
+             global_config=global_config,
+             embedding_func=embedding_func,
+             meta_fields=meta_fields or set(),
+         )
+         self.__post_init__()
+
      @staticmethod
      def create_collection_if_not_exist(
          client: QdrantClient, collection_name: str, **kwargs
@@ -59,6 +71,29 @@ class QdrantVectorDBStorage(BaseVectorStorage):
          client.create_collection(collection_name, **kwargs)

      def __post_init__(self):
+         # Check for QDRANT_WORKSPACE environment variable first (higher priority)
+         # This allows administrators to force a specific workspace for all Qdrant storage instances
+         qdrant_workspace = os.environ.get("QDRANT_WORKSPACE")
+         if qdrant_workspace and qdrant_workspace.strip():
+             # Use environment variable value, overriding the passed workspace parameter
+             effective_workspace = qdrant_workspace.strip()
+             logger.info(
+                 f"Using QDRANT_WORKSPACE environment variable: '{effective_workspace}' (overriding passed workspace: '{self.workspace}')"
+             )
+         else:
+             # Use the workspace parameter passed during initialization
+             effective_workspace = self.workspace
+             if effective_workspace:
+                 logger.debug(
+                     f"Using passed workspace parameter: '{effective_workspace}'"
+                 )
+
+         # Build namespace with workspace prefix for data isolation
+         if effective_workspace:
+             self.namespace = f"{effective_workspace}_{self.namespace}"
+             logger.debug(f"Final namespace with workspace prefix: '{self.namespace}'")
+         # When workspace is empty, keep the original namespace unchanged
+
          kwargs = self.global_config.get("vector_db_storage_cls_kwargs", {})
          cosine_threshold = kwargs.get("cosine_better_than_threshold")
          if cosine_threshold is None:
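
The Qdrant collection name is therefore derived in two steps: an optional QDRANT_WORKSPACE override, then a workspace prefix on the namespace. Reduced to a pure function (a sketch; it assumes nothing else rewrites self.namespace afterwards):

    import os

    def effective_namespace(namespace: str, passed_workspace: str) -> str:
        # The environment variable beats the workspace passed by LightRAG.
        env_ws = os.environ.get("QDRANT_WORKSPACE", "").strip()
        workspace = env_ws or passed_workspace
        return f"{workspace}_{namespace}" if workspace else namespace

    print(effective_namespace("entities", ""))          # entities
    print(effective_namespace("entities", "tenant_a"))  # tenant_a_entities
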
lightrag/kg/redis_impl.py CHANGED
@@ -71,6 +71,29 @@ class RedisConnectionManager:
  @dataclass
  class RedisKVStorage(BaseKVStorage):
      def __post_init__(self):
+         # Check for REDIS_WORKSPACE environment variable first (higher priority)
+         # This allows administrators to force a specific workspace for all Redis storage instances
+         redis_workspace = os.environ.get("REDIS_WORKSPACE")
+         if redis_workspace and redis_workspace.strip():
+             # Use environment variable value, overriding the passed workspace parameter
+             effective_workspace = redis_workspace.strip()
+             logger.info(
+                 f"Using REDIS_WORKSPACE environment variable: '{effective_workspace}' (overriding passed workspace: '{self.workspace}')"
+             )
+         else:
+             # Use the workspace parameter passed during initialization
+             effective_workspace = self.workspace
+             if effective_workspace:
+                 logger.debug(
+                     f"Using passed workspace parameter: '{effective_workspace}'"
+                 )
+
+         # Build namespace with workspace prefix for data isolation
+         if effective_workspace:
+             self.namespace = f"{effective_workspace}_{self.namespace}"
+             logger.debug(f"Final namespace with workspace prefix: '{self.namespace}'")
+         # When workspace is empty, keep the original namespace unchanged
+
          redis_url = os.environ.get(
              "REDIS_URI", config.get("redis", "uri", fallback="redis://localhost:6379")
          )
@@ -461,6 +484,29 @@ class RedisDocStatusStorage(DocStatusStorage):
      """Redis implementation of document status storage"""

      def __post_init__(self):
+         # Check for REDIS_WORKSPACE environment variable first (higher priority)
+         # This allows administrators to force a specific workspace for all Redis storage instances
+         redis_workspace = os.environ.get("REDIS_WORKSPACE")
+         if redis_workspace and redis_workspace.strip():
+             # Use environment variable value, overriding the passed workspace parameter
+             effective_workspace = redis_workspace.strip()
+             logger.info(
+                 f"Using REDIS_WORKSPACE environment variable: '{effective_workspace}' (overriding passed workspace: '{self.workspace}')"
+             )
+         else:
+             # Use the workspace parameter passed during initialization
+             effective_workspace = self.workspace
+             if effective_workspace:
+                 logger.debug(
+                     f"Using passed workspace parameter: '{effective_workspace}'"
+                 )
+
+         # Build namespace with workspace prefix for data isolation
+         if effective_workspace:
+             self.namespace = f"{effective_workspace}_{self.namespace}"
+             logger.debug(f"Final namespace with workspace prefix: '{self.namespace}'")
+         # When workspace is empty, keep the original namespace unchanged
+
          redis_url = os.environ.get(
              "REDIS_URI", config.get("redis", "uri", fallback="redis://localhost:6379")
          )
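
RedisKVStorage and RedisDocStatusStorage apply the identical rule with REDIS_WORKSPACE. One detail worth noting: the override only takes effect when the variable is non-empty after strip(), so a whitespace-only value falls through to the passed workspace. A small check of the edge cases (a sketch mirroring the branch above):

    def pick_workspace(env_value, passed):
        # The env var wins only if it has non-whitespace content.
        if env_value and env_value.strip():
            return env_value.strip()
        return passed

    assert pick_workspace("forced", "tenant_a") == "forced"  # env var wins
    assert pick_workspace("   ", "tenant_a") == "tenant_a"   # whitespace ignored
    assert pick_workspace(None, "") == ""                    # no workspace at all
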
lightrag/lightrag.py CHANGED
@@ -97,9 +97,7 @@ class LightRAG:
      # Directory
      # ---

-     working_dir: str = field(
-         default=f"./lightrag_cache_{datetime.now().strftime('%Y-%m-%d-%H:%M:%S')}"
-     )
+     working_dir: str = field(default="./rag_storage")
      """Directory where cache and temporary files are stored."""

      # Storage
@@ -117,6 +115,12 @@
      doc_status_storage: str = field(default="JsonDocStatusStorage")
      """Storage type for tracking document processing statuses."""

+     # Workspace
+     # ---
+
+     workspace: str = field(default_factory=lambda: os.getenv("WORKSPACE", ""))
+     """Workspace for data isolation. Defaults to empty string if WORKSPACE environment variable is not set."""
+
      # Logging (Deprecated, use setup_logger in utils.py instead)
      # ---
      log_level: int | None = field(default=None)
@@ -242,7 +246,6 @@
      vector_db_storage_cls_kwargs: dict[str, Any] = field(default_factory=dict)
      """Additional parameters for vector database storage."""

-
      enable_llm_cache: bool = field(default=True)
      """Enables caching for LLM responses to avoid redundant computations."""

@@ -380,39 +383,44 @@
          self.llm_response_cache: BaseKVStorage = self.key_string_value_json_storage_cls(  # type: ignore
              namespace=NameSpace.KV_STORE_LLM_RESPONSE_CACHE,
-             global_config=asdict(
-                 self
-             ),  # Add global_config to ensure cache works properly
+             workspace=self.workspace,
+             global_config=global_config,
              embedding_func=self.embedding_func,
          )

          self.full_docs: BaseKVStorage = self.key_string_value_json_storage_cls(  # type: ignore
              namespace=NameSpace.KV_STORE_FULL_DOCS,
+             workspace=self.workspace,
              embedding_func=self.embedding_func,
          )

          self.text_chunks: BaseKVStorage = self.key_string_value_json_storage_cls(  # type: ignore
              namespace=NameSpace.KV_STORE_TEXT_CHUNKS,
+             workspace=self.workspace,
              embedding_func=self.embedding_func,
          )

          self.chunk_entity_relation_graph: BaseGraphStorage = self.graph_storage_cls(  # type: ignore
              namespace=NameSpace.GRAPH_STORE_CHUNK_ENTITY_RELATION,
+             workspace=self.workspace,
              embedding_func=self.embedding_func,
          )

          self.entities_vdb: BaseVectorStorage = self.vector_db_storage_cls(  # type: ignore
              namespace=NameSpace.VECTOR_STORE_ENTITIES,
+             workspace=self.workspace,
              embedding_func=self.embedding_func,
              meta_fields={"entity_name", "source_id", "content", "file_path"},
          )
          self.relationships_vdb: BaseVectorStorage = self.vector_db_storage_cls(  # type: ignore
              namespace=NameSpace.VECTOR_STORE_RELATIONSHIPS,
+             workspace=self.workspace,
              embedding_func=self.embedding_func,
              meta_fields={"src_id", "tgt_id", "source_id", "content", "file_path"},
          )
          self.chunks_vdb: BaseVectorStorage = self.vector_db_storage_cls(  # type: ignore
              namespace=NameSpace.VECTOR_STORE_CHUNKS,
+             workspace=self.workspace,
              embedding_func=self.embedding_func,
              meta_fields={"full_doc_id", "content", "file_path"},
          )
@@ -420,6 +428,7 @@
          # Initialize document status storage
          self.doc_status: DocStatusStorage = self.doc_status_storage_cls(
              namespace=NameSpace.DOC_STATUS,
+             workspace=self.workspace,
              global_config=global_config,
              embedding_func=None,
          )
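
With the workspace field threaded through every storage constructor, one process can run several isolated LightRAG instances against the same backends. A minimal construction sketch (model and embedding functions are omitted for brevity; in practice they are supplied as usual):

    from lightrag import LightRAG

    rag_a = LightRAG(working_dir="./rag_storage", workspace="tenant_a")
    rag_b = LightRAG(working_dir="./rag_storage", workspace="tenant_b")
    # Both instances share ./rag_storage, but every KV, vector, graph, and
    # doc-status storage they create receives its own workspace, so their
    # data never mixes. Leaving workspace unset falls back to the WORKSPACE
    # environment variable, and then to "" (the pre-workspace behavior).
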