|
""" |
|
FastAPI application providing OpenAI-compatible API endpoints using QodoAI. |
|
""" |
|
|
|
import json |
|
import time |
|
import uuid |
|
import logging |
|
import asyncio |
|
from typing import List, Dict, Optional, Union, Generator, Any, AsyncGenerator |
|
from contextlib import asynccontextmanager |
|
|
|
from fastapi import FastAPI, HTTPException, Depends, Request, status |
|
from fastapi.middleware.cors import CORSMiddleware |
|
from fastapi.responses import StreamingResponse, JSONResponse |
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials |
|
from pydantic import BaseModel, Field, validator |
|
import uvicorn |
|
|
|
from curl_cffi.requests import Session |
|
from curl_cffi import CurlError |
|
|
|
|
|
# Process-wide logging: INFO and above, with timestamp, logger name and level.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# auto_error=False lets a request without an Authorization header reach
# verify_api_key(), which raises its own 401 with a custom message.
security = HTTPBearer(auto_error=False)
|
|
|
|
|
|
|
|
|
|
|
class FailedToGenerateResponseError(Exception):
    """Signal that generating a response failed."""
|
|
|
|
|
|
|
|
|
|
|
def sanitize_stream(data, intro_value="", to_json=True, skip_markers=None, content_extractor=None, yield_raw_on_error=True, raw=False):
    """Normalise an iterable of raw stream chunks and yield their content.

    Args:
        data: Iterable of chunks (``bytes``, ``bytearray``, ``str`` or any
            object convertible with ``str()``). Falsy chunks are skipped.
        intro_value: Accepted for signature compatibility; currently unused.
        to_json: When true, each chunk is parsed as one JSON document;
            otherwise the decoded text is yielded unchanged.
        skip_markers: Substrings; a chunk containing any of them is dropped.
        content_extractor: Optional callable applied to the parsed JSON value.
            Its result is yielded when truthy. Without an extractor the parsed
            value itself is yielded.
        yield_raw_on_error: When true, a chunk that cannot be parsed (or whose
            extractor raises) is yielded as decoded text instead of dropped.
        raw: Accepted for signature compatibility; currently unused.

    Yields:
        Extracted content, parsed JSON values, or raw text as described above.
    """
    markers = skip_markers or ()

    for chunk in data:
        if not chunk:
            continue

        # Decode leniently: a chunk boundary may split a multi-byte character,
        # and a strict decode would otherwise surface the bytes repr ("b'..'").
        if isinstance(chunk, (bytes, bytearray)):
            text = bytes(chunk).decode("utf-8", errors="replace")
        else:
            text = str(chunk)

        if any(marker in text for marker in markers):
            continue

        if not to_json:
            yield text
            continue

        try:
            parsed = json.loads(text)
            extracted = content_extractor(parsed) if content_extractor else parsed
        except Exception:
            # Covers both invalid JSON and a failing extractor.
            if yield_raw_on_error:
                yield text
            continue

        if not content_extractor:
            # No extractor supplied: hand the parsed value back to the caller
            # rather than silently discarding it.
            yield extracted
        elif extracted:
            yield extracted
|
|
|
|
|
|
|
|
|
|
|
class ChatMessage(BaseModel):
    """One message of a chat conversation (role, text and optional author name)."""

    role: str = Field(..., description="The role of the message author")
    content: str = Field(..., description="The content of the message")
    name: Optional[str] = Field(None, description="The name of the author")
|
|
|
class ChatCompletionRequest(BaseModel):
    """Request body of the chat-completions endpoint.

    ``model`` and ``messages`` are required; every other field is optional and
    range-checked where bounds are declared.
    """

    model: str = Field(..., description="ID of the model to use")
    messages: List[ChatMessage] = Field(
        ..., description="List of messages comprising the conversation"
    )
    max_tokens: Optional[int] = Field(
        2049, description="Maximum number of tokens to generate"
    )
    temperature: Optional[float] = Field(
        None, ge=0, le=2, description="Sampling temperature"
    )
    top_p: Optional[float] = Field(
        None, ge=0, le=1, description="Nucleus sampling parameter"
    )
    stream: Optional[bool] = Field(
        False, description="Whether to stream back partial progress"
    )
    stop: Optional[Union[str, List[str]]] = Field(
        None, description="Stop sequences"
    )
    presence_penalty: Optional[float] = Field(
        None, ge=-2, le=2, description="Presence penalty"
    )
    frequency_penalty: Optional[float] = Field(
        None, ge=-2, le=2, description="Frequency penalty"
    )
|
|
|
class Usage(BaseModel):
    """Token counters reported alongside a completion."""

    prompt_tokens: int
    completion_tokens: int
    total_tokens: int
|
|
|
class ChatCompletionMessage(BaseModel):
    """Message carried by a non-streaming completion choice."""

    role: str
    content: str
|
|
|
class Choice(BaseModel):
    """One completion alternative.

    Carries either a full ``message`` or an incremental ``delta``; both are
    optional, as is ``finish_reason``.
    """

    index: int
    message: Optional[ChatCompletionMessage] = None
    delta: Optional[Dict[str, Any]] = None
    finish_reason: Optional[str] = None
|
|
|
class ChatCompletionResponse(BaseModel):
    """Full (non-streaming) chat completion; ``object`` defaults to "chat.completion"."""

    id: str
    object: str = "chat.completion"
    created: int
    model: str
    choices: List[Choice]
    usage: Usage
|
|
|
class ChatCompletionChunk(BaseModel):
    """Single streamed completion chunk; ``object`` defaults to "chat.completion.chunk"."""

    id: str
    object: str = "chat.completion.chunk"
    created: int
    model: str
    choices: List[Choice]
|
|
|
class ModelInfo(BaseModel):
    """Description of one model entry; ``owned_by`` defaults to "qodo"."""

    id: str
    object: str = "model"
    created: int
    owned_by: str = "qodo"
|
|
|
class ModelListResponse(BaseModel):
    """Envelope for a list of ``ModelInfo`` entries (``object`` is "list")."""

    object: str = "list"
    data: List[ModelInfo]
|
|
|
class HealthResponse(BaseModel):
    """Payload of the health endpoint: a status string and a Unix timestamp."""

    status: str
    timestamp: int
|
|
|
|
|
|
|
|
|
|
|
class QodoAI:
    """OpenAI-compatible client for the Qodo AI agentic API.

    The client owns a single ``curl_cffi`` session and translates OpenAI-style
    chat completion requests into Qodo "start-task" calls.

    Limitations visible in this implementation:
      * Only the most recent ``user`` message is forwarded as the prompt;
        earlier messages and sampling parameters are not sent upstream.
      * Upstream HTTP calls are synchronous, so they occupy the event loop
        while they run even though the public methods are coroutines.
      * Token usage is approximated by whitespace-separated word counts.
    """

    AVAILABLE_MODELS = [
        "gpt-4.1",
        "gpt-4o",
        "o3",
        "o4-mini",
        "claude-4-sonnet",
        "gemini-2.5-pro",
    ]

    def __init__(self, api_key: Optional[str] = None, timeout: int = 30):
        """Create the client and its HTTP session.

        Args:
            api_key: Bearer token for the Qodo API. When ``None`` no
                ``Authorization`` header is sent.
            timeout: Per-request timeout in seconds.
        """
        self.url = "https://api.cli.qodo.ai/v2/agentic/start-task"
        self.info_url = "https://api.cli.qodo.ai/v2/info/get-things"
        self.timeout = timeout
        self.api_key = api_key

        self.fingerprint = {"user_agent": "axios/1.10.0", "browser_type": "chrome"}

        # The session id is fetched first because it is part of the headers.
        self.session_id = self._get_session_id()
        self.request_id = str(uuid.uuid4())

        self.headers = {
            "Accept": "text/plain",
            "Accept-Encoding": "gzip, deflate, br, zstd",
            "Accept-Language": "en-US,en;q=0.9",
            **self._auth_header(self.api_key),
            "Connection": "close",
            "Content-Type": "application/json",
            "host": "api.cli.qodo.ai",
            "Request-id": self.request_id,
            "Session-id": self.session_id,
            "User-Agent": self.fingerprint["user_agent"],
        }

        self.session = Session()
        self.session.headers.update(self.headers)

    @staticmethod
    def _auth_header(api_key: Optional[str]) -> Dict[str, str]:
        """Return the Authorization header, or nothing when no key is set.

        Avoids sending the literal string ``"Bearer None"`` upstream.
        """
        if not api_key:
            return {}
        return {"Authorization": f"Bearer {api_key}"}

    def _get_session_id(self) -> str:
        """Fetch a session id from the Qodo info endpoint.

        Returns:
            The ``session-id`` value from the response, or a locally generated
            ``20250630-<uuid4>`` id when the request fails or carries no id.
        """
        temp_session = None
        try:
            temp_session = Session()
            temp_session.headers.update({
                "Accept": "text/plain",
                **self._auth_header(self.api_key),
                "Content-Type": "application/json",
                "User-Agent": "axios/1.10.0",
            })

            response = temp_session.get(
                self.info_url, timeout=self.timeout, impersonate="chrome110"
            )
            if response.status_code == 200:
                data = response.json()
                session_id = data.get("session-id") if isinstance(data, dict) else None
                if session_id:
                    return session_id
        except Exception as exc:
            # Best effort: fall through to a locally generated id.
            logger.warning("Could not fetch Qodo session id, using a local one: %s", exc)
        finally:
            # The temporary session is only needed for this one call.
            if temp_session is not None:
                try:
                    temp_session.close()
                except Exception:
                    pass

        return f"20250630-{uuid.uuid4()}"

    @staticmethod
    def _qodo_extractor(chunk: Union[str, Dict[str, Any]]) -> Optional[str]:
        """Pull the text content out of one Qodo stream object.

        Lookup order for a dict: ``data.tool_args.content`` (when truthy),
        ``data.content``, then the OpenAI-style ``choices[0].delta.content``
        and ``choices[0].message.content``. A string is parsed as JSON and
        processed recursively; a non-JSON string is returned stripped.

        Returns:
            The extracted content, or ``None`` when nothing matches.
        """
        if isinstance(chunk, dict):
            data = chunk.get("data", {})
            if isinstance(data, dict):
                tool_args = data.get("tool_args", {})
                if isinstance(tool_args, dict):
                    content = tool_args.get("content")
                    if content:
                        return content

                if "content" in data:
                    return data["content"]

            if "choices" in chunk:
                choices = chunk["choices"]
                if isinstance(choices, list) and choices:
                    choice = choices[0]
                    if isinstance(choice, dict):
                        delta = choice.get("delta", {})
                        if isinstance(delta, dict) and "content" in delta:
                            return delta["content"]

                        message = choice.get("message", {})
                        if isinstance(message, dict) and "content" in message:
                            return message["content"]

        elif isinstance(chunk, str):
            try:
                parsed = json.loads(chunk)
            except json.JSONDecodeError:
                stripped = chunk.strip()
                if stripped:
                    return stripped
            else:
                return QodoAI._qodo_extractor(parsed)

        return None

    @staticmethod
    def _collect_content(response_text: str) -> str:
        """Concatenate the content of every JSON object found in a response body.

        The body is treated as a sequence of JSON objects separated by
        arbitrary text. Objects are decoded with ``raw_decode`` so braces
        inside string values do not confuse the object boundaries. Text that
        is not part of a decodable object is ignored.
        """
        decoder = json.JSONDecoder()
        parts = []
        pos = 0
        end = len(response_text)

        while pos < end:
            start = response_text.find("{", pos)
            if start == -1:
                break
            try:
                obj, pos = decoder.raw_decode(response_text, start)
            except json.JSONDecodeError:
                # Not a complete object here; resume scanning after this brace.
                pos = start + 1
                continue

            content = QodoAI._qodo_extractor(obj)
            if isinstance(content, str) and content:
                parts.append(content)

        return "".join(parts)

    @staticmethod
    def _sse_chunk(request_id: str, created_time: int, model: str, delta: Dict[str, Any], finish_reason: Optional[str]) -> str:
        """Serialise one ``chat.completion.chunk`` as a server-sent-event line."""
        chunk = {
            "id": request_id,
            "object": "chat.completion.chunk",
            "created": created_time,
            "model": model,
            "choices": [{
                "index": 0,
                "delta": delta,
                "finish_reason": finish_reason,
            }],
        }
        return f"data: {json.dumps(chunk)}\n\n"

    def _build_payload(self, prompt: str, model: str = "claude-4-sonnet"):
        """Build the JSON payload for the Qodo start-task endpoint.

        Args:
            prompt: Text sent as ``user_request``.
            model: Model id sent as ``custom_model``.

        Returns:
            A new dict; ``stream`` is ``True`` and may be overridden by callers.
        """
        return {
            "agent_type": "cli",
            "session_id": self.session_id,
            "user_data": {
                "extension_version": "0.7.2",
                "os_platform": "win32",
                "os_version": "v23.9.0",
                "editor_type": "cli",
            },
            "tools": {
                "web_search": [
                    {
                        "name": "web_search",
                        "description": "Searches the web and returns results based on the user's query (Powered by Nimble).",
                        "inputSchema": {
                            "type": "object",
                            "properties": {
                                "query": {
                                    "description": "The search query to execute",
                                    "title": "Query",
                                    "type": "string",
                                }
                            },
                            "required": ["query"],
                        },
                        "be_tool": True,
                        "autoApproved": True,
                    }
                ]
            },
            "user_request": prompt,
            "execution_strategy": "act",
            "custom_model": model,
            "stream": True,
        }

    async def create_chat_completion(self, request: "ChatCompletionRequest") -> "Union[ChatCompletionResponse, AsyncGenerator]":
        """Create a chat completion for *request*.

        Returns:
            A ``ChatCompletionResponse`` for non-streaming requests, or an
            async generator of server-sent-event strings when
            ``request.stream`` is true.

        Raises:
            HTTPException: 400 when the request holds no non-empty user
                message; 401/500 for upstream failures.
        """
        # The newest user message is the prompt.
        user_prompt = next(
            (msg.content for msg in reversed(request.messages) if msg.role == "user"),
            "",
        )
        if not user_prompt:
            raise HTTPException(status_code=400, detail="No user message found in messages")

        wants_stream = bool(request.stream)
        payload = self._build_payload(user_prompt, request.model)
        payload["stream"] = wants_stream

        request_id = f"chatcmpl-{uuid.uuid4()}"
        created_time = int(time.time())

        if wants_stream:
            # Must be awaited: the coroutine returns the async generator that
            # the caller iterates. Returning it un-awaited yields a coroutine
            # object that cannot be streamed.
            return await self._create_stream_response(request_id, created_time, request.model, payload, user_prompt)
        return await self._create_non_stream_response(request_id, created_time, request.model, payload, user_prompt)

    async def _create_stream_response(self, request_id: str, created_time: int, model: str, payload: Dict[str, Any], user_prompt: str):
        """Start the upstream request and return an async generator of SSE lines.

        The upstream status is checked before the generator is returned, so
        authentication and upstream errors surface as ``HTTPException``
        instead of appearing mid-stream. ``user_prompt`` is unused here.

        Raises:
            HTTPException: 401 for an upstream 401, 500 for any other failure.
        """
        try:
            response = self.session.post(
                self.url,
                json=payload,
                stream=True,
                timeout=self.timeout,
                impersonate="chrome110",
            )

            if response.status_code != 200:
                try:
                    if response.status_code == 401:
                        raise HTTPException(status_code=401, detail="Invalid API key")
                    raise HTTPException(status_code=500, detail=f"Qodo request failed: {response.text}")
                finally:
                    response.close()

            async def generate():
                try:
                    processed_stream = sanitize_stream(
                        data=response.iter_content(chunk_size=None),
                        intro_value="",
                        to_json=True,
                        skip_markers=["[DONE]"],
                        content_extractor=QodoAI._qodo_extractor,
                        yield_raw_on_error=True,
                        raw=False,
                    )

                    for content_chunk in processed_stream:
                        if content_chunk:
                            yield QodoAI._sse_chunk(
                                request_id,
                                created_time,
                                model,
                                {"content": content_chunk, "role": "assistant"},
                                None,
                            )

                    yield QodoAI._sse_chunk(request_id, created_time, model, {}, "stop")
                    yield "data: [DONE]\n\n"

                except Exception as e:
                    # Headers are already sent; terminate the stream cleanly.
                    logger.error(f"Streaming error: {e}")
                    yield QodoAI._sse_chunk(request_id, created_time, model, {}, "stop")
                    yield "data: [DONE]\n\n"
                finally:
                    # Release the upstream connection however the stream ends.
                    try:
                        response.close()
                    except Exception:
                        pass

            return generate()

        except HTTPException:
            # Keep deliberate status codes (e.g. 401) instead of turning them into 500.
            raise
        except Exception as e:
            logger.error(f"Stream creation error: {e}")
            raise HTTPException(status_code=500, detail=str(e))

    async def _create_non_stream_response(self, request_id: str, created_time: int, model: str, payload: Dict[str, Any], user_prompt: str) -> "ChatCompletionResponse":
        """Run the upstream request to completion and build the full response.

        Sets ``payload["stream"]`` to ``False``, joins the content of every
        JSON object in the response body and reports word-count based usage.

        Raises:
            HTTPException: 401 for an upstream 401, 500 for any other failure.
        """
        try:
            payload["stream"] = False
            response = self.session.post(
                self.url,
                json=payload,
                timeout=self.timeout,
                impersonate="chrome110",
            )

            if response.status_code == 401:
                raise HTTPException(status_code=401, detail="Invalid API key")
            if response.status_code != 200:
                raise HTTPException(status_code=500, detail=f"Qodo request failed: {response.text}")

            full_response = self._collect_content(response.text)

            # Rough usage estimate: whitespace-separated words, not real tokens.
            prompt_tokens = len(user_prompt.split())
            completion_tokens = len(full_response.split())

            return ChatCompletionResponse(
                id=request_id,
                created=created_time,
                model=model,
                choices=[Choice(
                    index=0,
                    message=ChatCompletionMessage(role="assistant", content=full_response),
                    finish_reason="stop",
                )],
                usage=Usage(
                    prompt_tokens=prompt_tokens,
                    completion_tokens=completion_tokens,
                    total_tokens=prompt_tokens + completion_tokens,
                ),
            )

        except HTTPException:
            # Keep deliberate status codes (e.g. 401) instead of turning them into 500.
            raise
        except Exception as e:
            logger.error(f"Non-stream response error: {e}")
            raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
|
|
|
|
|
|
|
# Shared QodoAI client; created on application startup by lifespan().
qodo_client = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager.

    Creates the module-level ``qodo_client`` before the application starts
    serving and closes its HTTP session when the application shuts down.
    """
    global qodo_client
    logger.info("Starting FastAPI application...")
    qodo_client = QodoAI()
    try:
        yield
    finally:
        # Runs on shutdown even if the application body raised.
        logger.info("Shutting down FastAPI application...")
        try:
            qodo_client.session.close()
        except Exception as exc:
            logger.warning("Failed to close Qodo session: %s", exc)
|
|
|
|
|
# The ASGI application; startup/shutdown work is delegated to lifespan().
app = FastAPI(
    title="QodoAI OpenAI-Compatible API",
    description="FastAPI application providing OpenAI-compatible endpoints using QodoAI",
    version="1.0.0",
    lifespan=lifespan,
)

# NOTE(review): fully permissive CORS (any origin, method and header, with
# credentials allowed) -- confirm this is intended outside local development.
_cors_options = dict(
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.add_middleware(CORSMiddleware, **_cors_options)
|
|
|
|
|
async def verify_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Return the bearer token from the Authorization header.

    Only the presence of a non-empty token is checked; the token value itself
    is not validated here.

    Raises:
        HTTPException: 401 "Missing API key" when no credentials were sent,
            401 "Invalid API key" when the token is empty.
    """

    def _unauthorized(detail: str) -> HTTPException:
        # Both failure modes share the status code and challenge header.
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            headers={"WWW-Authenticate": "Bearer"},
        )

    if not credentials:
        raise _unauthorized("Missing API key")

    token = credentials.credentials
    if not token:
        raise _unauthorized("Invalid API key")

    return token
|
|
|
|
|
|
|
|
|
|
|
@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Report liveness: a fixed "healthy" status plus the current Unix time."""
    now = int(time.time())
    return HealthResponse(status="healthy", timestamp=now)
|
|
|
@app.get("/v1/models", response_model=ModelListResponse)
async def list_models(api_key: str = Depends(verify_api_key)):
    """Return every model id in ``QodoAI.AVAILABLE_MODELS`` as a model list.

    Each entry is stamped with the current time as ``created``. Any failure
    is logged and reported as a 500.
    """
    try:
        catalogue = [
            ModelInfo(id=name, created=int(time.time()), owned_by="qodo")
            for name in QodoAI.AVAILABLE_MODELS
        ]
        return ModelListResponse(data=catalogue)
    except Exception as e:
        logger.error(f"Error listing models: {e}")
        raise HTTPException(status_code=500, detail="Failed to list models")
|
|
|
@app.post("/v1/chat/completions")
async def create_chat_completion(
    request: ChatCompletionRequest,
    api_key: str = Depends(verify_api_key)
):
    """Create a chat completion.

    Validates the requested model, delegates to the shared ``qodo_client`` and
    returns either the completed response or a server-sent-event stream,
    depending on ``request.stream``.

    Raises:
        HTTPException: 400 for an unknown model, 503 when the client has not
            been initialised, 500 for unexpected failures; HTTP errors raised
            by the client are passed through unchanged.
    """
    try:
        if request.model not in QodoAI.AVAILABLE_MODELS:
            raise HTTPException(
                status_code=400,
                detail=f"Model '{request.model}' is not available. Available models: {QodoAI.AVAILABLE_MODELS}"
            )

        if qodo_client is None:
            # The lifespan hook has not run (or failed); report it explicitly.
            raise HTTPException(status_code=503, detail="Qodo client is not initialised")

        result = await qodo_client.create_chat_completion(request)

        if not request.stream:
            return result

        # Defensive: if the client handed back a coroutine wrapping the
        # generator, resolve it so StreamingResponse gets an async iterable.
        if asyncio.iscoroutine(result):
            result = await result

        # The body is formatted as server-sent events ("data: ...\n\n"), so it
        # must be labelled text/event-stream for SSE/OpenAI clients.
        return StreamingResponse(
            result,
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
            }
        )

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error creating chat completion: {e}")
        raise HTTPException(status_code=500, detail="Failed to create chat completion")
|
|
|
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Log any otherwise unhandled exception and answer with a generic 500 body."""
    logger.error(f"Unhandled exception: {exc}")
    error_body = {
        "error": {"message": "Internal server error", "type": "internal_error"}
    }
    return JSONResponse(status_code=500, content=error_body)
|
|
|
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log the start of every request and, once handled, its status and duration."""
    started_at = time.time()
    method = request.method
    path = request.url.path

    logger.info(f"{method} {path} - Start")

    response = await call_next(request)

    elapsed = time.time() - started_at
    logger.info(f"{method} {path} - {response.status_code} - {elapsed:.3f}s")

    return response
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Development entry point: serve on all interfaces, port 8000, with
    # auto-reload enabled.
    # NOTE(review): the "main:app" import string presumably requires this
    # file to be importable as module `main` -- confirm the file name.
    server_options = {
        "host": "0.0.0.0",
        "port": 8000,
        "reload": True,
        "log_level": "info",
    }
    uvicorn.run("main:app", **server_options)