Fix proxy and split server into modules #1
115
src/binary.ts
Normal file
115
src/binary.ts
Normal file
|
|
@ -0,0 +1,115 @@
|
|||
import { join } from 'path'
|
||||
import { accessSync, chmodSync, constants, mkdirSync, unlinkSync } from 'fs'
|
||||
import type { Subprocess } from 'bun'
|
||||
|
||||
const BIN_DIR = join(import.meta.dir, '..', 'bin')
|
||||
|
||||
let healthy = false
|
||||
let proc: Subprocess | undefined
|
||||
|
||||
export const isHealthy = () => healthy
|
||||
export const isRunning = () => !!proc
|
||||
|
||||
function getBinaryName(): string {
|
||||
const platform = process.platform === 'darwin' ? 'darwin' : 'linux'
|
||||
const arch = process.arch === 'x64' ? 'amd64' : 'arm64'
|
||||
return `tronbyt-server-${platform}-${arch}`
|
||||
}
|
||||
|
||||
async function waitForHealthy(socketPath: string, maxAttempts = 60): Promise<boolean> {
|
||||
for (let i = 0; i < maxAttempts; i++) {
|
||||
try {
|
||||
const resp = await fetch('http://localhost/health', { unix: socketPath })
|
||||
if (resp.ok) return true
|
||||
} catch {}
|
||||
await Bun.sleep(1000)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
async function downloadBinary(binPath: string): Promise<boolean> {
|
||||
const name = getBinaryName()
|
||||
const url = `https://github.com/tronbyt/server/releases/latest/download/${name}`
|
||||
|
||||
console.log(`Downloading ${name}...`)
|
||||
try {
|
||||
const resp = await fetch(url, { redirect: 'follow' })
|
||||
if (!resp.ok) {
|
||||
console.error(`Download failed: ${resp.status} ${resp.statusText}`)
|
||||
return false
|
||||
}
|
||||
|
||||
mkdirSync(BIN_DIR, { recursive: true })
|
||||
await Bun.write(binPath, resp)
|
||||
|
||||
chmodSync(binPath, 0o755)
|
||||
|
||||
console.log('Download complete')
|
||||
return true
|
||||
} catch (e) {
|
||||
console.error('Download failed:', e)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
function validate(dataDir: string): string | undefined {
|
||||
if (!dataDir) return 'DATA_DIR env var is not set — toes should provide this automatically'
|
||||
|
||||
const binPath = join(BIN_DIR, getBinaryName())
|
||||
try {
|
||||
accessSync(binPath, constants.X_OK)
|
||||
} catch {
|
||||
return `Binary not found or not executable: ${binPath}`
|
||||
}
|
||||
}
|
||||
|
||||
export async function spawn(dataDir: string, socketPath: string) {
|
||||
const binPath = join(BIN_DIR, getBinaryName())
|
||||
|
||||
if (!(await Bun.file(binPath).exists())) {
|
||||
if (!(await downloadBinary(binPath))) return
|
||||
}
|
||||
|
||||
const error = validate(dataDir)
|
||||
if (error) {
|
||||
console.error(`Setup error: ${error}`)
|
||||
return
|
||||
}
|
||||
|
||||
try { unlinkSync(socketPath) } catch {}
|
||||
|
||||
console.log('Starting tronbyt server...')
|
||||
|
||||
proc = Bun.spawn([binPath], {
|
||||
env: {
|
||||
...process.env,
|
||||
TRONBYT_UNIX_SOCKET: socketPath,
|
||||
DATA_DIR: dataDir,
|
||||
DB_DSN: join(dataDir, 'tronbyt.db'),
|
||||
PRODUCTION: process.env.PRODUCTION ?? 'false',
|
||||
SINGLE_USER_AUTO_LOGIN: process.env.SINGLE_USER_AUTO_LOGIN ?? 'true',
|
||||
SYSTEM_APPS_AUTO_REFRESH: process.env.SYSTEM_APPS_AUTO_REFRESH ?? 'true',
|
||||
},
|
||||
stdout: 'inherit',
|
||||
stderr: 'inherit',
|
||||
})
|
||||
|
||||
proc.exited.then((code) => {
|
||||
console.log(`Tronbyt server exited with code ${code}`)
|
||||
healthy = false
|
||||
proc = undefined
|
||||
})
|
||||
|
||||
if (await waitForHealthy(socketPath)) {
|
||||
healthy = true
|
||||
console.log('Tronbyt server is healthy')
|
||||
} else {
|
||||
console.error('Tronbyt server failed to become healthy')
|
||||
}
|
||||
}
|
||||
|
||||
export function shutdown() {
|
||||
if (!proc) return
|
||||
console.log('Shutting down tronbyt server...')
|
||||
proc.kill('SIGTERM')
|
||||
}
|
||||
64
src/proxy.ts
Normal file
64
src/proxy.ts
Normal file
|
|
@ -0,0 +1,64 @@
|
|||
import type { ServerWebSocket } from 'bun'
|
||||
|
||||
export interface WsData {
|
||||
path: string
|
||||
protocols: string[]
|
||||
}
|
||||
|
||||
const upstreams = new Map<ServerWebSocket<WsData>, WebSocket>()
|
||||
|
||||
export function createProxy(socketPath: string, isHealthy: () => boolean, isRunning: () => boolean) {
|
||||
function proxyFetch(req: Request): Promise<Response> | Response {
|
||||
const url = new URL(req.url)
|
||||
|
||||
if (url.pathname === '/ok') {
|
||||
if (!isHealthy()) return new Response('starting', { status: isRunning() ? 200 : 503 })
|
||||
return fetch('http://localhost/health', { unix: socketPath })
|
||||
.then((r) => (r.ok ? new Response('ok') : new Response('unhealthy', { status: 503 })))
|
||||
.catch(() => new Response('unhealthy', { status: 503 }))
|
||||
}
|
||||
|
||||
const hasBody = req.method !== 'GET' && req.method !== 'HEAD'
|
||||
const headers = new Headers(req.headers)
|
||||
headers.delete('accept-encoding')
|
||||
|
||||
return fetch(`http://localhost${url.pathname}${url.search}`, {
|
||||
method: req.method,
|
||||
headers,
|
||||
body: hasBody ? req.body : undefined,
|
||||
unix: socketPath,
|
||||
}).then((r) => {
|
||||
const respHeaders = new Headers(r.headers)
|
||||
respHeaders.delete('content-encoding')
|
||||
respHeaders.delete('content-length')
|
||||
return new Response(r.body, { status: r.status, headers: respHeaders })
|
||||
}).catch((e) => {
|
||||
console.error('Proxy error:', e)
|
||||
return new Response('Tronbyt server is not responding', { status: 502 })
|
||||
})
|
||||
}
|
||||
|
||||
const websocket = {
|
||||
open(ws: ServerWebSocket<WsData>) {
|
||||
const upstream = new WebSocket(`ws+unix://${socketPath}:${ws.data.path}`, ws.data.protocols)
|
||||
upstream.binaryType = 'arraybuffer'
|
||||
upstreams.set(ws, upstream)
|
||||
|
||||
upstream.addEventListener('message', (e) => ws.send(e.data as string | ArrayBuffer))
|
||||
upstream.addEventListener('close', () => { upstreams.delete(ws); ws.close() })
|
||||
upstream.addEventListener('error', () => { upstreams.delete(ws); ws.close() })
|
||||
},
|
||||
|
||||
message(ws: ServerWebSocket<WsData>, msg: string | ArrayBuffer | Uint8Array) {
|
||||
const upstream = upstreams.get(ws)
|
||||
if (upstream?.readyState === WebSocket.OPEN) upstream.send(msg)
|
||||
},
|
||||
|
||||
close(ws: ServerWebSocket<WsData>) {
|
||||
upstreams.get(ws)?.close()
|
||||
upstreams.delete(ws)
|
||||
},
|
||||
}
|
||||
|
||||
return { proxyFetch, websocket }
|
||||
}
|
||||
185
src/server.ts
185
src/server.ts
|
|
@ -1,79 +1,12 @@
|
|||
import { join } from 'path'
|
||||
import { accessSync, chmodSync, constants, mkdirSync, unlinkSync } from 'fs'
|
||||
import type { ServerWebSocket, Subprocess } from 'bun'
|
||||
import { createProxy, type WsData } from './proxy'
|
||||
import { isHealthy, isRunning, shutdown, spawn } from './binary'
|
||||
|
||||
const DATA_DIR = process.env.DATA_DIR
|
||||
const DATA_DIR = process.env.DATA_DIR!
|
||||
const PORT = Number(process.env.PORT) || 3000
|
||||
const SOCKET_PATH = DATA_DIR ? join(DATA_DIR, 'tronbyt.sock') : ''
|
||||
const BIN_DIR = join(import.meta.dir, '..', 'bin')
|
||||
const SOCKET_PATH = join(DATA_DIR, 'tronbyt.sock')
|
||||
|
||||
let goHealthy = false
|
||||
let goProcess: Subprocess | undefined
|
||||
|
||||
interface WsData {
|
||||
path: string
|
||||
protocols: string[]
|
||||
}
|
||||
|
||||
const upstreams = new Map<ServerWebSocket<WsData>, WebSocket>()
|
||||
|
||||
// Proxy fetch to Go server over unix socket
|
||||
|
||||
function proxyFetch(req: Request): Promise<Response> | Response {
|
||||
const url = new URL(req.url)
|
||||
|
||||
if (url.pathname === '/ok') {
|
||||
if (!goHealthy) return new Response('starting', { status: goProcess ? 200 : 503 })
|
||||
return fetch('http://localhost/health', { unix: SOCKET_PATH })
|
||||
.then((r) => (r.ok ? new Response('ok') : new Response('unhealthy', { status: 503 })))
|
||||
.catch(() => new Response('unhealthy', { status: 503 }))
|
||||
}
|
||||
|
||||
const hasBody = req.method !== 'GET' && req.method !== 'HEAD'
|
||||
const headers = new Headers(req.headers)
|
||||
headers.delete('accept-encoding')
|
||||
|
||||
return fetch(`http://localhost${url.pathname}${url.search}`, {
|
||||
method: req.method,
|
||||
headers,
|
||||
body: hasBody ? req.body : undefined,
|
||||
unix: SOCKET_PATH,
|
||||
}).then((r) => {
|
||||
const respHeaders = new Headers(r.headers)
|
||||
respHeaders.delete('content-encoding')
|
||||
respHeaders.delete('content-length')
|
||||
return new Response(r.body, { status: r.status, headers: respHeaders })
|
||||
}).catch((e) => {
|
||||
console.error('Proxy error:', e)
|
||||
return new Response('Tronbyt server is not responding', { status: 502 })
|
||||
})
|
||||
}
|
||||
|
||||
// WebSocket proxy
|
||||
|
||||
const websocket = {
|
||||
open(ws: ServerWebSocket<WsData>) {
|
||||
const upstream = new WebSocket(`ws+unix://${SOCKET_PATH}:${ws.data.path}`, ws.data.protocols)
|
||||
upstream.binaryType = 'arraybuffer'
|
||||
upstreams.set(ws, upstream)
|
||||
|
||||
upstream.addEventListener('message', (e) => ws.send(e.data as string | ArrayBuffer))
|
||||
upstream.addEventListener('close', () => { upstreams.delete(ws); ws.close() })
|
||||
upstream.addEventListener('error', () => { upstreams.delete(ws); ws.close() })
|
||||
},
|
||||
|
||||
message(ws: ServerWebSocket<WsData>, msg: string | ArrayBuffer | Uint8Array) {
|
||||
const upstream = upstreams.get(ws)
|
||||
if (upstream?.readyState === WebSocket.OPEN) upstream.send(msg)
|
||||
},
|
||||
|
||||
close(ws: ServerWebSocket<WsData>) {
|
||||
upstreams.get(ws)?.close()
|
||||
upstreams.delete(ws)
|
||||
},
|
||||
}
|
||||
|
||||
// Server
|
||||
const { proxyFetch, websocket } = createProxy(SOCKET_PATH, isHealthy, isRunning)
|
||||
|
||||
const server = Bun.serve({
|
||||
port: PORT,
|
||||
|
|
@ -100,113 +33,7 @@ const server = Bun.serve({
|
|||
|
||||
console.log(`Listening on port ${server.port}`)
|
||||
|
||||
// Go binary management
|
||||
|
||||
function getBinaryName(): string {
|
||||
const platform = process.platform === 'darwin' ? 'darwin' : 'linux'
|
||||
const arch = process.arch === 'x64' ? 'amd64' : 'arm64'
|
||||
return `tronbyt-server-${platform}-${arch}`
|
||||
}
|
||||
|
||||
async function waitForHealthy(maxAttempts = 60): Promise<boolean> {
|
||||
for (let i = 0; i < maxAttempts; i++) {
|
||||
try {
|
||||
const resp = await fetch('http://localhost/health', { unix: SOCKET_PATH })
|
||||
if (resp.ok) return true
|
||||
} catch {}
|
||||
await Bun.sleep(1000)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
async function downloadBinary(binPath: string): Promise<boolean> {
|
||||
const name = getBinaryName()
|
||||
const url = `https://github.com/tronbyt/server/releases/latest/download/${name}`
|
||||
|
||||
console.log(`Downloading ${name}...`)
|
||||
try {
|
||||
const resp = await fetch(url, { redirect: 'follow' })
|
||||
if (!resp.ok) {
|
||||
console.error(`Download failed: ${resp.status} ${resp.statusText}`)
|
||||
return false
|
||||
}
|
||||
|
||||
mkdirSync(BIN_DIR, { recursive: true })
|
||||
await Bun.write(binPath, resp)
|
||||
|
||||
chmodSync(binPath, 0o755)
|
||||
|
||||
console.log('Download complete')
|
||||
return true
|
||||
} catch (e) {
|
||||
console.error('Download failed:', e)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
function validate(): string | undefined {
|
||||
if (!DATA_DIR) return 'DATA_DIR env var is not set — toes should provide this automatically'
|
||||
|
||||
const binPath = join(BIN_DIR, getBinaryName())
|
||||
try {
|
||||
accessSync(binPath, constants.X_OK)
|
||||
} catch {
|
||||
return `Binary not found or not executable: ${binPath}`
|
||||
}
|
||||
}
|
||||
|
||||
async function spawnGoServer() {
|
||||
const binPath = join(BIN_DIR, getBinaryName())
|
||||
|
||||
if (!(await Bun.file(binPath).exists())) {
|
||||
if (!(await downloadBinary(binPath))) return
|
||||
}
|
||||
|
||||
const error = validate()
|
||||
if (error) {
|
||||
console.error(`Setup error: ${error}`)
|
||||
return
|
||||
}
|
||||
|
||||
try { unlinkSync(SOCKET_PATH) } catch {}
|
||||
|
||||
console.log('Starting tronbyt server...')
|
||||
|
||||
goProcess = Bun.spawn([binPath], {
|
||||
env: {
|
||||
...process.env,
|
||||
TRONBYT_UNIX_SOCKET: SOCKET_PATH,
|
||||
DATA_DIR,
|
||||
DB_DSN: join(DATA_DIR!, 'tronbyt.db'),
|
||||
PRODUCTION: process.env.PRODUCTION ?? 'false',
|
||||
SINGLE_USER_AUTO_LOGIN: process.env.SINGLE_USER_AUTO_LOGIN ?? 'true',
|
||||
SYSTEM_APPS_AUTO_REFRESH: process.env.SYSTEM_APPS_AUTO_REFRESH ?? 'true',
|
||||
},
|
||||
stdout: 'inherit',
|
||||
stderr: 'inherit',
|
||||
})
|
||||
|
||||
goProcess.exited.then((code) => {
|
||||
console.log(`Tronbyt server exited with code ${code}`)
|
||||
goHealthy = false
|
||||
goProcess = undefined
|
||||
})
|
||||
|
||||
if (await waitForHealthy()) {
|
||||
goHealthy = true
|
||||
console.log('Tronbyt server is healthy')
|
||||
} else {
|
||||
console.error('Tronbyt server failed to become healthy')
|
||||
}
|
||||
}
|
||||
|
||||
function shutdown() {
|
||||
if (!goProcess) return
|
||||
console.log('Shutting down tronbyt server...')
|
||||
goProcess.kill('SIGTERM')
|
||||
}
|
||||
|
||||
process.on('SIGTERM', shutdown)
|
||||
process.on('SIGINT', shutdown)
|
||||
|
||||
spawnGoServer()
|
||||
spawn(DATA_DIR, SOCKET_PATH)
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user