import asyncio
import json
import os
import typing

from agent_server.agent_streaming import run_agent_stream
from agent_server.formatting_reasoning import (
    _extract_final_text,
    _maybe_parse_final_from_stdout,
    _format_reasoning_chunk,
)
from agent_server.helpers import normalize_content_to_text, now_ts
from agent_server.openai_schemas import ChatCompletionRequest, ChatMessage
from agent_server.sanitizing_think_tags import scrub_think_tags
from agents.code_writing_agents import (
    generate_code_writing_agent_without_tools,
    generate_code_writing_agent_with_search,
)
from agents.generator_and_critic import generate_generator_with_managed_critic
from agents.json_tool_calling_agents import (
    generate_tool_calling_agent_with_search_and_code,
)
from agents.agent_with_custom_beam_design_tools import generate_beam_agent
from agents.manager_with_heterogeneous_agents import (
    generate_manager_with_heterogeneous_agents,
)

# Model name from env
MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen3-1.7B")


def normalize_model_name(raw_model: typing.Union[str, dict, None]) -> str:
    """
    Accepts either a bare model string or the {"id": "..."} form; defaults to
    the local code-writing agent if unspecified.
    """
    if isinstance(raw_model, dict):
        return typing.cast(
            str, raw_model.get("id", "code-writing-agent-without-tools")
        )
    if isinstance(raw_model, str) and raw_model.strip():
        return raw_model
    return "code-writing-agent-without-tools"


def is_upstream_passthrough(model_name: str) -> bool:
    return model_name == MODEL_NAME


def is_upstream_passthrough_nothink(model_name: str) -> bool:
    return model_name == f"{MODEL_NAME}-nothink"


def apply_nothink_to_body(
    body: ChatCompletionRequest, messages: typing.List[ChatMessage]
) -> ChatCompletionRequest:
    """
    Returns a copy of the request that asks for 'no-think' behavior upstream:
    - sets body["model"] to MODEL_NAME (i.e. strips the -nothink suffix)
    - appends '/nothink' to each user message's content
    """
    new_body: ChatCompletionRequest = dict(body)  # shallow copy is fine
    new_body["model"] = MODEL_NAME
    new_messages: typing.List[ChatMessage] = []
    for msg in messages:
        if msg.get("role") == "user":
            content = normalize_content_to_text(msg.get("content", ""))
            new_messages.append({"role": "user", "content": content + "\n/nothink"})
        else:
            new_messages.append(msg)
    new_body["messages"] = new_messages
    return new_body


def agent_for_model(model_name: str):
    """
    Returns an instantiated agent for the given local model id.
    Raises ValueError on unknown local ids.
    """
    if model_name == "code-writing-agent-without-tools":
        return generate_code_writing_agent_without_tools()
    if model_name == "code-writing-agent-with-search":
        return generate_code_writing_agent_with_search()
    if model_name == "tool-calling-agent-with-search-and-code":
        return generate_tool_calling_agent_with_search_and_code()
    if model_name == "generator-with-managed-critic":
        return generate_generator_with_managed_critic()
    if model_name == "custom-agent-with-beam-design-tools":
        return generate_beam_agent()
    if model_name == "manager-with-heterogeneous-agents":
        return generate_manager_with_heterogeneous_agents()
    raise ValueError(f"Unknown model id: {model_name}")
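
# Illustrative sketch, not called anywhere in this module: how the routing
# helpers above are expected to compose in a request handler. The function
# name and the ("proxy" | "agent") return convention are assumptions for
# illustration only.
def _example_dispatch(
    body: ChatCompletionRequest, messages: typing.List[ChatMessage]
) -> typing.Tuple[str, typing.Any]:
    model_name = normalize_model_name(body.get("model"))
    if is_upstream_passthrough(model_name):
        # Forward to the upstream model unchanged.
        return "proxy", body
    if is_upstream_passthrough_nothink(model_name):
        # Same upstream model, with '/nothink' appended to user turns.
        return "proxy", apply_nothink_to_body(body, messages)
    # Otherwise the id names a local agent; raises ValueError if unknown.
    return "agent", agent_for_model(model_name)
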
""" return { "id": f"chatcmpl-smol-{now_ts()}", "object": "chat.completion.chunk", "created": now_ts(), "model": model_name, "choices": [ { "index": 0, "delta": {"role": "assistant"}, "finish_reason": None, } ], } def _safe_extract_candidate(val: typing.Any) -> typing.Optional[str]: """ Extracts a candidate final text string if present and non-empty. """ cand = _extract_final_text(val) if cand and cand.strip().lower() != "none": return cand return None def _truncate_reasoning_blob(reasoning: str, limit: int = 24000) -> str: if len(reasoning) > limit: return reasoning[:limit] + "\n… [truncated]" return reasoning def make_sse_generator( task: str, agent_for_request: typing.Any, model_name: str, ): """ Returns an async generator that yields SSE 'data:' lines for FastAPI StreamingResponse. """ async def _gen(): base = _openai_stream_base(model_name) # initial role header yield f"data: {json.dumps(base)}\n\n" reasoning_idx = 0 final_candidate: typing.Optional[str] = None async for item in run_agent_stream(task, agent_for_request): # Short-circuit on explicit error signaled by the runner if isinstance(item, dict) and "__error__" in item: error_chunk = { **base, "choices": [{"index": 0, "delta": {}, "finish_reason": "error"}], } yield f"data: {json.dumps(error_chunk)}\n\n" yield f"data: {json.dumps({'error': item['__error__']})}\n\n" break # Explicit final (do not emit yet; keep last candidate) if isinstance(item, dict) and "__final__" in item: cand = _safe_extract_candidate(item["__final__"]) if cand: final_candidate = cand continue # Live stdout -> reasoning_content if ( isinstance(item, dict) and "__stdout__" in item and isinstance(item["__stdout__"], str) ): for line in item["__stdout__"].splitlines(): parsed = _maybe_parse_final_from_stdout(line) if parsed: final_candidate = parsed rt = _format_reasoning_chunk( line, "stdout", reasoning_idx := reasoning_idx + 1 ) if rt: r_chunk = { **base, "choices": [ {"index": 0, "delta": {"reasoning_content": rt}} ], } yield f"data: {json.dumps(r_chunk, ensure_ascii=False)}\n\n" continue # Observed step -> reasoning_content if ( isinstance(item, dict) and "__step__" in item and isinstance(item["__step__"], str) ): for line in item["__step__"].splitlines(): parsed = _maybe_parse_final_from_stdout(line) if parsed: final_candidate = parsed rt = _format_reasoning_chunk( line, "step", reasoning_idx := reasoning_idx + 1 ) if rt: r_chunk = { **base, "choices": [ {"index": 0, "delta": {"reasoning_content": rt}} ], } yield f"data: {json.dumps(r_chunk, ensure_ascii=False)}\n\n" continue # Any other iterable/text from agent -> candidate answer cand = _safe_extract_candidate(item) if cand: final_candidate = cand # Cooperative scheduling await asyncio.sleep(0) # Emit visible answer once at the end (scrub any stray tags) visible = scrub_think_tags(final_candidate or "") if not visible or visible.strip().lower() == "none": visible = "Done." final_chunk = {**base, "choices": [{"index": 0, "delta": {"content": visible}}]} yield f"data: {json.dumps(final_chunk, ensure_ascii=False)}\n\n" stop_chunk = { **base, "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}], } yield f"data: {json.dumps(stop_chunk)}\n\n" yield "data: [DONE]\n\n" return _gen async def run_non_streaming(task: str, agent_for_request: typing.Any) -> str: """ Runs the agent and returns a single OpenAI-style text (with optional block). 
""" reasoning_lines: typing.List[str] = [] final_candidate: typing.Optional[str] = None async for item in run_agent_stream(task, agent_for_request): if isinstance(item, dict) and "__error__" in item: raise Exception(item["__error__"]) if isinstance(item, dict) and "__final__" in item: cand = _safe_extract_candidate(item["__final__"]) if cand: final_candidate = cand continue if isinstance(item, dict) and "__stdout__" in item: lines = scrub_think_tags(item["__stdout__"]).rstrip("\n").splitlines() for line in lines: parsed = _maybe_parse_final_from_stdout(line) if parsed: final_candidate = parsed rt = _format_reasoning_chunk(line, "stdout", len(reasoning_lines) + 1) if rt: reasoning_lines.append(rt) continue if isinstance(item, dict) and "__step__" in item: lines = scrub_think_tags(item["__step__"]).rstrip("\n").splitlines() for line in lines: parsed = _maybe_parse_final_from_stdout(line) if parsed: final_candidate = parsed rt = _format_reasoning_chunk(line, "step", len(reasoning_lines) + 1) if rt: reasoning_lines.append(rt) continue cand = _safe_extract_candidate(item) if cand: final_candidate = cand reasoning_blob = _truncate_reasoning_blob("\n".join(reasoning_lines).strip()) think_block = f"\n{reasoning_blob}\n\n" if reasoning_blob else "" final_text = scrub_think_tags(final_candidate or "") if not final_text or final_text.strip().lower() == "none": final_text = "Done." return f"{think_block}{final_text}"