ccm commited on
Commit
cdca445
·
1 Parent(s): a6d1b6d

Reverting logging changes, updating agent.

Browse files
agent_server/agent_streaming.py CHANGED
@@ -1,91 +1,19 @@
1
- # agent_server/agent_streaming.py
2
- from __future__ import annotations
3
-
4
  import asyncio
5
  import contextlib
6
  import os
7
  import threading
8
  import time
9
- import typing as t
10
 
11
  import fastapi
12
  import httpx
13
 
14
  from agent_server.helpers import sse_headers
15
  from agent_server.sanitizing_think_tags import scrub_think_tags
16
- from agent_server.std_tee import (
17
- QueueWriter,
18
- _serialize_step,
19
- _format_reasoning_chunk,
20
- _maybe_parse_final_from_stdout,
21
- )
22
-
23
- # ---------------------------------------------------------------------------
24
- # Memory poller: normalize all agent types to uniform step blocks.
25
- # ---------------------------------------------------------------------------
26
- def start_memory_poller(
27
- agent: t.Any,
28
- q: "asyncio.Queue[dict]",
29
- stop_evt: "threading.Event",
30
- interval: float = 0.10,
31
- ) -> threading.Thread:
32
- """
33
- Starts a background thread that polls agent memory and enqueues formatted step blocks.
34
- Tries several attribute paths to support different agent implementations.
35
- """
36
- last_len = 0
37
-
38
- def _get_steps_safe() -> list:
39
- # Try canonical memory APIs first
40
- try:
41
- mem = getattr(agent, "memory", None)
42
- if mem is not None:
43
- for attr in ("get_full_steps", "get_steps", "get_all_steps"):
44
- fn = getattr(mem, attr, None)
45
- if callable(fn):
46
- steps = fn()
47
- return list(steps or [])
48
- except Exception:
49
- pass
50
- # Fallback: common direct list field
51
- try:
52
- raw = getattr(agent, "steps", None)
53
- if raw:
54
- return list(raw)
55
- except Exception:
56
- pass
57
- return []
58
-
59
- def _run():
60
- nonlocal last_len
61
- while not stop_evt.is_set():
62
- try:
63
- steps = _get_steps_safe()
64
- if steps and len(steps) > last_len:
65
- new = steps[last_len:]
66
- last_len = len(steps)
67
- for s in new:
68
- try:
69
- s_text = _serialize_step(s)
70
- if s_text:
71
- q.put_nowait({"__step__": s_text})
72
- except Exception:
73
- # Never let formatting kill polling
74
- pass
75
- except Exception:
76
- pass
77
- time.sleep(interval)
78
 
79
- th = threading.Thread(target=_run, name="memory-poller", daemon=True)
80
- th.start()
81
- return th
82
 
83
-
84
- # ---------------------------------------------------------------------------
85
- # Unified agent streaming: stdout/stderr, memory steps, iterator yields.
86
- # Adds normalized reasoning via __reasoning__ while preserving legacy keys.
87
- # ---------------------------------------------------------------------------
88
- async def run_agent_stream(task: str, agent_obj: t.Optional[t.Any] = None):
89
  """
90
  Start the agent in a worker thread.
91
  Stream THREE sources of incremental data into the async generator:
@@ -93,62 +21,97 @@ async def run_agent_stream(task: str, agent_obj: t.Optional[t.Any] = None):
93
  (2) newly appended memory steps (polled),
94
  (3) any iterable the agent may yield (if supported).
95
  Finally emit a __final__ item with the last answer.
96
-
97
- Emits dict items. For compatibility, raw shapes are preserved:
98
- - {"__stdout__": "<line>"} (raw line)
99
- - {"__step__": "<block>"} (uniform Step/Thought/Tool/Args/Obs/Error block)
100
- - {"__reasoning__": "<chunk>"} (normalized reasoning derived from stdout)
101
- - {"__error__": "<error>"} (if run errors)
102
- - {"__final__": any} (final result)
103
  """
104
  loop = asyncio.get_running_loop()
105
  q: asyncio.Queue = asyncio.Queue()
106
  agent_to_use = agent_obj
 
107
  stop_evt = threading.Event()
108
 
109
- # 1) stdout/stderr live tee (lines go in as {"__stdout__": ...})
110
  qwriter = QueueWriter(q)
111
 
112
  # 2) memory poller
113
- mem_thread = start_memory_poller(agent_to_use, q, stop_evt)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
114
 
115
  # 3) agent runner (may or may not yield)
116
  def run_agent():
117
  final_result = None
118
  try:
119
- with contextlib.redirect_stdout(qwriter), contextlib.redirect_stderr(qwriter):
 
 
120
  used_iterable = False
121
- # Preferred streaming signature
122
- if hasattr(agent_to_use, "run") and callable(getattr(agent_to_use, "run")):
 
123
  try:
124
  res = agent_to_use.run(task, stream=True)
125
- if hasattr(res, "__iter__") and not isinstance(res, (str, bytes)):
 
 
126
  used_iterable = True
127
  for it in res:
128
  try:
129
- q.put_nowait(it if isinstance(it, dict) else {"__stdout__": str(it)})
130
  except Exception:
131
  pass
132
- final_result = None # may be contained in the iterable
 
 
133
  else:
134
  final_result = res
135
  except TypeError:
136
- # run(stream=True) not supported -> fall through to other signatures
137
  pass
138
 
139
  if final_result is None and not used_iterable:
140
- # Try other common streaming variants
141
- for name in ("run_stream", "stream", "stream_run", "run_with_callback"):
 
 
 
 
 
142
  fn = getattr(agent_to_use, name, None)
143
  if callable(fn):
144
  try:
145
  res = fn(task)
146
- if hasattr(res, "__iter__") and not isinstance(res, (str, bytes)):
 
 
147
  for it in res:
148
- try:
149
- q.put_nowait(it if isinstance(it, dict) else {"__stdout__": str(it)})
150
- except Exception:
151
- pass
152
  final_result = None
153
  else:
154
  final_result = res
@@ -157,7 +120,7 @@ async def run_agent_stream(task: str, agent_obj: t.Optional[t.Any] = None):
157
  # maybe callback signature
158
  def cb(item):
159
  try:
160
- q.put_nowait(item if isinstance(item, dict) else {"__stdout__": str(item)})
161
  except Exception:
162
  pass
163
 
@@ -169,10 +132,17 @@ async def run_agent_stream(task: str, agent_obj: t.Optional[t.Any] = None):
169
  continue
170
 
171
  if final_result is None and not used_iterable:
172
- # Last resort: synchronous APIs
173
- if hasattr(agent_to_use, "run") and callable(getattr(agent_to_use, "run")):
 
 
 
 
 
174
  final_result = agent_to_use.run(task)
175
- elif hasattr(agent_to_use, "generate") and callable(getattr(agent_to_use, "generate")):
 
 
176
  final_result = agent_to_use.generate(task)
177
  elif callable(agent_to_use):
178
  final_result = agent_to_use(task)
@@ -197,45 +167,20 @@ async def run_agent_stream(task: str, agent_obj: t.Optional[t.Any] = None):
197
  pass
198
  stop_evt.set()
199
 
200
- run_thread = threading.Thread(target=run_agent, name="agent-runner", daemon=True)
 
 
 
201
  run_thread.start()
202
 
203
- # Async consumer: normalize stdout -> reasoning chunk; forward steps & others
204
  while True:
205
  item = await q.get()
206
-
207
- # Normalize stdout lines into compact reasoning chunks, and also
208
- # opportunistically extract a "Final answer:" if the agent prints one.
209
- if isinstance(item, dict) and "__stdout__" in item:
210
- line = item["__stdout__"]
211
- # Add compact, filtered reasoning chunk (drop banners, system prompts)
212
- chunk = _format_reasoning_chunk(line, tag="stdout", idx=0)
213
- if chunk:
214
- yield {"__reasoning__": chunk}
215
- # Keep legacy raw stdout for existing consumers
216
- yield item
217
- # Opportunistic final answer capture from stdout
218
- maybe_final = _maybe_parse_final_from_stdout(line)
219
- if maybe_final:
220
- # Don't end the stream here; consumer can decide how to use it
221
- yield {"__maybe_final__": maybe_final}
222
- continue
223
-
224
- # Steps already serialized uniformly in the poller
225
- if isinstance(item, dict) and "__step__" in item:
226
- yield item
227
- continue
228
-
229
- # Pass-through for other shapes (__error__, iterable events, etc.)
230
  yield item
231
-
232
  if isinstance(item, dict) and "__final__" in item:
233
  break
234
 
235
 
236
- # ---------------------------------------------------------------------------
237
- # Utilities: scrub nested structures of <think> tags when proxying upstream
238
- # ---------------------------------------------------------------------------
239
  def _recursively_scrub(obj):
240
  if isinstance(obj, str):
241
  return scrub_think_tags(obj)
@@ -246,10 +191,9 @@ def _recursively_scrub(obj):
246
  return obj
247
 
248
 
249
- # ---------------------------------------------------------------------------
250
- # Upstream proxy (OpenAI-compatible) with optional think-tag scrubbing
251
- # ---------------------------------------------------------------------------
252
- async def proxy_upstream_chat_completions(body: dict, stream: bool, scrub_think: bool = False):
253
  HF_TOKEN = os.getenv("OPENAI_API_KEY")
254
  headers = {
255
  "Authorization": f"Bearer {HF_TOKEN}" if HF_TOKEN else "",
@@ -262,9 +206,12 @@ async def proxy_upstream_chat_completions(body: dict, stream: bool, scrub_think:
262
 
263
  async def proxy_stream():
264
  async with httpx.AsyncClient(timeout=None) as client:
265
- async with client.stream("POST", url, headers=headers, json=body) as resp:
 
 
266
  resp.raise_for_status()
267
  if scrub_think:
 
268
  async for txt in resp.aiter_text():
269
  try:
270
  cleaned = scrub_think_tags(txt)
@@ -292,250 +239,6 @@ async def proxy_upstream_chat_completions(body: dict, stream: bool, scrub_think:
292
  except Exception:
293
  pass
294
 
295
- return fastapi.responses.JSONResponse(status_code=r.status_code, content=payload)
296
-
297
-
298
- # import asyncio
299
- # import contextlib
300
- # import os
301
- # import threading
302
- # import time
303
- # import typing
304
- #
305
- # import fastapi
306
- # import httpx
307
- #
308
- # from agent_server.helpers import sse_headers
309
- # from agent_server.sanitizing_think_tags import scrub_think_tags
310
- # from agent_server.std_tee import QueueWriter, _serialize_step
311
- #
312
- #
313
- # async def run_agent_stream(task: str, agent_obj: typing.Optional[typing.Any] = None):
314
- # """
315
- # Start the agent in a worker thread.
316
- # Stream THREE sources of incremental data into the async generator:
317
- # (1) live stdout/stderr lines,
318
- # (2) newly appended memory steps (polled),
319
- # (3) any iterable the agent may yield (if supported).
320
- # Finally emit a __final__ item with the last answer.
321
- # """
322
- # loop = asyncio.get_running_loop()
323
- # q: asyncio.Queue = asyncio.Queue()
324
- # agent_to_use = agent_obj
325
- #
326
- # stop_evt = threading.Event()
327
- #
328
- # # 1) stdout/stderr live tee
329
- # qwriter = QueueWriter(q)
330
- #
331
- # # 2) memory poller
332
- # def poll_memory():
333
- # last_len = 0
334
- # while not stop_evt.is_set():
335
- # try:
336
- # steps = []
337
- # try:
338
- # # Common API: agent.memory.get_full_steps()
339
- # steps = agent_to_use.memory.get_full_steps() # type: ignore[attr-defined]
340
- # except Exception:
341
- # # Fallbacks: different names across versions
342
- # steps = (
343
- # getattr(agent_to_use, "steps", [])
344
- # or getattr(agent_to_use, "memory", [])
345
- # or []
346
- # )
347
- # if steps is None:
348
- # steps = []
349
- # curr_len = len(steps)
350
- # if curr_len > last_len:
351
- # new = steps[last_len:curr_len]
352
- # last_len = curr_len
353
- # for s in new:
354
- # s_text = _serialize_step(s)
355
- # if s_text:
356
- # try:
357
- # q.put_nowait({"__step__": s_text})
358
- # except Exception:
359
- # pass
360
- # except Exception:
361
- # pass
362
- # time.sleep(0.10) # 100 ms cadence
363
- #
364
- # # 3) agent runner (may or may not yield)
365
- # def run_agent():
366
- # final_result = None
367
- # try:
368
- # with contextlib.redirect_stdout(qwriter), contextlib.redirect_stderr(
369
- # qwriter
370
- # ):
371
- # used_iterable = False
372
- # if hasattr(agent_to_use, "run") and callable(
373
- # getattr(agent_to_use, "run")
374
- # ):
375
- # try:
376
- # res = agent_to_use.run(task, stream=True)
377
- # if hasattr(res, "__iter__") and not isinstance(
378
- # res, (str, bytes)
379
- # ):
380
- # used_iterable = True
381
- # for it in res:
382
- # try:
383
- # q.put_nowait(it)
384
- # except Exception:
385
- # pass
386
- # final_result = (
387
- # None # iterable may already contain the answer
388
- # )
389
- # else:
390
- # final_result = res
391
- # except TypeError:
392
- # # run(stream=True) not supported -> fall back
393
- # pass
394
- #
395
- # if final_result is None and not used_iterable:
396
- # # Try other common streaming signatures
397
- # for name in (
398
- # "run_stream",
399
- # "stream",
400
- # "stream_run",
401
- # "run_with_callback",
402
- # ):
403
- # fn = getattr(agent_to_use, name, None)
404
- # if callable(fn):
405
- # try:
406
- # res = fn(task)
407
- # if hasattr(res, "__iter__") and not isinstance(
408
- # res, (str, bytes)
409
- # ):
410
- # for it in res:
411
- # q.put_nowait(it)
412
- # final_result = None
413
- # else:
414
- # final_result = res
415
- # break
416
- # except TypeError:
417
- # # maybe callback signature
418
- # def cb(item):
419
- # try:
420
- # q.put_nowait(item)
421
- # except Exception:
422
- # pass
423
- #
424
- # try:
425
- # fn(task, cb)
426
- # final_result = None
427
- # break
428
- # except Exception:
429
- # continue
430
- #
431
- # if final_result is None and not used_iterable:
432
- # pass # (typo guard removed below)
433
- #
434
- # if final_result is None and not used_iterable:
435
- # # Last resort: synchronous run()/generate()/callable
436
- # if hasattr(agent_to_use, "run") and callable(
437
- # getattr(agent_to_use, "run")
438
- # ):
439
- # final_result = agent_to_use.run(task)
440
- # elif hasattr(agent_to_use, "generate") and callable(
441
- # getattr(agent_to_use, "generate")
442
- # ):
443
- # final_result = agent_to_use.generate(task)
444
- # elif callable(agent_to_use):
445
- # final_result = agent_to_use(task)
446
- #
447
- # except Exception as e:
448
- # try:
449
- # qwriter.flush()
450
- # except Exception:
451
- # pass
452
- # try:
453
- # q.put_nowait({"__error__": str(e)})
454
- # except Exception:
455
- # pass
456
- # finally:
457
- # try:
458
- # qwriter.flush()
459
- # except Exception:
460
- # pass
461
- # try:
462
- # q.put_nowait({"__final__": final_result})
463
- # except Exception:
464
- # pass
465
- # stop_evt.set()
466
- #
467
- # # Kick off threads
468
- # mem_thread = threading.Thread(target=poll_memory, daemon=True)
469
- # run_thread = threading.Thread(target=run_agent, daemon=True)
470
- # mem_thread.start()
471
- # run_thread.start()
472
- #
473
- # # Async consumer
474
- # while True:
475
- # item = await q.get()
476
- # yield item
477
- # if isinstance(item, dict) and "__final__" in item:
478
- # break
479
- #
480
- #
481
- # def _recursively_scrub(obj):
482
- # if isinstance(obj, str):
483
- # return scrub_think_tags(obj)
484
- # if isinstance(obj, dict):
485
- # return {k: _recursively_scrub(v) for k, v in obj.items()}
486
- # if isinstance(obj, list):
487
- # return [_recursively_scrub(v) for v in obj]
488
- # return obj
489
- #
490
- #
491
- # async def proxy_upstream_chat_completions(
492
- # body: dict, stream: bool, scrub_think: bool = False
493
- # ):
494
- # HF_TOKEN = os.getenv("OPENAI_API_KEY")
495
- # headers = {
496
- # "Authorization": f"Bearer {HF_TOKEN}" if HF_TOKEN else "",
497
- # "Content-Type": "application/json",
498
- # }
499
- # UPSTREAM_BASE = os.getenv("UPSTREAM_OPENAI_BASE", "").rstrip("/")
500
- # url = f"{UPSTREAM_BASE}/chat/completions"
501
- #
502
- # if stream:
503
- #
504
- # async def proxy_stream():
505
- # async with httpx.AsyncClient(timeout=None) as client:
506
- # async with client.stream(
507
- # "POST", url, headers=headers, json=body
508
- # ) as resp:
509
- # resp.raise_for_status()
510
- # if scrub_think:
511
- # # Pull text segments, scrub tags, and yield bytes
512
- # async for txt in resp.aiter_text():
513
- # try:
514
- # cleaned = scrub_think_tags(txt)
515
- # yield cleaned.encode("utf-8")
516
- # except Exception:
517
- # yield txt.encode("utf-8")
518
- # else:
519
- # async for chunk in resp.aiter_bytes():
520
- # yield chunk
521
- #
522
- # return fastapi.responses.StreamingResponse(
523
- # proxy_stream(), media_type="text/event-stream", headers=sse_headers()
524
- # )
525
- # else:
526
- # async with httpx.AsyncClient(timeout=None) as client:
527
- # r = await client.post(url, headers=headers, json=body)
528
- # try:
529
- # payload = r.json()
530
- # except Exception:
531
- # payload = {"status_code": r.status_code, "text": r.text}
532
- #
533
- # if scrub_think:
534
- # try:
535
- # payload = _recursively_scrub(payload)
536
- # except Exception:
537
- # pass
538
- #
539
- # return fastapi.responses.JSONResponse(
540
- # status_code=r.status_code, content=payload
541
- # )
 
 
 
 
1
  import asyncio
2
  import contextlib
3
  import os
4
  import threading
5
  import time
6
+ import typing
7
 
8
  import fastapi
9
  import httpx
10
 
11
  from agent_server.helpers import sse_headers
12
  from agent_server.sanitizing_think_tags import scrub_think_tags
13
+ from agent_server.std_tee import QueueWriter, _serialize_step
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
14
 
 
 
 
15
 
16
+ async def run_agent_stream(task: str, agent_obj: typing.Optional[typing.Any] = None):
 
 
 
 
 
17
  """
18
  Start the agent in a worker thread.
19
  Stream THREE sources of incremental data into the async generator:
 
21
  (2) newly appended memory steps (polled),
22
  (3) any iterable the agent may yield (if supported).
23
  Finally emit a __final__ item with the last answer.
 
 
 
 
 
 
 
24
  """
25
  loop = asyncio.get_running_loop()
26
  q: asyncio.Queue = asyncio.Queue()
27
  agent_to_use = agent_obj
28
+
29
  stop_evt = threading.Event()
30
 
31
+ # 1) stdout/stderr live tee
32
  qwriter = QueueWriter(q)
33
 
34
  # 2) memory poller
35
+ def poll_memory():
36
+ last_len = 0
37
+ while not stop_evt.is_set():
38
+ try:
39
+ steps = []
40
+ try:
41
+ # Common API: agent.memory.get_full_steps()
42
+ steps = agent_to_use.memory.get_full_steps() # type: ignore[attr-defined]
43
+ except Exception:
44
+ # Fallbacks: different names across versions
45
+ steps = (
46
+ getattr(agent_to_use, "steps", [])
47
+ or getattr(agent_to_use, "memory", [])
48
+ or []
49
+ )
50
+ if steps is None:
51
+ steps = []
52
+ curr_len = len(steps)
53
+ if curr_len > last_len:
54
+ new = steps[last_len:curr_len]
55
+ last_len = curr_len
56
+ for s in new:
57
+ s_text = _serialize_step(s)
58
+ if s_text:
59
+ try:
60
+ q.put_nowait({"__step__": s_text})
61
+ except Exception:
62
+ pass
63
+ except Exception:
64
+ pass
65
+ time.sleep(0.10) # 100 ms cadence
66
 
67
  # 3) agent runner (may or may not yield)
68
  def run_agent():
69
  final_result = None
70
  try:
71
+ with contextlib.redirect_stdout(qwriter), contextlib.redirect_stderr(
72
+ qwriter
73
+ ):
74
  used_iterable = False
75
+ if hasattr(agent_to_use, "run") and callable(
76
+ getattr(agent_to_use, "run")
77
+ ):
78
  try:
79
  res = agent_to_use.run(task, stream=True)
80
+ if hasattr(res, "__iter__") and not isinstance(
81
+ res, (str, bytes)
82
+ ):
83
  used_iterable = True
84
  for it in res:
85
  try:
86
+ q.put_nowait(it)
87
  except Exception:
88
  pass
89
+ final_result = (
90
+ None # iterable may already contain the answer
91
+ )
92
  else:
93
  final_result = res
94
  except TypeError:
95
+ # run(stream=True) not supported -> fall back
96
  pass
97
 
98
  if final_result is None and not used_iterable:
99
+ # Try other common streaming signatures
100
+ for name in (
101
+ "run_stream",
102
+ "stream",
103
+ "stream_run",
104
+ "run_with_callback",
105
+ ):
106
  fn = getattr(agent_to_use, name, None)
107
  if callable(fn):
108
  try:
109
  res = fn(task)
110
+ if hasattr(res, "__iter__") and not isinstance(
111
+ res, (str, bytes)
112
+ ):
113
  for it in res:
114
+ q.put_nowait(it)
 
 
 
115
  final_result = None
116
  else:
117
  final_result = res
 
120
  # maybe callback signature
121
  def cb(item):
122
  try:
123
+ q.put_nowait(item)
124
  except Exception:
125
  pass
126
 
 
132
  continue
133
 
134
  if final_result is None and not used_iterable:
135
+ pass # (typo guard removed below)
136
+
137
+ if final_result is None and not used_iterable:
138
+ # Last resort: synchronous run()/generate()/callable
139
+ if hasattr(agent_to_use, "run") and callable(
140
+ getattr(agent_to_use, "run")
141
+ ):
142
  final_result = agent_to_use.run(task)
143
+ elif hasattr(agent_to_use, "generate") and callable(
144
+ getattr(agent_to_use, "generate")
145
+ ):
146
  final_result = agent_to_use.generate(task)
147
  elif callable(agent_to_use):
148
  final_result = agent_to_use(task)
 
167
  pass
168
  stop_evt.set()
169
 
170
+ # Kick off threads
171
+ mem_thread = threading.Thread(target=poll_memory, daemon=True)
172
+ run_thread = threading.Thread(target=run_agent, daemon=True)
173
+ mem_thread.start()
174
  run_thread.start()
175
 
176
+ # Async consumer
177
  while True:
178
  item = await q.get()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
179
  yield item
 
180
  if isinstance(item, dict) and "__final__" in item:
181
  break
182
 
183
 
 
 
 
184
  def _recursively_scrub(obj):
185
  if isinstance(obj, str):
186
  return scrub_think_tags(obj)
 
191
  return obj
192
 
193
 
194
+ async def proxy_upstream_chat_completions(
195
+ body: dict, stream: bool, scrub_think: bool = False
196
+ ):
 
197
  HF_TOKEN = os.getenv("OPENAI_API_KEY")
198
  headers = {
199
  "Authorization": f"Bearer {HF_TOKEN}" if HF_TOKEN else "",
 
206
 
207
  async def proxy_stream():
208
  async with httpx.AsyncClient(timeout=None) as client:
209
+ async with client.stream(
210
+ "POST", url, headers=headers, json=body
211
+ ) as resp:
212
  resp.raise_for_status()
213
  if scrub_think:
214
+ # Pull text segments, scrub tags, and yield bytes
215
  async for txt in resp.aiter_text():
216
  try:
217
  cleaned = scrub_think_tags(txt)
 
239
  except Exception:
240
  pass
241
 
242
+ return fastapi.responses.JSONResponse(
243
+ status_code=r.status_code, content=payload
244
+ )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
agent_server/std_tee.py CHANGED
@@ -1,305 +1,102 @@
1
- # agent_server/std_tee.py
2
- from __future__ import annotations
3
-
4
  import asyncio
5
  import io
6
  import json
7
  import re
8
  import threading
9
- import typing as t
10
-
11
- # ---- Think-tag scrubber (import with safe fallback) -------------------------
12
- try:
13
- # Same-package relative import is preferred
14
- from .sanitizing_think_tags import scrub_think_tags # type: ignore
15
- except Exception: # pragma: no cover
16
- # No-op fallback if the project layout differs
17
- def scrub_think_tags(s: str) -> str:
18
- return s
19
-
20
- # ---- Formatting helpers (ANSI, noise, truncation) --------------------------
21
- _ANSI_RE = re.compile(r"\x1B\[[0-?]*[ -/]*[@-~]")
22
 
23
- # Lines that should never be surfaced (system prompt and obvious boilerplate)
24
- _NOISY_PREFIXES = (
25
- "OpenAIServerModel",
26
- "Output message of the LLM",
27
- "New run",
28
- "─ Executing parsed code",
29
- "╭", "╰", "│", "━", "─",
30
- "System prompt", "SYSTEM PROMPT", "System Prompt",
31
- )
32
 
33
- # Very long single lines without enough alphanumerics are dropped
34
- _MIN_SIG_CHARS = re.compile(r"[A-Za-z0-9]{3,}")
35
 
36
- def _strip_ansi_and_think(s: str) -> str:
37
- s = scrub_think_tags(s)
38
- s = _ANSI_RE.sub("", s)
39
- return s.strip()
40
-
41
- def _truncate(s: str, n: int) -> str:
42
- s = s.strip()
43
- if len(s) <= n:
44
- return s
45
- return s[:n] + "\n… [truncated]"
46
-
47
- def _clean_line(s: str) -> str:
48
- return _strip_ansi_and_think(s).rstrip("\n")
49
 
50
- # ---- Public-ish helpers used by agent_streaming ----------------------------
51
- _FINAL_RE = re.compile(r"(?:^|\\b)Final\\s+answer:\\s*(.+)$", flags=re.IGNORECASE)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
52
 
53
- def _maybe_parse_final_from_stdout(line: str) -> t.Optional[str]:
54
- if not isinstance(line, str):
55
- return None
56
- line = _clean_line(line)
57
- m = _FINAL_RE.search(line)
58
- if not m:
59
- return None
60
- return _clean_line(m.group(1)) or None
 
61
 
62
- def _format_reasoning_chunk(text: str, tag: str, idx: int) -> str:
63
- """
64
- Lightweight formatter for reasoning stream from stdout.
65
- - scrubs <think>…</think>
66
- - strips ANSI
67
- - drops banners/box drawing and 'System prompt …'
68
- - drops very-long low-signal lines
69
- """
70
- stripped = _clean_line(text)
71
- if not stripped:
72
- return ""
73
- if any(stripped.startswith(p) for p in _NOISY_PREFIXES):
74
- return ""
75
- if all(ch in " ─━╭╮╰╯│═·—-_=+•" for ch in stripped):
76
- return ""
77
- if len(stripped) > 240 and not _MIN_SIG_CHARS.search(stripped):
78
- return ""
79
- return f"{stripped}\n\n"
80
 
81
- def _serialize_step(step: t.Any) -> str:
82
  """
83
- Compact, uniform serializer for 'step' objects from different agent libs.
84
- Produces:
85
- Step N
86
- 🧠 Thought: …
87
- 🛠️ Tool: …
88
- 📥 Args: …
89
- 📤 Observation: …
90
- 💥 Error: …
91
- (plus code fences when code is present)
92
- With truncation to keep the reveal parsimonious.
93
  """
94
- parts: list[str] = []
95
-
96
- # Step number (best-effort)
97
  sn = getattr(step, "step_number", None)
98
  if sn is not None:
99
  parts.append(f"Step {sn}")
100
-
101
- # Thought
102
  thought_val = getattr(step, "thought", None)
103
  if thought_val:
104
- parts.append(f"🧠 Thought: {_truncate(_strip_ansi_and_think(str(thought_val)), 600)}")
105
-
106
- # Tool
107
  tool_val = getattr(step, "tool", None)
108
  if tool_val:
109
- parts.append(f"🛠️ Tool: {_truncate(_strip_ansi_and_think(str(tool_val)), 240)}")
110
-
111
- # Code (if any)
112
  code_val = getattr(step, "code", None)
113
  if code_val:
114
- code_str = _truncate(_strip_ansi_and_think(str(code_val)), 1600)
115
- if code_str:
116
- parts.append("```python\n" + code_str + "\n```")
117
-
118
- # Args
119
  args = getattr(step, "args", None)
120
  if args:
121
  try:
122
- arg_s = _truncate(_strip_ansi_and_think(json.dumps(args, ensure_ascii=False)), 800)
 
 
123
  except Exception:
124
- arg_s = _truncate(_strip_ansi_and_think(str(args)), 800)
125
- parts.append("📥 Args: " + arg_s)
126
-
127
- # Error
128
  error = getattr(step, "error", None)
129
  if error:
130
- parts.append(f"💥 Error: {_truncate(_strip_ansi_and_think(str(error)), 600)}")
131
-
132
- # Observations
133
  obs = getattr(step, "observations", None)
134
  if obs is not None:
135
  if isinstance(obs, (list, tuple)):
136
  obs_str = "\n".join(map(str, obs))
137
  else:
138
  obs_str = str(obs)
139
- parts.append("📤 Observation:\n" + _truncate(_strip_ansi_and_think(obs_str), 1600))
140
-
141
- # Final answer via explicit action type patterns (best-effort)
142
- tname = getattr(step, "type_name", "") or getattr(step, "type", "") or ""
143
- if isinstance(tname, str) and tname.lower().startswith("finalanswer"):
 
 
144
  out = getattr(step, "output", None)
145
  if out is not None:
146
- return f"Final answer: {_strip_ansi_and_think(str(out))}"
147
-
148
- # Fallback: parse repr
149
- s = _strip_ansi_and_think(str(step))
150
- m = re.search(r"FinalAnswer[^()]*\(\s*output\s*=\s*([^,)]+)", s)
151
- if m:
152
- return f"Final answer: {m.group(1).strip()}"
153
-
154
  joined = "\n".join(parts).strip()
155
  if re.match(r"^FinalAnswer[^\n]+\)$", joined):
156
  return ""
157
- return joined or s
158
-
159
- # ---- Tee for redirecting stdout/stderr into an asyncio.Queue ----------------
160
- class QueueWriter(io.TextIOBase):
161
- """
162
- Minimal text writer that sends lines into an asyncio.Queue.
163
- Each non-empty line is enqueued as {"__stdout__": "<line>"}.
164
- """
165
- def __init__(self, q: "asyncio.Queue[dict]"):
166
- self._q = q
167
- self._buf = []
168
-
169
- def write(self, s: str) -> int:
170
- if not isinstance(s, str):
171
- s = str(s)
172
- # Buffer until newline; then emit line events
173
- self._buf.append(s)
174
- text = "".join(self._buf)
175
- if "\n" in text:
176
- lines = text.splitlines(keepends=True)
177
- # keep last partial (no newline) in buffer
178
- tail = "" if text.endswith("\n") else lines.pop()
179
- for ln in lines:
180
- clean = _clean_line(ln)
181
- if clean:
182
- try:
183
- # downstream streamer will call _format_reasoning_chunk & co.
184
- self._q.put_nowait({"__stdout__": clean})
185
- except Exception:
186
- pass
187
- self._buf = [tail]
188
- return len(s)
189
-
190
- def flush(self) -> None:
191
- if self._buf:
192
- text = "".join(self._buf)
193
- self._buf.clear()
194
- clean = _clean_line(text)
195
- if clean:
196
- try:
197
- self._q.put_nowait({"__stdout__": clean})
198
- except Exception:
199
- pass
200
-
201
- def isatty(self) -> bool: # for libraries that test it
202
- return False
203
-
204
- # import asyncio
205
- # import io
206
- # import json
207
- # import re
208
- # import threading
209
- #
210
- # from agent_server.sanitizing_think_tags import scrub_think_tags
211
- #
212
- #
213
- # class QueueWriter(io.TextIOBase):
214
- # """
215
- # File-like object that pushes each write to an asyncio.Queue immediately.
216
- # """
217
- #
218
- # def __init__(self, q: "asyncio.Queue"):
219
- # self.q = q
220
- # self._lock = threading.Lock()
221
- # self._buf = [] # accumulate until newline to reduce spam
222
- #
223
- # def write(self, s: str):
224
- # if not s:
225
- # return 0
226
- # with self._lock:
227
- # self._buf.append(s)
228
- # # flush on newline to keep granularity reasonable
229
- # if "\n" in s:
230
- # chunk = "".join(self._buf)
231
- # self._buf.clear()
232
- # try:
233
- # self.q.put_nowait({"__stdout__": chunk})
234
- # except Exception:
235
- # pass
236
- # return len(s)
237
- #
238
- # def flush(self):
239
- # with self._lock:
240
- # if self._buf:
241
- # chunk = "".join(self._buf)
242
- # self._buf.clear()
243
- # try:
244
- # self.q.put_nowait({"__stdout__": chunk})
245
- # except Exception:
246
- # pass
247
- #
248
- #
249
- # def _serialize_step(step) -> str:
250
- # """
251
- # Best-effort pretty string for a smolagents MemoryStep / ActionStep.
252
- # Works even if attributes are missing on some versions.
253
- # """
254
- # parts = []
255
- # sn = getattr(step, "step_number", None)
256
- # if sn is not None:
257
- # parts.append(f"Step {sn}")
258
- # thought_val = getattr(step, "thought", None)
259
- # if thought_val:
260
- # parts.append(f"Thought: {scrub_think_tags(str(thought_val))}")
261
- # tool_val = getattr(step, "tool", None)
262
- # if tool_val:
263
- # parts.append(f"Tool: {scrub_think_tags(str(tool_val))}")
264
- # code_val = getattr(step, "code", None)
265
- # if code_val:
266
- # code_str = scrub_think_tags(str(code_val)).strip()
267
- # parts.append("```python\n" + code_str + "\n```")
268
- # args = getattr(step, "args", None)
269
- # if args:
270
- # try:
271
- # parts.append(
272
- # "Args: " + scrub_think_tags(json.dumps(args, ensure_ascii=False))
273
- # )
274
- # except Exception:
275
- # parts.append("Args: " + scrub_think_tags(str(args)))
276
- # error = getattr(step, "error", None)
277
- # if error:
278
- # parts.append(f"Error: {scrub_think_tags(str(error))}")
279
- # obs = getattr(step, "observations", None)
280
- # if obs is not None:
281
- # if isinstance(obs, (list, tuple)):
282
- # obs_str = "\n".join(map(str, obs))
283
- # else:
284
- # obs_str = str(obs)
285
- # parts.append("Observation:\n" + scrub_think_tags(obs_str).strip())
286
- # # If this looks like a FinalAnswer step object, surface a clean final answer
287
- # try:
288
- # tname = type(step).__name__
289
- # except Exception:
290
- # tname = ""
291
- # if tname.lower().startswith("finalanswer"):
292
- # out = getattr(step, "output", None)
293
- # if out is not None:
294
- # return f"Final answer: {scrub_think_tags(str(out)).strip()}"
295
- # # Fallback: try to parse from string repr "FinalAnswerStep(output=...)"
296
- # s = scrub_think_tags(str(step))
297
- # m = re.search(r"FinalAnswer[^()]*\(\s*output\s*=\s*([^,)]+)", s)
298
- # if m:
299
- # return f"Final answer: {m.group(1).strip()}"
300
- # # If the only content would be an object repr like FinalAnswerStep(...), drop it;
301
- # # a cleaner "Final answer: ..." will come from the rule above or stdout.
302
- # joined = "\n".join(parts).strip()
303
- # if re.match(r"^FinalAnswer[^\n]+\)$", joined):
304
- # return ""
305
- # return joined or scrub_think_tags(str(step))
 
 
 
 
1
  import asyncio
2
  import io
3
  import json
4
  import re
5
  import threading
 
 
 
 
 
 
 
 
 
 
 
 
 
6
 
7
+ from agent_server.sanitizing_think_tags import scrub_think_tags
 
 
 
 
 
 
 
 
8
 
 
 
9
 
10
class QueueWriter(io.TextIOBase):
    """Text sink that forwards written output to an asyncio.Queue.

    Fragments are buffered under a lock and emitted as a single
    ``{"__stdout__": chunk}`` queue item whenever a newline arrives (or on
    ``flush``), so consumers see line-granular chunks rather than one item
    per ``write`` call.
    """

    def __init__(self, q: "asyncio.Queue"):
        self.q = q
        self._lock = threading.Lock()
        # Pending fragments awaiting a newline (or an explicit flush).
        self._buf = []

    def _drain_locked(self) -> None:
        # Must be called with self._lock held: join pending fragments and
        # push them as one chunk. Queue errors are deliberately swallowed so
        # a logging failure can never crash the code being instrumented.
        if not self._buf:
            return
        chunk = "".join(self._buf)
        self._buf.clear()
        try:
            self.q.put_nowait({"__stdout__": chunk})
        except Exception:
            pass

    def write(self, s: str):
        if not s:
            return 0
        with self._lock:
            self._buf.append(s)
            # Emit on newline to keep granularity reasonable.
            if "\n" in s:
                self._drain_locked()
        return len(s)

    def flush(self):
        with self._lock:
            self._drain_locked()
44
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
45
 
46
def _serialize_step(step) -> str:
    """
    Best-effort pretty string for a smolagents MemoryStep / ActionStep.

    Works even if attributes are missing on some versions: every field is
    read defensively via getattr, and all text passes through
    scrub_think_tags so internal <think> content never reaches the client.

    Returns "" when the step would render only as a bare
    ``FinalAnswerStep(...)`` repr; a cleaner "Final answer: ..." line is
    produced by the FinalAnswer rule below or surfaced via stdout.
    """
    parts = []

    sn = getattr(step, "step_number", None)
    if sn is not None:
        parts.append(f"Step {sn}")

    thought_val = getattr(step, "thought", None)
    if thought_val:
        parts.append(f"Thought: {scrub_think_tags(str(thought_val))}")

    tool_val = getattr(step, "tool", None)
    if tool_val:
        parts.append(f"Tool: {scrub_think_tags(str(tool_val))}")

    code_val = getattr(step, "code", None)
    if code_val:
        code_str = scrub_think_tags(str(code_val)).strip()
        parts.append("```python\n" + code_str + "\n```")

    args = getattr(step, "args", None)
    if args:
        try:
            parts.append(
                "Args: " + scrub_think_tags(json.dumps(args, ensure_ascii=False))
            )
        except Exception:
            # Non-JSON-serializable args: fall back to their str() form.
            parts.append("Args: " + scrub_think_tags(str(args)))

    error = getattr(step, "error", None)
    if error:
        parts.append(f"Error: {scrub_think_tags(str(error))}")

    obs = getattr(step, "observations", None)
    if obs is not None:
        if isinstance(obs, (list, tuple)):
            obs_str = "\n".join(map(str, obs))
        else:
            obs_str = str(obs)
        obs_str = scrub_think_tags(obs_str).strip()
        # Fix: previously an empty observation still emitted a bare
        # "Observation:" header; skip the section when there is nothing
        # to show (consistent with the truthiness checks above).
        if obs_str:
            parts.append("Observation:\n" + obs_str)

    # If this looks like a FinalAnswer step object, surface a clean final answer.
    try:
        tname = type(step).__name__
    except Exception:
        tname = ""
    if tname.lower().startswith("finalanswer"):
        out = getattr(step, "output", None)
        if out is not None:
            return f"Final answer: {scrub_think_tags(str(out)).strip()}"
        # Fallback: try to parse from string repr "FinalAnswerStep(output=...)"
        s = scrub_think_tags(str(step))
        m = re.search(r"FinalAnswer[^()]*\(\s*output\s*=\s*([^,)]+)", s)
        if m:
            return f"Final answer: {m.group(1).strip()}"

    # If the only content would be an object repr like FinalAnswerStep(...),
    # drop it; a cleaner "Final answer: ..." comes from the rule above or stdout.
    joined = "\n".join(parts).strip()
    if re.match(r"^FinalAnswer[^\n]+\)$", joined):
        return ""
    return joined or scrub_think_tags(str(step))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
agents/generator_and_critic.py CHANGED
@@ -7,66 +7,22 @@ from smolagents import CodeAgent, ToolCallingAgent
7
 
8
  # ---------------- Agent Prompts ----------------
9
  GENERATOR_INSTRUCTIONS = """
10
- You are the Generator/Refiner.
11
-
12
- Goal
13
- - Produce a concise draft that strictly satisfies the caller's constraints.
14
- - Use the managed agent named "critic_agent" to validate your draft.
15
- - Repeat: draft → call critic_agent → if ok, return draft; else revise and re-check.
16
- - Output ONLY the final draft text (no JSON, no commentary).
17
-
18
- Constraints to enforce in every revision:
19
- - Respect the maximum word count.
20
- - Use bullet points where appropriate (lines starting with '-' or '•').
21
- - Include all required phrases verbatim.
22
- - End with a line starting with 'Next steps:'.
23
-
24
- Implementation guidance (you write/run the code):
25
- - Use a small loop with a bounded number of rounds provided by the caller.
26
- - Call critic_agent with a single string payload that contains:
27
- - the DRAFT
28
- - the list of required phrases
29
- - the word limit
30
- - any other constraints you need
31
- - critic_agent returns a JSON object with keys: ok (bool), violations (list[str]), suggestions (str).
32
- - If ok == true, immediately return the current draft (and stop).
33
- - Otherwise, revise the draft to fix ALL violations, then re-check.
34
-
35
- Important:
36
- - Do NOT print anything except the final draft at the end.
37
- - Avoid verbose interleaved logs; keep code minimal.
38
  """
39
 
40
  CRITIC_INSTRUCTIONS = """
41
- You are the Critic.
42
-
43
- Input
44
- - A single string payload that includes:
45
- - DRAFT text
46
- - explicit constraints (e.g., max_words, must_include list, bullets rule, ending 'Next steps:')
47
-
48
- Task
49
- - Evaluate the DRAFT against the constraints and general quality (clarity, correctness, structure, tone).
50
- - Return ONLY compact JSON (no text outside JSON) with shape:
51
- {
52
- "ok": true|false,
53
- "violations": ["short, concrete issues..."],
54
- "suggestions": "one short paragraph of actionable guidance"
55
- }
56
-
57
- Rules
58
- - ok=true ONLY if all constraints are satisfied AND the draft reads clearly.
59
- - Keep violations terse and actionable.
60
- - Keep suggestions short and prescriptive.
61
  """
62
 
63
-
64
  # ---------------- Factory ----------------
65
  def generate_generator_with_managed_critic(
66
  *,
67
  gen_max_steps: int = 12,
68
- crt_max_steps: int = 2,
69
- ) -> CodeAgent:
70
  """
71
  Returns a CodeAgent (generator) that manages a critic sub-agent.
72
  The critic is exposed to the generator as a managed agent (callable like a tool).
@@ -82,13 +38,13 @@ def generate_generator_with_managed_critic(
82
  tools=[], # critic needs no tools; it just returns JSON text
83
  model=model,
84
  name="critic_agent",
85
- description="Evaluates drafts against constraints and returns a compact JSON report.",
86
  instructions=CRITIC_INSTRUCTIONS,
87
  add_base_tools=False,
88
  max_steps=crt_max_steps,
89
  )
90
 
91
- generator = CodeAgent(
92
  tools=[], # keep toolbox minimal
93
  model=model,
94
  name="generator_with_managed_critic",
 
7
 
8
  # ---------------- Agent Prompts ----------------
9
# System prompt for the generator agent: drafts text and uses the managed
# "critic_agent" sub-agent to iterate toward the caller's constraints.
GENERATOR_INSTRUCTIONS = """
You are the Generator. Your goal is to produce a concise draft that strictly satisfies the caller's constraints.
Use the managed agent named "critic_agent" to iteratively improve your draft.
"""

# System prompt for the critic agent: a single-shot reviewer that returns
# its evaluation directly via the final-answer tool (no multi-step loops,
# matching its max_steps=1 configuration in the factory below).
CRITIC_INSTRUCTIONS = """
You are the Critic. Your job is to provide constructive, actionable feedback on drafts produced by the Generator.
You should not iterate or make multiple tool calls.
Instead, simply call the final answer tool with your evaluation and feedback.
"""
19
 
 
20
  # ---------------- Factory ----------------
21
  def generate_generator_with_managed_critic(
22
  *,
23
  gen_max_steps: int = 12,
24
+ crt_max_steps: int = 1,
25
+ ) -> ToolCallingAgent:
26
  """
27
  Returns a CodeAgent (generator) that manages a critic sub-agent.
28
  The critic is exposed to the generator as a managed agent (callable like a tool).
 
38
  tools=[], # critic needs no tools; it just returns JSON text
39
  model=model,
40
  name="critic_agent",
41
+ description="Evaluates drafts against constraints and returns a compact set of recommendations report.",
42
  instructions=CRITIC_INSTRUCTIONS,
43
  add_base_tools=False,
44
  max_steps=crt_max_steps,
45
  )
46
 
47
+ generator = ToolCallingAgent(
48
  tools=[], # keep toolbox minimal
49
  model=model,
50
  name="generator_with_managed_critic",