diff --git a/src/server/index.tsx b/src/server/index.tsx index 0a2e899..072a2ea 100644 --- a/src/server/index.tsx +++ b/src/server/index.tsx @@ -6,6 +6,8 @@ import syncRouter from './api/sync' import systemRouter from './api/system' import { Hype } from '@because/hype' import { cleanupStalePublishers } from './mdns' +import type { Server } from 'bun' +import type { WsData } from './proxy' import { extractSubdomain, proxySubdomain, proxyWebSocket, websocket } from './proxy' const app = new Hype({ layout: false, logging: !!process.env.DEBUG }) @@ -65,7 +67,7 @@ const defaults = app.defaults export default { ...defaults, maxRequestBodySize: 1024 * 1024 * 50, // 50MB - fetch(req: Request, server: any) { + fetch(req: Request, server: Server) { const subdomain = extractSubdomain(req.headers.get('host') ?? '') if (subdomain) { if (req.headers.get('upgrade')?.toLowerCase() === 'websocket') { diff --git a/src/server/proxy.ts b/src/server/proxy.ts index e1fb2b8..3fd8c15 100644 --- a/src/server/proxy.ts +++ b/src/server/proxy.ts @@ -1,6 +1,9 @@ -import type { ServerWebSocket } from 'bun' +import type { Server, ServerWebSocket } from 'bun' import { getApp } from '$apps' +export type { WsData } + +const pendingMessages = new Map, (string | ArrayBuffer | Uint8Array)[]>() const upstreams = new Map, WebSocket>() interface WsData { @@ -38,8 +41,8 @@ export async function proxySubdomain(subdomain: string, req: Request): Promise): Response | undefined { const app = getApp(subdomain) if (!app || app.state !== 'running' || !app.port) { @@ -83,17 +86,39 @@ export const websocket = { upstream.binaryType = 'arraybuffer' upstreams.set(ws, upstream) + pendingMessages.set(ws, []) + + const timeout = setTimeout(() => { + if (upstream.readyState !== WebSocket.OPEN) { + upstream.close() + ws.close() + } + }, 10_000) + + upstream.addEventListener('open', () => { + clearTimeout(timeout) + const buffered = pendingMessages.get(ws) + if (buffered) { + for (const msg of buffered) upstream.send(msg) + pendingMessages.delete(ws) + } + }) upstream.addEventListener('message', e => { + // binaryType is 'arraybuffer' so data is always string | ArrayBuffer ws.send(e.data as string | ArrayBuffer) }) upstream.addEventListener('close', () => { + clearTimeout(timeout) + pendingMessages.delete(ws) upstreams.delete(ws) ws.close() }) upstream.addEventListener('error', () => { + clearTimeout(timeout) + pendingMessages.delete(ws) upstreams.delete(ws) ws.close() }) @@ -101,7 +126,11 @@ export const websocket = { message(ws: ServerWebSocket, msg: string | ArrayBuffer | Uint8Array) { const upstream = upstreams.get(ws) - if (!upstream || upstream.readyState !== WebSocket.OPEN) return + if (!upstream) return + if (upstream.readyState !== WebSocket.OPEN) { + pendingMessages.get(ws)?.push(msg) + return + } upstream.send(msg) }, @@ -111,5 +140,6 @@ export const websocket = { upstream.close() upstreams.delete(ws) } + pendingMessages.delete(ws) }, }