Larfii committed on
Commit
d3c3d88
·
1 Parent(s): 31f9f35

Add a progress bar

Browse files
Files changed (3) hide show
  1. lightrag/lightrag.py +4 -1
  2. lightrag/operate.py +42 -17
  3. lightrag/storage.py +11 -3
lightrag/lightrag.py CHANGED
@@ -1,5 +1,6 @@
1
  import asyncio
2
  import os
 
3
  from dataclasses import asdict, dataclass, field
4
  from datetime import datetime
5
  from functools import partial
@@ -243,7 +244,9 @@ class LightRAG:
243
  logger.info(f"[New Docs] inserting {len(new_docs)} docs")
244
 
245
  inserting_chunks = {}
246
- for doc_key, doc in new_docs.items():
 
 
247
  chunks = {
248
  compute_mdhash_id(dp["content"], prefix="chunk-"): {
249
  **dp,
 
1
  import asyncio
2
  import os
3
+ from tqdm.asyncio import tqdm as tqdm_async
4
  from dataclasses import asdict, dataclass, field
5
  from datetime import datetime
6
  from functools import partial
 
244
  logger.info(f"[New Docs] inserting {len(new_docs)} docs")
245
 
246
  inserting_chunks = {}
247
+ for doc_key, doc in tqdm_async(
248
+ new_docs.items(), desc="Chunking documents", unit="doc"
249
+ ):
250
  chunks = {
251
  compute_mdhash_id(dp["content"], prefix="chunk-"): {
252
  **dp,
lightrag/operate.py CHANGED
@@ -1,6 +1,7 @@
1
  import asyncio
2
  import json
3
  import re
 
4
  from typing import Union
5
  from collections import Counter, defaultdict
6
  import warnings
@@ -329,11 +330,15 @@ async def extract_entities(
329
  )
330
  return dict(maybe_nodes), dict(maybe_edges)
331
 
332
- # use_llm_func is wrapped in asyncio.Semaphore, which limits concurrent calls to max_async
333
- results = await asyncio.gather(
334
- *[_process_single_content(c) for c in ordered_chunks]
335
- )
336
- print() # clear the progress bar
 
 
 
 
337
  maybe_nodes = defaultdict(list)
338
  maybe_edges = defaultdict(list)
339
  for m_nodes, m_edges in results:
@@ -341,18 +346,38 @@ async def extract_entities(
341
  maybe_nodes[k].extend(v)
342
  for k, v in m_edges.items():
343
  maybe_edges[tuple(sorted(k))].extend(v)
344
- all_entities_data = await asyncio.gather(
345
- *[
346
- _merge_nodes_then_upsert(k, v, knowledge_graph_inst, global_config)
347
- for k, v in maybe_nodes.items()
348
- ]
349
- )
350
- all_relationships_data = await asyncio.gather(
351
- *[
352
- _merge_edges_then_upsert(k[0], k[1], v, knowledge_graph_inst, global_config)
353
- for k, v in maybe_edges.items()
354
- ]
355
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
356
  if not len(all_entities_data):
357
  logger.warning("Didn't extract any entities, maybe your LLM is not working")
358
  return None
 
1
  import asyncio
2
  import json
3
  import re
4
+ from tqdm.asyncio import tqdm as tqdm_async
5
  from typing import Union
6
  from collections import Counter, defaultdict
7
  import warnings
 
330
  )
331
  return dict(maybe_nodes), dict(maybe_edges)
332
 
333
+ results = []
334
+ for result in tqdm_async(
335
+ asyncio.as_completed([_process_single_content(c) for c in ordered_chunks]),
336
+ total=len(ordered_chunks),
337
+ desc="Extracting entities from chunks",
338
+ unit="chunk",
339
+ ):
340
+ results.append(await result)
341
+
342
  maybe_nodes = defaultdict(list)
343
  maybe_edges = defaultdict(list)
344
  for m_nodes, m_edges in results:
 
346
  maybe_nodes[k].extend(v)
347
  for k, v in m_edges.items():
348
  maybe_edges[tuple(sorted(k))].extend(v)
349
+ logger.info("Inserting entities into storage...")
350
+ all_entities_data = []
351
+ for result in tqdm_async(
352
+ asyncio.as_completed(
353
+ [
354
+ _merge_nodes_then_upsert(k, v, knowledge_graph_inst, global_config)
355
+ for k, v in maybe_nodes.items()
356
+ ]
357
+ ),
358
+ total=len(maybe_nodes),
359
+ desc="Inserting entities",
360
+ unit="entity",
361
+ ):
362
+ all_entities_data.append(await result)
363
+
364
+ logger.info("Inserting relationships into storage...")
365
+ all_relationships_data = []
366
+ for result in tqdm_async(
367
+ asyncio.as_completed(
368
+ [
369
+ _merge_edges_then_upsert(
370
+ k[0], k[1], v, knowledge_graph_inst, global_config
371
+ )
372
+ for k, v in maybe_edges.items()
373
+ ]
374
+ ),
375
+ total=len(maybe_edges),
376
+ desc="Inserting relationships",
377
+ unit="relationship",
378
+ ):
379
+ all_relationships_data.append(await result)
380
+
381
  if not len(all_entities_data):
382
  logger.warning("Didn't extract any entities, maybe your LLM is not working")
383
  return None
lightrag/storage.py CHANGED
@@ -1,6 +1,7 @@
1
  import asyncio
2
  import html
3
  import os
 
4
  from dataclasses import dataclass
5
  from typing import Any, Union, cast
6
  import networkx as nx
@@ -95,9 +96,16 @@ class NanoVectorDBStorage(BaseVectorStorage):
95
  contents[i : i + self._max_batch_size]
96
  for i in range(0, len(contents), self._max_batch_size)
97
  ]
98
- embeddings_list = await asyncio.gather(
99
- *[self.embedding_func(batch) for batch in batches]
100
- )
 
 
 
 
 
 
 
101
  embeddings = np.concatenate(embeddings_list)
102
  for i, d in enumerate(list_data):
103
  d["__vector__"] = embeddings[i]
 
1
  import asyncio
2
  import html
3
  import os
4
+ from tqdm.asyncio import tqdm as tqdm_async
5
  from dataclasses import dataclass
6
  from typing import Any, Union, cast
7
  import networkx as nx
 
96
  contents[i : i + self._max_batch_size]
97
  for i in range(0, len(contents), self._max_batch_size)
98
  ]
99
+ embedding_tasks = [self.embedding_func(batch) for batch in batches]
100
+ embeddings_list = []
101
+ for f in tqdm_async(
102
+ asyncio.as_completed(embedding_tasks),
103
+ total=len(embedding_tasks),
104
+ desc="Generating embeddings",
105
+ unit="batch",
106
+ ):
107
+ embeddings = await f
108
+ embeddings_list.append(embeddings)
109
  embeddings = np.concatenate(embeddings_list)
110
  for i, d in enumerate(list_data):
111
  d["__vector__"] = embeddings[i]