yangdx committed on
Commit
7e30fcd
·
1 Parent(s): 50179f6

refactor: enhance stream error handling and optimize code structure

Browse files

- Initialize timestamps at start to avoid null checks
- Add detailed error handling for streaming response
- Handle CancelledError and other exceptions separately
- Unify exception handling with trace_exception
- Clean up redundant code and simplify logic

Files changed (1) hide show
  1. lightrag/api/ollama_api.py +68 -60
lightrag/api/ollama_api.py CHANGED
@@ -203,16 +203,16 @@ class OllamaAPI:
203
  )
204
 
205
  async def stream_generator():
 
 
 
 
206
  try:
207
- first_chunk_time = None
208
- last_chunk_time = None
209
- total_response = ""
210
 
211
  # Ensure response is an async generator
212
  if isinstance(response, str):
213
  # If it's a string, send in two parts
214
- first_chunk_time = time.time_ns()
215
- last_chunk_time = first_chunk_time
216
  total_response = response
217
 
218
  data = {
@@ -241,21 +241,48 @@ class OllamaAPI:
241
  }
242
  yield f"{json.dumps(data, ensure_ascii=False)}\n"
243
  else:
244
- async for chunk in response:
245
- if chunk:
246
- if first_chunk_time is None:
247
- first_chunk_time = time.time_ns()
248
-
249
- last_chunk_time = time.time_ns()
250
-
251
- total_response += chunk
252
- data = {
253
- "model": self.ollama_server_infos.LIGHTRAG_MODEL,
254
- "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
255
- "response": chunk,
256
- "done": False,
257
- }
258
- yield f"{json.dumps(data, ensure_ascii=False)}\n"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
259
 
260
  completion_tokens = estimate_tokens(total_response)
261
  total_time = last_chunk_time - start_time
@@ -381,16 +408,15 @@ class OllamaAPI:
381
  )
382
 
383
  async def stream_generator():
384
- first_chunk_time = None
385
- last_chunk_time = None
386
  total_response = ""
387
 
388
  try:
389
  # Ensure response is an async generator
390
  if isinstance(response, str):
391
  # If it's a string, send in two parts
392
- first_chunk_time = time.time_ns()
393
- last_chunk_time = first_chunk_time
394
  total_response = response
395
 
396
  data = {
@@ -474,45 +500,27 @@ class OllamaAPI:
474
  yield f"{json.dumps(final_data, ensure_ascii=False)}\n"
475
  return
476
 
477
- if last_chunk_time is not None:
478
- completion_tokens = estimate_tokens(total_response)
479
- total_time = last_chunk_time - start_time
480
- prompt_eval_time = first_chunk_time - start_time
481
- eval_time = last_chunk_time - first_chunk_time
482
 
483
- data = {
484
- "model": self.ollama_server_infos.LIGHTRAG_MODEL,
485
- "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
486
- "done": True,
487
- "total_duration": total_time,
488
- "load_duration": 0,
489
- "prompt_eval_count": prompt_tokens,
490
- "prompt_eval_duration": prompt_eval_time,
491
- "eval_count": completion_tokens,
492
- "eval_duration": eval_time,
493
- }
494
- yield f"{json.dumps(data, ensure_ascii=False)}\n"
495
 
496
  except Exception as e:
497
- error_msg = f"Error in stream_generator: {str(e)}"
498
- logging.error(error_msg)
499
-
500
- # Send error message to client
501
- error_data = {
502
- "model": self.ollama_server_infos.LIGHTRAG_MODEL,
503
- "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
504
- "error": {"code": "STREAM_ERROR", "message": error_msg},
505
- }
506
- yield f"{json.dumps(error_data, ensure_ascii=False)}\n"
507
-
508
- # Ensure sending end marker
509
- final_data = {
510
- "model": self.ollama_server_infos.LIGHTRAG_MODEL,
511
- "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
512
- "done": True,
513
- }
514
- yield f"{json.dumps(final_data, ensure_ascii=False)}\n"
515
- return
516
 
517
  return StreamingResponse(
518
  stream_generator(),
 
203
  )
204
 
205
  async def stream_generator():
206
+ first_chunk_time = time.time_ns()
207
+ last_chunk_time = first_chunk_time
208
+ total_response = ""
209
+
210
  try:
 
 
 
211
 
212
  # Ensure response is an async generator
213
  if isinstance(response, str):
214
  # If it's a string, send in two parts
215
+ last_chunk_time = time.time_ns()
 
216
  total_response = response
217
 
218
  data = {
 
241
  }
242
  yield f"{json.dumps(data, ensure_ascii=False)}\n"
243
  else:
244
+ try:
245
+ async for chunk in response:
246
+ if chunk:
247
+ if first_chunk_time is None:
248
+ first_chunk_time = time.time_ns()
249
+
250
+ last_chunk_time = time.time_ns()
251
+
252
+ total_response += chunk
253
+ data = {
254
+ "model": self.ollama_server_infos.LIGHTRAG_MODEL,
255
+ "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
256
+ "response": chunk,
257
+ "done": False,
258
+ }
259
+ yield f"{json.dumps(data, ensure_ascii=False)}\n"
260
+ except (asyncio.CancelledError, Exception) as e:
261
+ error_msg = str(e)
262
+ if isinstance(e, asyncio.CancelledError):
263
+ error_msg = "Stream was cancelled by server"
264
+ else:
265
+ error_msg = f"Provider error: {error_msg}"
266
+
267
+ logging.error(f"Stream error: {error_msg}")
268
+
269
+ # Send error message to client
270
+ error_data = {
271
+ "model": self.ollama_server_infos.LIGHTRAG_MODEL,
272
+ "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
273
+ "response": f"\n\nError: {error_msg}",
274
+ "done": False,
275
+ }
276
+ yield f"{json.dumps(error_data, ensure_ascii=False)}\n"
277
+
278
+ # Send final message to close the stream
279
+ final_data = {
280
+ "model": self.ollama_server_infos.LIGHTRAG_MODEL,
281
+ "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
282
+ "done": True,
283
+ }
284
+ yield f"{json.dumps(final_data, ensure_ascii=False)}\n"
285
+ return
286
 
287
  completion_tokens = estimate_tokens(total_response)
288
  total_time = last_chunk_time - start_time
 
408
  )
409
 
410
  async def stream_generator():
411
+ first_chunk_time = time.time_ns()
412
+ last_chunk_time = first_chunk_time
413
  total_response = ""
414
 
415
  try:
416
  # Ensure response is an async generator
417
  if isinstance(response, str):
418
  # If it's a string, send in two parts
419
+ last_chunk_time = time.time_ns()
 
420
  total_response = response
421
 
422
  data = {
 
500
  yield f"{json.dumps(final_data, ensure_ascii=False)}\n"
501
  return
502
 
503
+ completion_tokens = estimate_tokens(total_response)
504
+ total_time = last_chunk_time - start_time
505
+ prompt_eval_time = first_chunk_time - start_time
506
+ eval_time = last_chunk_time - first_chunk_time
 
507
 
508
+ data = {
509
+ "model": self.ollama_server_infos.LIGHTRAG_MODEL,
510
+ "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
511
+ "done": True,
512
+ "total_duration": total_time,
513
+ "load_duration": 0,
514
+ "prompt_eval_count": prompt_tokens,
515
+ "prompt_eval_duration": prompt_eval_time,
516
+ "eval_count": completion_tokens,
517
+ "eval_duration": eval_time,
518
+ }
519
+ yield f"{json.dumps(data, ensure_ascii=False)}\n"
520
 
521
  except Exception as e:
522
+ trace_exception(e)
523
+ raise
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
524
 
525
  return StreamingResponse(
526
  stream_generator(),