zrguo committed (unverified)
Commit 79d6809 · 2 parents: c4f9db4 c63b233

Merge pull request #712 from danielaskdd/handle-stream-cancel-error

lightrag/api/lightrag_server.py CHANGED
@@ -12,7 +12,7 @@ from fastapi import (
 # Add this to store progress globally
 from typing import Dict
 import threading
-
+import asyncio
 import json
 import os
 
@@ -1725,11 +1725,11 @@ def create_app(args):
         )
 
         async def stream_generator():
-            try:
-                first_chunk_time = None
-                last_chunk_time = None
-                total_response = ""
+            first_chunk_time = None
+            last_chunk_time = None
+            total_response = ""
 
+            try:
                 # Ensure response is an async generator
                 if isinstance(response, str):
                     # If it's a string, send in two parts
@@ -1767,47 +1767,96 @@ def create_app(args):
                     }
                     yield f"{json.dumps(data, ensure_ascii=False)}\n"
                 else:
-                    async for chunk in response:
-                        if chunk:
-                            if first_chunk_time is None:
-                                first_chunk_time = time.time_ns()
-
-                            last_chunk_time = time.time_ns()
-
-                            total_response += chunk
-                            data = {
-                                "model": ollama_server_infos.LIGHTRAG_MODEL,
-                                "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
-                                "message": {
-                                    "role": "assistant",
-                                    "content": chunk,
-                                    "images": None,
-                                },
-                                "done": False,
-                            }
-                            yield f"{json.dumps(data, ensure_ascii=False)}\n"
+                    try:
+                        async for chunk in response:
+                            if chunk:
+                                if first_chunk_time is None:
+                                    first_chunk_time = time.time_ns()
+
+                                last_chunk_time = time.time_ns()
+
+                                total_response += chunk
+                                data = {
+                                    "model": ollama_server_infos.LIGHTRAG_MODEL,
+                                    "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                                    "message": {
+                                        "role": "assistant",
+                                        "content": chunk,
+                                        "images": None,
+                                    },
+                                    "done": False,
+                                }
+                                yield f"{json.dumps(data, ensure_ascii=False)}\n"
+                    except (asyncio.CancelledError, Exception) as e:
+                        error_msg = str(e)
+                        if isinstance(e, asyncio.CancelledError):
+                            error_msg = "Stream was cancelled by server"
+                        else:
+                            error_msg = f"Provider error: {error_msg}"
+
+                        logging.error(f"Stream error: {error_msg}")
+
+                        # Send error message to client
+                        error_data = {
+                            "model": ollama_server_infos.LIGHTRAG_MODEL,
+                            "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                            "message": {
+                                "role": "assistant",
+                                "content": f"\n\nError: {error_msg}",
+                                "images": None,
+                            },
+                            "done": False,
+                        }
+                        yield f"{json.dumps(error_data, ensure_ascii=False)}\n"
+
+                        # Send final message to close the stream
+                        final_data = {
+                            "model": ollama_server_infos.LIGHTRAG_MODEL,
+                            "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                            "done": True,
+                        }
+                        yield f"{json.dumps(final_data, ensure_ascii=False)}\n"
+                        return
+
+                if last_chunk_time is not None:
+                    completion_tokens = estimate_tokens(total_response)
+                    total_time = last_chunk_time - start_time
+                    prompt_eval_time = first_chunk_time - start_time
+                    eval_time = last_chunk_time - first_chunk_time
+
+                    data = {
+                        "model": ollama_server_infos.LIGHTRAG_MODEL,
+                        "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                        "done": True,
+                        "total_duration": total_time,
+                        "load_duration": 0,
+                        "prompt_eval_count": prompt_tokens,
+                        "prompt_eval_duration": prompt_eval_time,
+                        "eval_count": completion_tokens,
+                        "eval_duration": eval_time,
+                    }
+                    yield f"{json.dumps(data, ensure_ascii=False)}\n"
 
-                completion_tokens = estimate_tokens(total_response)
-                total_time = last_chunk_time - start_time
-                prompt_eval_time = first_chunk_time - start_time
-                eval_time = last_chunk_time - first_chunk_time
-
-                data = {
-                    "model": ollama_server_infos.LIGHTRAG_MODEL,
-                    "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
-                    "done": True,
-                    "total_duration": total_time,
-                    "load_duration": 0,
-                    "prompt_eval_count": prompt_tokens,
-                    "prompt_eval_duration": prompt_eval_time,
-                    "eval_count": completion_tokens,
-                    "eval_duration": eval_time,
-                }
-                yield f"{json.dumps(data, ensure_ascii=False)}\n"
-                return  # Ensure the generator ends immediately after sending the completion marker
             except Exception as e:
-                logging.error(f"Error in stream_generator: {str(e)}")
-                raise
+                error_msg = f"Error in stream_generator: {str(e)}"
+                logging.error(error_msg)
+
+                # Send error message to client
+                error_data = {
+                    "model": ollama_server_infos.LIGHTRAG_MODEL,
+                    "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                    "error": {"code": "STREAM_ERROR", "message": error_msg},
+                }
+                yield f"{json.dumps(error_data, ensure_ascii=False)}\n"
+
+                # Ensure sending end marker
+                final_data = {
+                    "model": ollama_server_infos.LIGHTRAG_MODEL,
+                    "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                    "done": True,
+                }
+                yield f"{json.dumps(final_data, ensure_ascii=False)}\n"
+                return
 
         return StreamingResponse(
             stream_generator(),
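
In short, the server-side change wraps both the chunk loop and the whole generator body in try/except so that a cancelled or failing upstream stream is logged and the client still receives an error frame followed by a final "done": true frame, instead of the connection dying mid-stream. Below is a minimal, self-contained sketch of that pattern (not the LightRAG code itself; fake_llm_stream, stream_ndjson and MODEL_NAME are hypothetical stand-ins):

import asyncio
import json
import logging


MODEL_NAME = "demo-model"  # hypothetical stand-in for ollama_server_infos.LIGHTRAG_MODEL


async def fake_llm_stream():
    # Hypothetical upstream async generator; in the real server this is the
    # provider response (e.g. the inner() generator patched in lightrag/llm/openai.py).
    for word in ("Hello", ", ", "world"):
        yield word
        await asyncio.sleep(0)


async def stream_ndjson(upstream):
    try:
        try:
            async for chunk in upstream:
                # One NDJSON frame per chunk, mirroring the "done": False messages above.
                yield json.dumps({"model": MODEL_NAME,
                                  "message": {"role": "assistant", "content": chunk},
                                  "done": False}) + "\n"
        except (asyncio.CancelledError, Exception) as e:
            # asyncio.CancelledError subclasses BaseException on Python 3.8+, so it has
            # to be listed explicitly; a bare `except Exception` would not catch it.
            msg = ("Stream was cancelled by server"
                   if isinstance(e, asyncio.CancelledError)
                   else f"Provider error: {e}")
            logging.error("Stream error: %s", msg)
            yield json.dumps({"model": MODEL_NAME,
                              "message": {"role": "assistant", "content": f"\n\nError: {msg}"},
                              "done": False}) + "\n"
            yield json.dumps({"model": MODEL_NAME, "done": True}) + "\n"
            return
        # Normal completion: always close the stream with a final done frame.
        yield json.dumps({"model": MODEL_NAME, "done": True}) + "\n"
    except Exception as e:
        # Last-resort handler, mirroring the outer except in stream_generator().
        yield json.dumps({"model": MODEL_NAME,
                          "error": {"code": "STREAM_ERROR", "message": str(e)}}) + "\n"
        yield json.dumps({"model": MODEL_NAME, "done": True}) + "\n"


if __name__ == "__main__":
    async def main():
        async for line in stream_ndjson(fake_llm_stream()):
            print(line, end="")

    asyncio.run(main())

Because every exit path ends with a "done": true frame, Ollama-style clients that stop reading on "done" terminate cleanly instead of waiting on a dropped connection.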
lightrag/llm/openai.py CHANGED
@@ -125,13 +125,17 @@ async def openai_complete_if_cache(
     if hasattr(response, "__aiter__"):
 
         async def inner():
-            async for chunk in response:
-                content = chunk.choices[0].delta.content
-                if content is None:
-                    continue
-                if r"\u" in content:
-                    content = safe_unicode_decode(content.encode("utf-8"))
-                yield content
+            try:
+                async for chunk in response:
+                    content = chunk.choices[0].delta.content
+                    if content is None:
+                        continue
+                    if r"\u" in content:
+                        content = safe_unicode_decode(content.encode("utf-8"))
+                    yield content
+            except Exception as e:
+                logger.error(f"Error in stream response: {str(e)}")
+                raise
 
         return inner()
     else:
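
The openai.py side only logs and re-raises, so the FastAPI layer above still decides how to report the failure. For completeness, here is a hedged sketch of how a client might consume the resulting NDJSON stream; the URL, port, and model name are assumptions about a typical deployment, not values taken from this commit:

import json

import requests  # any HTTP client with streaming support would do


def consume_chat_stream(url="http://localhost:9621/api/chat"):
    # Assumed Ollama-compatible chat endpoint and payload shape.
    payload = {
        "model": "lightrag:latest",  # assumption; the server reports LIGHTRAG_MODEL
        "messages": [{"role": "user", "content": "hi"}],
        "stream": True,
    }
    with requests.post(url, json=payload, stream=True) as resp:
        resp.raise_for_status()
        for line in resp.iter_lines():
            if not line:
                continue
            frame = json.loads(line)
            if "error" in frame:
                # Error frame produced by the outer except handler in stream_generator()
                raise RuntimeError(frame["error"]["message"])
            if frame.get("done"):
                break  # final frame; stop reading
            print(frame["message"]["content"], end="", flush=True)


if __name__ == "__main__":
    consume_chat_stream()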