ccm's picture
Reverting logging changes
7e34bee
import asyncio
import io
import json
import re
import threading
from agent_server.sanitizing_think_tags import scrub_think_tags
class QueueWriter(io.TextIOBase):
"""
File-like object that pushes each write to an asyncio.Queue immediately.
"""
def __init__(self, q: "asyncio.Queue"):
self.q = q
self._lock = threading.Lock()
self._buf = [] # accumulate until newline to reduce spam
def write(self, s: str):
if not s:
return 0
with self._lock:
self._buf.append(s)
# flush on newline to keep granularity reasonable
if "\n" in s:
chunk = "".join(self._buf)
self._buf.clear()
try:
self.q.put_nowait({"__stdout__": chunk})
except Exception:
pass
return len(s)
def flush(self):
with self._lock:
if self._buf:
chunk = "".join(self._buf)
self._buf.clear()
try:
self.q.put_nowait({"__stdout__": chunk})
except Exception:
pass
def _serialize_step(step) -> str:
"""
Best-effort pretty string for a smolagents MemoryStep / ActionStep.
Works even if attributes are missing on some versions.
"""
parts = []
sn = getattr(step, "step_number", None)
if sn is not None:
parts.append(f"Step {sn}")
thought_val = getattr(step, "thought", None)
if thought_val:
parts.append(f"Thought: {scrub_think_tags(str(thought_val))}")
tool_val = getattr(step, "tool", None)
if tool_val:
parts.append(f"Tool: {scrub_think_tags(str(tool_val))}")
code_val = getattr(step, "code", None)
if code_val:
code_str = scrub_think_tags(str(code_val)).strip()
parts.append("```python\n" + code_str + "\n```")
args = getattr(step, "args", None)
if args:
try:
parts.append(
"Args: " + scrub_think_tags(json.dumps(args, ensure_ascii=False))
)
except Exception:
parts.append("Args: " + scrub_think_tags(str(args)))
error = getattr(step, "error", None)
if error:
parts.append(f"Error: {scrub_think_tags(str(error))}")
obs = getattr(step, "observations", None)
if obs is not None:
if isinstance(obs, (list, tuple)):
obs_str = "\n".join(map(str, obs))
else:
obs_str = str(obs)
parts.append("Observation:\n" + scrub_think_tags(obs_str).strip())
# If this looks like a FinalAnswer step object, surface a clean final answer
try:
tname = type(step).__name__
except Exception:
tname = ""
if tname.lower().startswith("finalanswer"):
out = getattr(step, "output", None)
if out is not None:
return f"Final answer: {scrub_think_tags(str(out)).strip()}"
# Fallback: try to parse from string repr "FinalAnswerStep(output=...)"
s = scrub_think_tags(str(step))
m = re.search(r"FinalAnswer[^()]*\(\s*output\s*=\s*([^,)]+)", s)
if m:
return f"Final answer: {m.group(1).strip()}"
# If the only content would be an object repr like FinalAnswerStep(...), drop it;
# a cleaner "Final answer: ..." will come from the rule above or stdout.
joined = "\n".join(parts).strip()
if re.match(r"^FinalAnswer[^\n]+\)$", joined):
return ""
return joined or scrub_think_tags(str(step))