Spaces:
Running
Running
File size: 4,121 Bytes
ce1ab17 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
//
// SPDX-FileCopyrightText: Hadad <[email protected]>
// SPDX-License-Identifier: Apache-2.0
//
import { WebSocketServer } from "ws";
import fetch from "node-fetch";
import {
OPENAI_API_BASE_URL,
OPENAI_API_KEY
} from "./config.js";
export default function attachWss(server) {
const wss = new WebSocketServer({ server });
// Handle WebSocket connections.
wss.on("connection", (ws) => {
let currentAbortController = null;
// Send messages to client.
const sendToClient = (type, payload) => {
ws.send(JSON.stringify({ type, ...payload }));
};
// Send logs to client.
const sendError = (message) => {
sendToClient("error", { error: message });
};
// Make a request.
const streamRequest = async (messages, retries = 3) => {
for (let attempt = 1; attempt <= retries; attempt++) {
currentAbortController = new AbortController();
const signal = currentAbortController.signal;
try {
const response = await fetch(OPENAI_API_BASE_URL, {
method: "POST",
headers: {
"Content-Type": "application/json",
"Authorization": `Bearer ${OPENAI_API_KEY}`,
},
body: JSON.stringify({
model: "gpt-4.1-nano",
messages,
stream: true,
private: true,
isPrivate: true
}),
signal
});
if (response.status === 502) {
if (attempt === retries) {
sendError(
"The server is currently busy. Please wait a moment or try again later."
);
return;
}
continue;
}
if (!response.ok) {
const errText = await response.text();
sendError(`HTTP ${response.status}: ${response.statusText} - ${errText}`);
return;
}
if (!response.body) {
sendError("Response body is empty.");
return;
}
let buffer = "";
for await (const chunk of response.body) {
if (signal.aborted) {
sendToClient("end", {});
return;
}
buffer += chunk.toString();
let idx;
while ((idx = buffer.indexOf("\n")) !== -1) {
const line = buffer.slice(0, idx).trim();
buffer = buffer.slice(idx + 1);
if (line.startsWith("data: ")) {
const dataStr = line.substring(6).trim();
if (!dataStr || dataStr === "[DONE]") continue;
try {
const parsed = JSON.parse(dataStr);
const part = parsed?.choices?.[0]?.delta?.content;
if (part) sendToClient("chunk", { chunk: part });
} catch (err) {
sendError(`Parse error: ${err.message}`);
}
}
}
}
sendToClient("end", {});
return;
} catch (err) {
if (signal.aborted) {
sendToClient("end", {});
return;
}
if (attempt === retries) {
sendError(err.message || "Unknown error.");
}
}
}
};
// Handle messages from client.
ws.on("message", async (msg) => {
try {
const data = JSON.parse(msg.toString());
if (data.type === "stop") {
if (currentAbortController) currentAbortController.abort();
sendToClient("end", {});
return;
}
const message = data.message;
const history = data.history || [];
const setupMessages = [...history, { role: "user", content: message }];
await streamRequest(setupMessages);
} catch (err) {
sendError(err.message || "An unknown error occurred.");
if (currentAbortController) currentAbortController.abort();
}
});
// Abort on WebSocket close.
ws.on("close", () => {
if (currentAbortController) currentAbortController.abort();
});
});
} |