Refactor proxy and environment handling

- Separate /ok health check from proxy logic into dedicated healthCheck function
- Fix WebSocket message race condition by buffering messages until upstream connects
- Add 5s timeout for WebSocket upstream connection
- Create env.ts for centralized env var handling and validation
- Move exports to top of files for better discoverability
- Remove spawn() parameter and have it read from env.ts
- Remove unused validation logic (now handled by env.ts)

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
Corey Johnson 2026-03-11 10:38:10 -07:00
parent 3fc2574529
commit 48a5fbdea7
4 changed files with 150 additions and 126 deletions

View File

@ -1,6 +1,7 @@
import { join } from 'path'
import { accessSync, chmodSync, constants, mkdirSync } from 'fs'
import { chmodSync, mkdirSync } from 'fs'
import type { Subprocess } from 'bun'
import { DATA_DIR, PRODUCTION, SINGLE_USER_AUTO_LOGIN, SYSTEM_APPS_AUTO_REFRESH } from './env'
const BIN_DIR = join(import.meta.dir, '..', 'bin')
const GO_PORT = 8000
@ -11,6 +12,48 @@ let proc: Subprocess | undefined
export const isHealthy = () => healthy
export const isRunning = () => !!proc
export async function spawn() {
const binPath = join(BIN_DIR, getBinaryName())
if (!(await Bun.file(binPath).exists())) {
if (!(await downloadBinary(binPath))) return
}
console.log('Starting tronbyt server...')
proc = Bun.spawn([binPath], {
env: {
...process.env,
DATA_DIR,
DB_DSN: join(DATA_DIR, 'tronbyt.db'),
PRODUCTION,
SINGLE_USER_AUTO_LOGIN,
SYSTEM_APPS_AUTO_REFRESH,
},
stdout: 'inherit',
stderr: 'inherit',
})
proc.exited.then((code) => {
console.log(`Tronbyt server exited with code ${code}`)
healthy = false
proc = undefined
})
if (await waitForHealthy()) {
healthy = true
console.log('Tronbyt server is healthy')
} else {
console.error('Tronbyt server failed to become healthy')
}
}
export function shutdown() {
if (!proc) return
console.log('Shutting down tronbyt server...')
proc.kill('SIGTERM')
}
function getBinaryName(): string {
const platform = process.platform === 'darwin' ? 'darwin' : 'linux'
const arch = process.arch === 'x64' ? 'amd64' : 'arm64'
@ -53,61 +96,3 @@ async function downloadBinary(binPath: string): Promise<boolean> {
}
}
function validate(dataDir: string): string | undefined {
if (!dataDir) return 'DATA_DIR env var is not set — toes should provide this automatically'
const binPath = join(BIN_DIR, getBinaryName())
try {
accessSync(binPath, constants.X_OK)
} catch {
return `Binary not found or not executable: ${binPath}`
}
}
export async function spawn(dataDir: string) {
const binPath = join(BIN_DIR, getBinaryName())
if (!(await Bun.file(binPath).exists())) {
if (!(await downloadBinary(binPath))) return
}
const error = validate(dataDir)
if (error) {
console.error(`Setup error: ${error}`)
return
}
console.log('Starting tronbyt server...')
proc = Bun.spawn([binPath], {
env: {
...process.env,
DATA_DIR: dataDir,
DB_DSN: join(dataDir, 'tronbyt.db'),
PRODUCTION: process.env.PRODUCTION ?? 'true',
SINGLE_USER_AUTO_LOGIN: process.env.SINGLE_USER_AUTO_LOGIN ?? 'true',
SYSTEM_APPS_AUTO_REFRESH: process.env.SYSTEM_APPS_AUTO_REFRESH ?? 'true',
},
stdout: 'inherit',
stderr: 'inherit',
})
proc.exited.then((code) => {
console.log(`Tronbyt server exited with code ${code}`)
healthy = false
proc = undefined
})
if (await waitForHealthy()) {
healthy = true
console.log('Tronbyt server is healthy')
} else {
console.error('Tronbyt server failed to become healthy')
}
}
export function shutdown() {
if (!proc) return
console.log('Shutting down tronbyt server...')
proc.kill('SIGTERM')
}

11
src/env.ts Normal file
View File

@ -0,0 +1,11 @@
function required(name: string): string {
const value = process.env[name]
if (!value) throw new Error(`Missing required env var: ${name}`)
return value
}
export const DATA_DIR = required('DATA_DIR')
export const PORT = Number(process.env.PORT) || 3000
export const PRODUCTION = process.env.PRODUCTION ?? 'true'
export const SINGLE_USER_AUTO_LOGIN = process.env.SINGLE_USER_AUTO_LOGIN ?? 'true'
export const SYSTEM_APPS_AUTO_REFRESH = process.env.SYSTEM_APPS_AUTO_REFRESH ?? 'true'

View File

@ -1,4 +1,5 @@
import type { ServerWebSocket } from 'bun'
import { isHealthy, isRunning } from './binary'
export interface WsData {
path: string
@ -8,61 +9,86 @@ export interface WsData {
const GO_PORT = 8000
const GO_BASE = `http://127.0.0.1:${GO_PORT}`
const upstreams = new Map<ServerWebSocket<WsData>, WebSocket>()
const WS_CONNECT_TIMEOUT = 5000
export function createProxy(isHealthy: () => boolean, isRunning: () => boolean) {
async function proxyFetch(req: Request): Promise<Response> {
const url = new URL(req.url)
if (url.pathname === '/ok') {
if (!isHealthy()) return new Response('starting', { status: isRunning() ? 200 : 503 })
return fetch(`${GO_BASE}/health`)
.then((r) => (r.ok ? new Response('ok') : new Response('unhealthy', { status: 503 })))
.catch(() => new Response('unhealthy', { status: 503 }))
}
const hasBody = req.method !== 'GET' && req.method !== 'HEAD'
const body = hasBody ? await req.arrayBuffer() : undefined
return fetch(`${GO_BASE}${url.pathname}${url.search}`, {
method: req.method,
headers: req.headers,
body,
redirect: 'manual',
}).then((r) => {
// Bun auto-decompresses gzip but leaves content-encoding header.
// Strip it so the next proxy layer doesn't try to decompress again.
const headers = new Headers(r.headers)
headers.delete('content-encoding')
headers.delete('content-length')
return new Response(r.body, { status: r.status, headers })
}).catch((e) => {
console.error('Proxy error:', e)
return new Response('Tronbyt server is not responding', { status: 502 })
})
}
const websocket = {
open(ws: ServerWebSocket<WsData>) {
const upstream = new WebSocket(`ws://127.0.0.1:${GO_PORT}${ws.data.path}`, ws.data.protocols)
upstream.binaryType = 'arraybuffer'
upstreams.set(ws, upstream)
upstream.addEventListener('message', (e) => ws.send(e.data as string | ArrayBuffer))
upstream.addEventListener('close', () => { upstreams.delete(ws); ws.close() })
upstream.addEventListener('error', () => { upstreams.delete(ws); ws.close() })
},
message(ws: ServerWebSocket<WsData>, msg: string | ArrayBuffer | Uint8Array) {
const upstream = upstreams.get(ws)
if (upstream?.readyState === WebSocket.OPEN) upstream.send(msg)
},
close(ws: ServerWebSocket<WsData>) {
upstreams.get(ws)?.close()
upstreams.delete(ws)
},
}
return { proxyFetch, websocket }
export function healthCheck(): Promise<Response> {
if (!isHealthy()) return Promise.resolve(new Response('starting', { status: isRunning() ? 200 : 503 }))
return fetch(`${GO_BASE}/health`)
.then((r) => (r.ok ? new Response('ok') : new Response('unhealthy', { status: 503 })))
.catch(() => new Response('unhealthy', { status: 503 }))
}
export async function proxyFetch(req: Request): Promise<Response> {
const url = new URL(req.url)
const hasBody = req.method !== 'GET' && req.method !== 'HEAD'
const body = hasBody ? await req.arrayBuffer() : undefined
return fetch(`${GO_BASE}${url.pathname}${url.search}`, {
method: req.method,
headers: req.headers,
body,
redirect: 'manual',
}).then((r) => {
// Bun auto-decompresses gzip but leaves content-encoding header.
// Strip it so the next proxy layer doesn't try to decompress again.
const headers = new Headers(r.headers)
headers.delete('content-encoding')
headers.delete('content-length')
return new Response(r.body, { status: r.status, headers })
}).catch((e) => {
console.error('Proxy error:', e)
return new Response('Tronbyt server is not responding', { status: 502 })
})
}
interface UpstreamState {
socket: WebSocket
pending: (string | ArrayBuffer | Uint8Array)[]
}
const upstreams = new Map<ServerWebSocket<WsData>, UpstreamState>()
function cleanup(ws: ServerWebSocket<WsData>) {
const state = upstreams.get(ws)
if (!state) return
upstreams.delete(ws)
if (state.socket.readyState <= WebSocket.OPEN) state.socket.close()
}
export const websocket = {
open(ws: ServerWebSocket<WsData>) {
const socket = new WebSocket(`ws://127.0.0.1:${GO_PORT}${ws.data.path}`, ws.data.protocols)
socket.binaryType = 'arraybuffer'
const state: UpstreamState = { socket, pending: [] }
upstreams.set(ws, state)
const timeout = setTimeout(() => {
console.error('WebSocket upstream connection timed out')
cleanup(ws)
ws.close()
}, WS_CONNECT_TIMEOUT)
socket.addEventListener('open', () => {
clearTimeout(timeout)
for (const msg of state.pending) socket.send(msg)
state.pending.length = 0
})
socket.addEventListener('message', (e) => ws.send(e.data as string | ArrayBuffer))
socket.addEventListener('close', () => { cleanup(ws); ws.close() })
socket.addEventListener('error', () => { clearTimeout(timeout); cleanup(ws); ws.close() })
},
message(ws: ServerWebSocket<WsData>, msg: string | ArrayBuffer | Uint8Array) {
const state = upstreams.get(ws)
if (!state) return
if (state.socket.readyState === WebSocket.OPEN) {
state.socket.send(msg)
} else {
state.pending.push(msg)
}
},
close(ws: ServerWebSocket<WsData>) {
cleanup(ws)
},
}

View File

@ -1,25 +1,27 @@
import { createProxy, type WsData } from './proxy'
import { isHealthy, isRunning, shutdown, spawn } from './binary'
import { healthCheck, proxyFetch, websocket, type WsData } from './proxy'
import { shutdown, spawn } from './binary'
import { PORT } from './env'
const DATA_DIR = process.env.DATA_DIR!
const PORT = Number(process.env.PORT) || 3000
const { proxyFetch, websocket } = createProxy(isHealthy, isRunning)
const server = Bun.serve({
const server = Bun.serve<WsData>({
port: PORT,
hostname: '::',
idleTimeout: 255,
fetch(req, server) {
const url = new URL(req.url)
if (url.pathname === '/ok') {
return healthCheck()
}
if (req.headers.get('upgrade')?.toLowerCase() === 'websocket') {
const url = new URL(req.url)
const protocolHeader = req.headers.get('sec-websocket-protocol')
const protocols = protocolHeader ? protocolHeader.split(',').map((p) => p.trim()) : []
const headers: Record<string, string> = {}
if (protocolHeader) headers['sec-websocket-protocol'] = protocolHeader
if (server.upgrade(req, { data: { path: url.pathname + url.search, protocols }, headers })) return
const upgradeSuccessful = server.upgrade(req, { data: { path: url.pathname + url.search, protocols }, headers })
if (upgradeSuccessful) return
return new Response('WebSocket upgrade failed', { status: 500 })
}
@ -34,4 +36,4 @@ console.log(`Listening on port ${server.port}`)
process.on('SIGTERM', shutdown)
process.on('SIGINT', shutdown)
spawn(DATA_DIR)
spawn()