File size: 3,343 Bytes
0b902e8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import json
import re
import typing

from agent_server.sanitizing_think_tags import scrub_think_tags


def _format_reasoning_chunk(text: str, tag: str, idx: int) -> str:
    """
    Lightweight formatter for reasoning stream. Avoid huge code fences;
    make it readable and incremental. Also filters out ASCII/box-drawing noise.
    """
    text = scrub_think_tags(text).rstrip("\n")
    if not text:
        return ""
    noisy_prefixes = (
        "OpenAIServerModel",
        "Output message of the LLM",
        "─ Executing parsed code",
        "New run",
        "╭",
        "╰",
        "│",
        "━",
        "─",
    )
    stripped = text.strip()
    if not stripped:
        return ""
    # Lines made mostly of box drawing/separators
    if all(ch in " ─━╭╮╰╯│═·—-_=+•" for ch in stripped):
        return ""
    if any(stripped.startswith(p) for p in noisy_prefixes):
        return ""
    # Excessively long lines with little signal (no alphanumerics)
    if len(stripped) > 240 and not re.search(r"[A-Za-z0-9]{3,}", stripped):
        return ""
    # No tag/idx prefix; add a trailing blank line for readability in markdown
    return f"{stripped}\n\n"


def _extract_final_text(item: typing.Any) -> typing.Optional[str]:
    if isinstance(item, dict) and ("__stdout__" in item or "__step__" in item):
        return None
    if isinstance(item, (bytes, bytearray)):
        try:
            item = item.decode("utf-8", errors="ignore")
        except Exception:
            item = str(item)
    if isinstance(item, str):
        s = scrub_think_tags(item.strip())
        return s or None
    # If it's a step-like object with an 'output' attribute, use that
    try:
        if not isinstance(item, (dict, list, bytes, bytearray)):
            out = getattr(item, "output", None)
            if out is not None:
                s = scrub_think_tags(str(out)).strip()
                if s:
                    return s
    except Exception:
        pass
    if isinstance(item, dict):
        for key in ("content", "text", "message", "output", "final", "answer"):
            if key in item:
                val = item[key]
                if isinstance(val, (dict, list)):
                    try:
                        return scrub_think_tags(json.dumps(val, ensure_ascii=False))
                    except Exception:
                        return scrub_think_tags(str(val))
                if isinstance(val, (bytes, bytearray)):
                    try:
                        val = val.decode("utf-8", errors="ignore")
                    except Exception:
                        val = str(val)
                s = scrub_think_tags(str(val).strip())
                return s or None
        try:
            return scrub_think_tags(json.dumps(item, ensure_ascii=False))
        except Exception:
            return scrub_think_tags(str(item))
    try:
        return scrub_think_tags(str(item))
    except Exception:
        return None


_FINAL_RE = re.compile(r"(?:^|\\b)Final\\s+answer:\\s*(.+)$", flags=re.IGNORECASE)


def _maybe_parse_final_from_stdout(line: str) -> typing.Optional[str]:
    if not isinstance(line, str):
        return None
    m = _FINAL_RE.search(line.strip())
    if not m:
        return None
    return scrub_think_tags(m.group(1)).strip() or None