From 48a5fbdea71364aaa7969dede67aa41877599d0f Mon Sep 17 00:00:00 2001 From: Corey Johnson Date: Wed, 11 Mar 2026 10:38:10 -0700 Subject: [PATCH] Refactor proxy and environment handling - Separate /ok health check from proxy logic into dedicated healthCheck function - Fix WebSocket message race condition by buffering messages until upstream connects - Add 5s timeout for WebSocket upstream connection - Create env.ts for centralized env var handling and validation - Move exports to top of files for better discoverability - Remove spawn() parameter and have it read from env.ts - Remove unused validation logic (now handled by env.ts) Co-Authored-By: Claude Haiku 4.5 --- src/binary.ts | 103 ++++++++++++++++--------------------- src/env.ts | 11 ++++ src/proxy.ts | 138 ++++++++++++++++++++++++++++++-------------------- src/server.ts | 24 +++++---- 4 files changed, 150 insertions(+), 126 deletions(-) create mode 100644 src/env.ts diff --git a/src/binary.ts b/src/binary.ts index 87ca805..71d229f 100644 --- a/src/binary.ts +++ b/src/binary.ts @@ -1,6 +1,7 @@ import { join } from 'path' -import { accessSync, chmodSync, constants, mkdirSync } from 'fs' +import { chmodSync, mkdirSync } from 'fs' import type { Subprocess } from 'bun' +import { DATA_DIR, PRODUCTION, SINGLE_USER_AUTO_LOGIN, SYSTEM_APPS_AUTO_REFRESH } from './env' const BIN_DIR = join(import.meta.dir, '..', 'bin') const GO_PORT = 8000 @@ -11,6 +12,48 @@ let proc: Subprocess | undefined export const isHealthy = () => healthy export const isRunning = () => !!proc +export async function spawn() { + const binPath = join(BIN_DIR, getBinaryName()) + + if (!(await Bun.file(binPath).exists())) { + if (!(await downloadBinary(binPath))) return + } + + console.log('Starting tronbyt server...') + + proc = Bun.spawn([binPath], { + env: { + ...process.env, + DATA_DIR, + DB_DSN: join(DATA_DIR, 'tronbyt.db'), + PRODUCTION, + SINGLE_USER_AUTO_LOGIN, + SYSTEM_APPS_AUTO_REFRESH, + }, + stdout: 'inherit', + stderr: 'inherit', + }) + + proc.exited.then((code) => { + console.log(`Tronbyt server exited with code ${code}`) + healthy = false + proc = undefined + }) + + if (await waitForHealthy()) { + healthy = true + console.log('Tronbyt server is healthy') + } else { + console.error('Tronbyt server failed to become healthy') + } +} + +export function shutdown() { + if (!proc) return + console.log('Shutting down tronbyt server...') + proc.kill('SIGTERM') +} + function getBinaryName(): string { const platform = process.platform === 'darwin' ? 'darwin' : 'linux' const arch = process.arch === 'x64' ? 'amd64' : 'arm64' @@ -53,61 +96,3 @@ async function downloadBinary(binPath: string): Promise { } } -function validate(dataDir: string): string | undefined { - if (!dataDir) return 'DATA_DIR env var is not set — toes should provide this automatically' - - const binPath = join(BIN_DIR, getBinaryName()) - try { - accessSync(binPath, constants.X_OK) - } catch { - return `Binary not found or not executable: ${binPath}` - } -} - -export async function spawn(dataDir: string) { - const binPath = join(BIN_DIR, getBinaryName()) - - if (!(await Bun.file(binPath).exists())) { - if (!(await downloadBinary(binPath))) return - } - - const error = validate(dataDir) - if (error) { - console.error(`Setup error: ${error}`) - return - } - - console.log('Starting tronbyt server...') - - proc = Bun.spawn([binPath], { - env: { - ...process.env, - DATA_DIR: dataDir, - DB_DSN: join(dataDir, 'tronbyt.db'), - PRODUCTION: process.env.PRODUCTION ?? 'true', - SINGLE_USER_AUTO_LOGIN: process.env.SINGLE_USER_AUTO_LOGIN ?? 'true', - SYSTEM_APPS_AUTO_REFRESH: process.env.SYSTEM_APPS_AUTO_REFRESH ?? 'true', - }, - stdout: 'inherit', - stderr: 'inherit', - }) - - proc.exited.then((code) => { - console.log(`Tronbyt server exited with code ${code}`) - healthy = false - proc = undefined - }) - - if (await waitForHealthy()) { - healthy = true - console.log('Tronbyt server is healthy') - } else { - console.error('Tronbyt server failed to become healthy') - } -} - -export function shutdown() { - if (!proc) return - console.log('Shutting down tronbyt server...') - proc.kill('SIGTERM') -} diff --git a/src/env.ts b/src/env.ts new file mode 100644 index 0000000..ddeb11a --- /dev/null +++ b/src/env.ts @@ -0,0 +1,11 @@ +function required(name: string): string { + const value = process.env[name] + if (!value) throw new Error(`Missing required env var: ${name}`) + return value +} + +export const DATA_DIR = required('DATA_DIR') +export const PORT = Number(process.env.PORT) || 3000 +export const PRODUCTION = process.env.PRODUCTION ?? 'true' +export const SINGLE_USER_AUTO_LOGIN = process.env.SINGLE_USER_AUTO_LOGIN ?? 'true' +export const SYSTEM_APPS_AUTO_REFRESH = process.env.SYSTEM_APPS_AUTO_REFRESH ?? 'true' diff --git a/src/proxy.ts b/src/proxy.ts index f84a536..bd2a314 100644 --- a/src/proxy.ts +++ b/src/proxy.ts @@ -1,4 +1,5 @@ import type { ServerWebSocket } from 'bun' +import { isHealthy, isRunning } from './binary' export interface WsData { path: string @@ -8,61 +9,86 @@ export interface WsData { const GO_PORT = 8000 const GO_BASE = `http://127.0.0.1:${GO_PORT}` -const upstreams = new Map, WebSocket>() +const WS_CONNECT_TIMEOUT = 5000 -export function createProxy(isHealthy: () => boolean, isRunning: () => boolean) { - async function proxyFetch(req: Request): Promise { - const url = new URL(req.url) - - if (url.pathname === '/ok') { - if (!isHealthy()) return new Response('starting', { status: isRunning() ? 200 : 503 }) - return fetch(`${GO_BASE}/health`) - .then((r) => (r.ok ? new Response('ok') : new Response('unhealthy', { status: 503 }))) - .catch(() => new Response('unhealthy', { status: 503 })) - } - - const hasBody = req.method !== 'GET' && req.method !== 'HEAD' - const body = hasBody ? await req.arrayBuffer() : undefined - - return fetch(`${GO_BASE}${url.pathname}${url.search}`, { - method: req.method, - headers: req.headers, - body, - redirect: 'manual', - }).then((r) => { - // Bun auto-decompresses gzip but leaves content-encoding header. - // Strip it so the next proxy layer doesn't try to decompress again. - const headers = new Headers(r.headers) - headers.delete('content-encoding') - headers.delete('content-length') - return new Response(r.body, { status: r.status, headers }) - }).catch((e) => { - console.error('Proxy error:', e) - return new Response('Tronbyt server is not responding', { status: 502 }) - }) - } - - const websocket = { - open(ws: ServerWebSocket) { - const upstream = new WebSocket(`ws://127.0.0.1:${GO_PORT}${ws.data.path}`, ws.data.protocols) - upstream.binaryType = 'arraybuffer' - upstreams.set(ws, upstream) - - upstream.addEventListener('message', (e) => ws.send(e.data as string | ArrayBuffer)) - upstream.addEventListener('close', () => { upstreams.delete(ws); ws.close() }) - upstream.addEventListener('error', () => { upstreams.delete(ws); ws.close() }) - }, - - message(ws: ServerWebSocket, msg: string | ArrayBuffer | Uint8Array) { - const upstream = upstreams.get(ws) - if (upstream?.readyState === WebSocket.OPEN) upstream.send(msg) - }, - - close(ws: ServerWebSocket) { - upstreams.get(ws)?.close() - upstreams.delete(ws) - }, - } - - return { proxyFetch, websocket } +export function healthCheck(): Promise { + if (!isHealthy()) return Promise.resolve(new Response('starting', { status: isRunning() ? 200 : 503 })) + return fetch(`${GO_BASE}/health`) + .then((r) => (r.ok ? new Response('ok') : new Response('unhealthy', { status: 503 }))) + .catch(() => new Response('unhealthy', { status: 503 })) +} + +export async function proxyFetch(req: Request): Promise { + const url = new URL(req.url) + const hasBody = req.method !== 'GET' && req.method !== 'HEAD' + const body = hasBody ? await req.arrayBuffer() : undefined + + return fetch(`${GO_BASE}${url.pathname}${url.search}`, { + method: req.method, + headers: req.headers, + body, + redirect: 'manual', + }).then((r) => { + // Bun auto-decompresses gzip but leaves content-encoding header. + // Strip it so the next proxy layer doesn't try to decompress again. + const headers = new Headers(r.headers) + headers.delete('content-encoding') + headers.delete('content-length') + return new Response(r.body, { status: r.status, headers }) + }).catch((e) => { + console.error('Proxy error:', e) + return new Response('Tronbyt server is not responding', { status: 502 }) + }) +} + +interface UpstreamState { + socket: WebSocket + pending: (string | ArrayBuffer | Uint8Array)[] +} + +const upstreams = new Map, UpstreamState>() + +function cleanup(ws: ServerWebSocket) { + const state = upstreams.get(ws) + if (!state) return + upstreams.delete(ws) + if (state.socket.readyState <= WebSocket.OPEN) state.socket.close() +} + +export const websocket = { + open(ws: ServerWebSocket) { + const socket = new WebSocket(`ws://127.0.0.1:${GO_PORT}${ws.data.path}`, ws.data.protocols) + socket.binaryType = 'arraybuffer' + const state: UpstreamState = { socket, pending: [] } + upstreams.set(ws, state) + + const timeout = setTimeout(() => { + console.error('WebSocket upstream connection timed out') + cleanup(ws) + ws.close() + }, WS_CONNECT_TIMEOUT) + + socket.addEventListener('open', () => { + clearTimeout(timeout) + for (const msg of state.pending) socket.send(msg) + state.pending.length = 0 + }) + socket.addEventListener('message', (e) => ws.send(e.data as string | ArrayBuffer)) + socket.addEventListener('close', () => { cleanup(ws); ws.close() }) + socket.addEventListener('error', () => { clearTimeout(timeout); cleanup(ws); ws.close() }) + }, + + message(ws: ServerWebSocket, msg: string | ArrayBuffer | Uint8Array) { + const state = upstreams.get(ws) + if (!state) return + if (state.socket.readyState === WebSocket.OPEN) { + state.socket.send(msg) + } else { + state.pending.push(msg) + } + }, + + close(ws: ServerWebSocket) { + cleanup(ws) + }, } diff --git a/src/server.ts b/src/server.ts index b75e4ac..d58592d 100644 --- a/src/server.ts +++ b/src/server.ts @@ -1,25 +1,27 @@ -import { createProxy, type WsData } from './proxy' -import { isHealthy, isRunning, shutdown, spawn } from './binary' +import { healthCheck, proxyFetch, websocket, type WsData } from './proxy' +import { shutdown, spawn } from './binary' +import { PORT } from './env' -const DATA_DIR = process.env.DATA_DIR! -const PORT = Number(process.env.PORT) || 3000 - -const { proxyFetch, websocket } = createProxy(isHealthy, isRunning) - -const server = Bun.serve({ +const server = Bun.serve({ port: PORT, hostname: '::', idleTimeout: 255, fetch(req, server) { + const url = new URL(req.url) + + if (url.pathname === '/ok') { + return healthCheck() + } + if (req.headers.get('upgrade')?.toLowerCase() === 'websocket') { - const url = new URL(req.url) const protocolHeader = req.headers.get('sec-websocket-protocol') const protocols = protocolHeader ? protocolHeader.split(',').map((p) => p.trim()) : [] const headers: Record = {} if (protocolHeader) headers['sec-websocket-protocol'] = protocolHeader - if (server.upgrade(req, { data: { path: url.pathname + url.search, protocols }, headers })) return + const upgradeSuccessful = server.upgrade(req, { data: { path: url.pathname + url.search, protocols }, headers }) + if (upgradeSuccessful) return return new Response('WebSocket upgrade failed', { status: 500 }) } @@ -34,4 +36,4 @@ console.log(`Listening on port ${server.port}`) process.on('SIGTERM', shutdown) process.on('SIGINT', shutdown) -spawn(DATA_DIR) +spawn()