toes/src/server/apps.ts
2026-02-09 10:50:21 -08:00

851 lines
23 KiB
TypeScript

import type { App as SharedApp, AppState } from '@types'
import type { Subprocess } from 'bun'
import { DEFAULT_EMOJI } from '@types'
import { appendFileSync, existsSync, mkdirSync, readdirSync, readFileSync, realpathSync, renameSync, symlinkSync, unlinkSync, writeFileSync } from 'fs'
import { join, resolve } from 'path'
import { appLog, hostLog, setApps } from './tui'
export type { AppState } from '@types'
// Root directory holding one sub-directory per app. APPS_DIR wins when set;
// otherwise it is "<DATA_DIR or cwd>/apps", resolved to an absolute path.
export const APPS_DIR = process.env.APPS_DIR ?? resolve(join(process.env.DATA_DIR ?? '.', 'apps'))
// Host data directory (per-app env files are read from "<TOES_DIR>/env").
// Unlike APPS_DIR, the fallback is not passed through resolve().
export const TOES_DIR = process.env.TOES_DIR ?? join(process.env.DATA_DIR ?? '.', 'toes')
// Base URL handed to every app via the TOES_URL env var; falls back to localhost on PORT (or 3000).
export const TOES_URL = process.env.TOES_URL ?? `http://localhost:${process.env.PORT || 3000}`
// Consecutive failed health checks after which the app process is killed.
const HEALTH_CHECK_FAILURES_BEFORE_RESTART = 3
// Delay between health checks, in milliseconds (setInterval period).
const HEALTH_CHECK_INTERVAL = 30000
// Milliseconds before a single /ok health request is aborted.
const HEALTH_CHECK_TIMEOUT = 5000
// Log files whose date is older than this many days are deleted by rotateLogs().
const LOG_RETENTION_DAYS = 7
// Maximum number of log entries kept in memory per app.
const MAX_LOGS = 100
// Inclusive upper bound of the port pool.
const MAX_PORT = 3100
// Inclusive lower bound of the port pool.
const MIN_PORT = 3001
// Restart backoff schedule in milliseconds; the last entry is reused once attempts exceed the list.
const RESTART_DELAYS = [1000, 2000, 4000, 8000, 16000, 32000]
// Milliseconds to wait for a process to exit before escalating to SIGKILL.
const SHUTDOWN_TIMEOUT = 10000
// An app that ran at least this long (ms) before exiting gets its restart backoff reset.
const STABLE_RUN_TIME = 60000
// Milliseconds an app may stay in the 'starting' state before its process is killed.
const STARTUP_TIMEOUT = 30000
// Last port handed to each app name, so restarts can reuse the same port.
const _appPorts = new Map<string, number>()
// Registry of known apps, keyed by app directory name.
const _apps = new Map<string, App>()
// Ports currently free to hand out.
const _availablePorts: number[] = []
// Callbacks invoked whenever update() runs.
const _listeners = new Set<() => void>()
// Set once a shutdown signal was received; suppresses auto-restarts.
let _shuttingDown = false
// Server-side app record: the shared App shape plus process-supervision bookkeeping.
export type App = SharedApp & {
// Failed health checks in a row; reset to 0 on success and on every start.
consecutiveHealthFailures?: number
// Handle of the periodic health-check interval while the app is running.
healthCheckTimer?: Timer
// True when the app answered /ok during startup; false when it was only judged by process survival.
isHttpApp?: boolean
// Date.now() at the moment the most recent automatic restart was scheduled.
lastRestartTime?: number
// Set when the app is stopped deliberately; blocks automatic restarts.
manuallyStopped?: boolean
// The spawned Bun subprocess; cleared when the process exits.
proc?: Subprocess
// Number of automatic restarts scheduled so far; indexes into RESTART_DELAYS.
restartAttempts?: number
// Timer that escalates a pending stop to SIGKILL.
shutdownTimer?: Timer
// Timer that kills the process if it stays in 'starting' too long.
startupTimer?: Timer
}
// Result of reading an app's package.json: the parsed manifest (an empty object when it
// could not be read or parsed) and, when the app is not runnable, a human-readable reason.
type LoadResult = { pkg: any, error?: string }
/** Returns every registered app, ordered alphabetically by name. */
export const allApps = (): App[] => {
  const apps = [..._apps.values()]
  apps.sort((left, right) => left.name.localeCompare(right.name))
  return apps
}
/** Looks up a registered app by its directory name; undefined when unknown. */
export const getApp = (dir: string): App | undefined => {
  return _apps.get(dir)
}
/** Starts every app directory on disk that currently has a valid manifest. */
export const runApps = () => {
  for (const dir of allAppDirs()) {
    if (isApp(dir)) startApp(dir)
  }
}
/** Returns the registered apps whose state is 'running', sorted by name. */
export const runningApps = (): App[] => {
  const active: App[] = []
  for (const app of allApps()) {
    if (app.state === 'running') active.push(app)
  }
  return active
}
/**
 * Lists the dates for which an app has a log file, newest first.
 *
 * @param appName App directory name.
 * @returns The log file names with their `.log` suffix removed, in descending
 *   lexicographic order; an empty array when the log directory does not exist.
 */
export function getLogDates(appName: string): string[] {
  const dir = logDir(appName)
  if (!existsSync(dir)) return []

  const suffix = '.log'
  return readdirSync(dir)
    .filter(file => file.endsWith(suffix))
    // Strip only the trailing suffix; String.replace would remove the first
    // ".log" found anywhere in the name.
    .map(file => file.slice(0, -suffix.length))
    .sort()
    .reverse()
}
/**
 * Reads the non-empty lines of one day's log file for an app.
 *
 * @param appName App directory name.
 * @param date Log date (file name without `.log`); defaults to today's date.
 * @param tail When positive, only the last `tail` lines are returned.
 * @returns The log lines, or an empty array when the file does not exist.
 */
export function readLogs(appName: string, date?: string, tail?: number): string[] {
  const path = logFile(appName, date ?? formatLogDate())
  if (!existsSync(path)) return []

  const entries = readFileSync(path, 'utf-8')
    .split('\n')
    .filter(line => line.length > 0)

  const wantsTail = tail !== undefined && tail > 0
  return wantsTail ? entries.slice(-tail) : entries
}
/**
 * Boots the app supervisor. The steps run strictly in this order: fill the
 * port pool, install signal handlers, prune old logs, create missing
 * `current` symlinks, register the apps found on disk, then start them.
 */
export function initApps() {
  const steps = [
    initPortPool,
    setupShutdownHandlers,
    rotateLogs,
    createAppSymlinks,
    discoverApps,
    runApps,
  ]
  for (const step of steps) step()
}
/**
 * Subscribes a callback that is invoked on every registry update.
 *
 * @returns A function that removes the subscription again.
 */
export function onChange(cb: () => void) {
  _listeners.add(cb)
  const unsubscribe = () => _listeners.delete(cb)
  return unsubscribe
}
/**
 * Unregisters an app: stops its timers, kills its process, returns its port
 * to the pool and notifies listeners. Does nothing for an unknown app.
 */
export function removeApp(dir: string) {
  const app = _apps.get(dir)
  if (!app) return

  // The process exit handler consults this flag; without it a killed
  // (non-zero exit) process would schedule a restart for an app that is no
  // longer registered, taking a port from the pool that is never returned.
  app.manuallyStopped = true

  clearTimers(app)

  // Kill whatever process is attached, not only one in the 'running' state,
  // so an app that is still starting or stopping is not left orphaned.
  app.proc?.kill()

  if (app.port) {
    releasePort(app.port)
  }

  _apps.delete(dir)
  update()
}
/**
 * Adds a newly discovered app directory to the registry and, when its
 * manifest is valid, launches it right away. Already-registered apps are
 * left untouched.
 */
export function registerApp(dir: string) {
  if (_apps.has(dir)) return

  const loaded = loadApp(dir)
  const toes = loaded.pkg.toes
  const entry: App = {
    name: dir,
    state: loaded.error ? 'invalid' : 'stopped',
    icon: toes?.icon ?? DEFAULT_EMOJI,
    error: loaded.error,
    tool: toes?.tool,
  }
  _apps.set(dir, entry)
  update()

  if (loaded.error) return
  runApp(dir, getPort(dir))
}
/**
 * Renames an app: moves its directory, re-keys the registry and the port
 * memory, and restarts the app if it was running.
 *
 * @param oldName Current app directory name.
 * @param newName New name; must match /^[a-z][a-z0-9-]*$/ and be unused.
 * @returns `{ ok: true }` on success, otherwise `{ ok: false, error }`.
 */
export function renameApp(oldName: string, newName: string): { ok: boolean, error?: string } {
  const app = _apps.get(oldName)
  if (!app) return { ok: false, error: 'App not found' }
  if (_apps.has(newName)) return { ok: false, error: 'An app with that name already exists' }
  if (!/^[a-z][a-z0-9-]*$/.test(newName)) {
    return { ok: false, error: 'Name must start with a letter and contain only lowercase letters, numbers, and hyphens' }
  }

  const oldPath = join(APPS_DIR, oldName)
  const newPath = join(APPS_DIR, newName)

  // Stop the app if running. Keep a handle on the old process so the restart
  // below can wait for it to actually exit.
  const wasRunning = app.state === 'running'
  const oldProc = app.proc
  if (wasRunning) {
    clearTimers(app)
    app.proc?.kill()
    app.proc = undefined
    if (app.port) releasePort(app.port)
    app.port = undefined
    app.started = undefined
  }

  try {
    renameSync(oldPath, newPath)
  } catch (e) {
    return { ok: false, error: `Failed to rename directory: ${e instanceof Error ? e.message : String(e)}` }
  }

  // Transfer port mapping to new name
  const oldPort = _appPorts.get(oldName)
  if (oldPort !== undefined) {
    _appPorts.delete(oldName)
    _appPorts.set(newName, oldPort)
  }

  // Update the internal registry
  _apps.delete(oldName)
  app.name = newName
  app.state = 'stopped'
  app.manuallyStopped = false
  app.restartAttempts = 0
  _apps.set(newName, app)

  update()

  // Restart if it was running
  if (wasRunning) {
    if (oldProc) {
      // Restart only after the old process has exited. Its exit handler
      // mutates this same app record (state, port, timers); starting the new
      // run first would let that handler clobber the new run.
      const forceKill = setTimeout(() => oldProc.kill(9), SHUTDOWN_TIMEOUT) // SIGKILL if SIGTERM is ignored
      void oldProc.exited.then(() => {
        clearTimeout(forceKill)
        // Use the record's current name in case it was renamed again meanwhile.
        startApp(app.name)
      })
    } else {
      startApp(newName)
    }
  }

  return { ok: true }
}
/**
 * Explicitly starts a registered app. Only apps in the 'stopped' or
 * 'invalid' state whose manifest is currently valid are started; the
 * manual-stop flag and any previous error are cleared first.
 */
export function startApp(dir: string) {
  const app = _apps.get(dir)
  if (!app) return

  const startable = app.state === 'stopped' || app.state === 'invalid'
  if (!startable || !isApp(dir)) return

  app.manuallyStopped = false
  app.error = undefined
  runApp(dir, getPort(dir))
}
/**
 * Stops an app (when it is running or starting), waits for it to come to
 * rest, then starts it again. Does nothing for an unknown app.
 *
 * @throws Error when the app has not stopped within the wait budget.
 */
export async function restartApp(dir: string): Promise<void> {
  const app = _apps.get(dir)
  if (!app) return

  // Stop if running
  if (app.state === 'running' || app.state === 'starting') {
    stopApp(dir)

    // Wait slightly longer than SHUTDOWN_TIMEOUT so the SIGKILL escalation
    // has a chance to take effect before we give up.
    const maxWait = SHUTDOWN_TIMEOUT + 2000
    const pollInterval = 100

    // After exit the app rests in 'stopped' or 'invalid'; startApp() accepts
    // both. An app removed from the registry also counts as at rest.
    const atRest = () => {
      const current = _apps.get(dir)
      return !current || current.state === 'stopped' || current.state === 'invalid'
    }

    let waited = 0
    while (!atRest() && waited < maxWait) {
      await new Promise(resolve => setTimeout(resolve, pollInterval))
      waited += pollInterval
    }

    if (!atRest()) {
      throw new Error(`App ${dir} failed to stop after ${maxWait}ms`)
    }
  }

  // Start the app
  startApp(dir)
}
/**
 * Deliberately stops an app: marks it as manually stopped (no auto-restart),
 * sends the default kill signal and arms the SIGKILL escalation timer.
 *
 * Acts on running apps and on starting apps that already have a process.
 * An app still in its pre-spawn phase (no process yet) is left alone,
 * because there is nothing to signal.
 */
export function stopApp(dir: string) {
  const app = _apps.get(dir)
  if (!app) return

  const stoppable =
    app.state === 'running' ||
    (app.state === 'starting' && app.proc !== undefined)
  if (!stoppable) return

  info(app, 'Stopping...')
  app.state = 'stopping'
  app.manuallyStopped = true
  update()

  // The startup watchdog is irrelevant once a stop was requested
  if (app.startupTimer) {
    clearTimeout(app.startupTimer)
    app.startupTimer = undefined
  }

  // Clear health check timer
  if (app.healthCheckTimer) {
    clearInterval(app.healthCheckTimer)
    app.healthCheckTimer = undefined
  }

  // Start shutdown timeout - escalate to SIGKILL if needed
  startShutdownTimeout(app)

  app.proc?.kill()
}
/**
 * Writes a new icon into the `toes.icon` field of the app's package.json.
 *
 * @throws Error carrying the load error when the manifest is not valid.
 */
export function updateAppIcon(dir: string, icon: string) {
  const loaded = loadApp(dir)
  if (loaded.error) throw new Error(loaded.error)

  const manifest = loaded.pkg
  if (manifest.toes === undefined || manifest.toes === null) {
    manifest.toes = {}
  }
  manifest.toes.icon = icon
  saveApp(dir, manifest)
}
// Cancels and forgets the startup, shutdown and health-check timers of an app.
const clearTimers = (app: App) => {
  const { startupTimer, shutdownTimer, healthCheckTimer } = app

  if (startupTimer) {
    clearTimeout(startupTimer)
    app.startupTimer = undefined
  }

  if (shutdownTimer) {
    clearTimeout(shutdownTimer)
    app.shutdownTimer = undefined
  }

  if (healthCheckTimer) {
    clearInterval(healthCheckTimer)
    app.healthCheckTimer = undefined
  }
}
// Formats a date as the first ten characters of its ISO-8601 UTC string
// (YYYY-MM-DD for four-digit years); defaults to the current time.
const formatLogDate = (date: Date = new Date()) => {
  const iso = date.toISOString()
  return iso.substring(0, 10)
}
// Sends a message to the TUI app log and, when the app has an in-memory log
// buffer, appends it there as a single space-joined entry.
const info = (app: App, ...msg: string[]) => {
  appLog(app, ...msg)
  const text = msg.join(' ')
  app.logs?.push({ time: Date.now(), text })
}
// Path of an app's log directory: <APPS_DIR>/<appName>/logs
const logDir = (appName: string) => {
  return join(APPS_DIR, appName, 'logs')
}
// Path of an app's log file for the given date (today by default).
const logFile = (appName: string, date: string = formatLogDate()) => {
  const fileName = date + '.log'
  return join(logDir(appName), fileName)
}
// True when the directory's manifest loads without an error.
const isApp = (dir: string): boolean => {
  const { error } = loadApp(dir)
  return !error
}
// Pushes the current app list to the TUI and notifies every change listener.
const update = () => {
  setApps(allApps())
  for (const notify of _listeners) notify()
}
// Names of the sub-directories of APPS_DIR that contain a `current` entry, sorted.
function allAppDirs() {
  const names: string[] = []
  for (const entry of readdirSync(APPS_DIR, { withFileTypes: true })) {
    if (!entry.isDirectory()) continue
    if (!existsSync(join(APPS_DIR, entry.name, 'current'))) continue
    names.push(entry.name)
  }
  return names.sort()
}
// For every app directory without a resolvable `current` entry, points
// `current` at the newest version directory (highest name in sort order)
// that has a package.json with a `scripts.toes` entry.
function createAppSymlinks() {
  for (const app of readdirSync(APPS_DIR, { withFileTypes: true })) {
    if (!app.isDirectory()) continue

    const appDir = join(APPS_DIR, app.name)
    const currentPath = join(appDir, 'current')
    if (existsSync(currentPath)) continue

    // A failure for one app (e.g. a dangling `current` symlink, which
    // existsSync reports as missing but symlinkSync refuses to overwrite, or
    // an unreadable directory) must not abort startup for all other apps.
    try {
      // Find valid version directories
      const versions = readdirSync(appDir, { withFileTypes: true })
        .filter(e => {
          if (!e.isDirectory()) return false
          const pkgPath = join(appDir, e.name, 'package.json')
          if (!existsSync(pkgPath)) return false
          try {
            const pkg = JSON.parse(readFileSync(pkgPath, 'utf-8'))
            return !!pkg.scripts?.toes
          } catch {
            return false
          }
        })
        .map(e => e.name)
        .sort()
        .reverse()

      const latest = versions[0]
      if (latest) {
        // Relative target: the link lives next to the version directories
        symlinkSync(latest, currentPath)
      }
    } catch (e) {
      hostLog(`Failed to create current symlink for ${app.name}: ${e instanceof Error ? e.message : String(e)}`)
    }
  }
}
// Registers every app directory found on disk (without starting anything),
// then publishes the registry once.
function discoverApps() {
  allAppDirs().forEach(dir => {
    const loaded = loadApp(dir)
    const toes = loaded.pkg.toes
    _apps.set(dir, {
      name: dir,
      state: loaded.error ? 'invalid' : 'stopped',
      icon: toes?.icon ?? DEFAULT_EMOJI,
      error: loaded.error,
      tool: toes?.tool,
    })
  })
  update()
}
// Creates the app's log directory (including parents) when missing and returns its path.
function ensureLogDir(appName: string): string {
  const dir = logDir(appName)
  const missing = !existsSync(dir)
  if (missing) mkdirSync(dir, { recursive: true })
  return dir
}
// Takes a port out of the pool. When the app used a port before and that
// port is still free, the same port is handed out again; otherwise the
// lowest-queued free port is taken and remembered for the app.
// Throws when the pool is empty.
function getPort(appName?: string): number {
  const remembered = appName ? _appPorts.get(appName) : undefined
  if (remembered !== undefined) {
    const slot = _availablePorts.indexOf(remembered)
    if (slot !== -1) {
      _availablePorts.splice(slot, 1)
      return remembered
    }
    // Remembered port is currently taken; fall through to a fresh one
  }

  const next = _availablePorts.shift()
  if (next === undefined) {
    throw new Error('No available ports')
  }

  if (appName) _appPorts.set(appName, next)
  return next
}
// Handles a termination signal: asks every app that still has a live process
// to stop, waits up to SHUTDOWN_TIMEOUT for them to exit, escalates to
// SIGKILL if needed, then exits the host process (code 0 on a clean stop,
// 1 after a forced stop). Re-entrant calls are ignored.
async function gracefulShutdown(signal: string) {
  if (_shuttingDown) return
  _shuttingDown = true

  hostLog(`Received ${signal}, shutting down gracefully...`)

  // Track apps by attached process rather than by state === 'running', so
  // processes that are still starting or already stopping are not orphaned.
  // The process exit handler clears `proc`, which shrinks this list.
  const liveApps = () => allApps().filter(app => app.proc !== undefined)

  const running = liveApps()
  if (running.length === 0) {
    hostLog('No apps running, exiting.')
    process.exit(0)
  }

  hostLog(`Stopping ${running.length} app(s)...`)

  // Stop all running apps
  for (const app of running) {
    app.manuallyStopped = true
    if (app.healthCheckTimer) {
      clearInterval(app.healthCheckTimer)
      app.healthCheckTimer = undefined
    }
    app.proc?.kill()
  }

  // Wait for all apps to exit with timeout
  const shutdownStart = Date.now()
  const checkInterval = setInterval(() => {
    const stillRunning = liveApps()
    if (stillRunning.length === 0) {
      clearInterval(checkInterval)
      hostLog('All apps stopped, exiting.')
      process.exit(0)
    }

    // Check for timeout
    if (Date.now() - shutdownStart > SHUTDOWN_TIMEOUT) {
      clearInterval(checkInterval)
      hostLog(`Shutdown timeout, forcing ${stillRunning.length} app(s) to stop...`)
      for (const app of stillRunning) {
        if (app.proc) {
          app.proc.kill(9) // SIGKILL
        }
      }
      // Give a moment for SIGKILL to take effect
      setTimeout(() => {
        hostLog('Forced shutdown complete, exiting.')
        process.exit(1)
      }, 500)
    }
  }, 100)
}
// Records one failed health check. Once the failure streak reaches
// HEALTH_CHECK_FAILURES_BEFORE_RESTART the health timer is cancelled and the
// process is killed; manuallyStopped is left untouched so auto-restart applies.
function handleHealthCheckFailure(app: App) {
  const failures = (app.consecutiveHealthFailures ?? 0) + 1
  app.consecutiveHealthFailures = failures
  info(app, `Health check failed (${failures}/${HEALTH_CHECK_FAILURES_BEFORE_RESTART})`)

  if (failures < HEALTH_CHECK_FAILURES_BEFORE_RESTART) return

  info(app, 'Too many health check failures, restarting...')

  const timer = app.healthCheckTimer
  if (timer) {
    clearInterval(timer)
    app.healthCheckTimer = undefined
  }

  app.proc?.kill()
}
// Resets the pool to every port from MIN_PORT through MAX_PORT, ascending.
function initPortPool() {
  _availablePorts.splice(0, _availablePorts.length)
  let port = MIN_PORT
  while (port <= MAX_PORT) {
    _availablePorts.push(port)
    port += 1
  }
}
// Moves an app into the 'running' state: cancels the startup watchdog,
// stamps the start time, publishes the change, and starts the health check
// matching the app kind (HTTP /ok probe or process-alive probe).
function markAsRunning(app: App, port: number, isHttpApp: boolean) {
  const watchdog = app.startupTimer
  if (watchdog) {
    clearTimeout(watchdog)
    app.startupTimer = undefined
  }

  app.state = 'running'
  app.started = Date.now()
  app.isHttpApp = isHttpApp
  update()

  if (!isHttpApp) {
    startProcessHealthChecks(app)
    return
  }
  startHealthChecks(app, port)
}
// Reads <APPS_DIR>/<dir>/current/package.json. Returns the parsed manifest,
// plus an error when the file cannot be read, cannot be parsed or inspected,
// or has no `scripts.toes` entry.
function loadApp(dir: string): LoadResult {
  let raw: string
  try {
    raw = readFileSync(join(APPS_DIR, dir, 'current', 'package.json'), 'utf-8')
  } catch {
    return { pkg: {}, error: 'Missing package.json' }
  }

  try {
    // The scripts lookup stays inside the try: a manifest that parses to a
    // non-object (e.g. null) is reported as invalid instead of throwing.
    const manifest = JSON.parse(raw)
    return manifest.scripts?.toes
      ? { pkg: manifest }
      : { pkg: manifest, error: 'Missing scripts.toes in package.json' }
  } catch (e) {
    const reason = e instanceof Error ? e.message : String(e)
    return { pkg: {}, error: `Invalid JSON in package.json: ${reason}` }
  }
}
// Builds the env overrides for an app from <TOES_DIR>/env/_global.env and
// then <TOES_DIR>/env/<appName>.env (later file wins). Lines are KEY=VALUE;
// blank lines, lines starting with '#', and lines without '=' are skipped,
// and one pair of matching surrounding quotes is removed from the value.
function loadAppEnv(appName: string): Record<string, string> {
  const envDir = join(TOES_DIR, 'env')
  const merged: Record<string, string> = {}

  const isWrappedIn = (text: string, quote: string) =>
    text.startsWith(quote) && text.endsWith(quote)

  const absorb = (path: string) => {
    if (!existsSync(path)) return

    for (const rawLine of readFileSync(path, 'utf-8').split('\n')) {
      const entry = rawLine.trim()
      if (entry === '' || entry.startsWith('#')) continue

      const sep = entry.indexOf('=')
      if (sep === -1) continue

      const key = entry.slice(0, sep).trim()
      let value = entry.slice(sep + 1).trim()
      if (isWrappedIn(value, '"') || isWrappedIn(value, "'")) {
        value = value.slice(1, -1)
      }

      if (key) merged[key] = value
    }
  }

  for (const fileName of ['_global.env', `${appName}.env`]) {
    absorb(join(envDir, fileName))
  }

  return merged
}
// Resets the restart counter when the app had been up for at least STABLE_RUN_TIME.
function maybeResetBackoff(app: App) {
  if (!app.started) return
  const uptime = Date.now() - app.started
  if (uptime >= STABLE_RUN_TIME) app.restartAttempts = 0
}
// Puts a port back into the pool (no-op when already there) and keeps the
// pool sorted ascending so allocation stays predictable.
function releasePort(port: number) {
  if (_availablePorts.includes(port)) return
  _availablePorts.push(port)
  _availablePorts.sort((low, high) => low - high)
}
// Deletes per-app log files whose file-name date is older than
// LOG_RETENTION_DAYS. Names that do not parse as a date are left alone.
function rotateLogs() {
  const retentionMs = LOG_RETENTION_DAYS * 24 * 60 * 60 * 1000
  const cutoff = Date.now() - retentionMs

  for (const appName of allAppDirs()) {
    const dir = logDir(appName)
    if (!existsSync(dir)) continue

    const logNames = readdirSync(dir).filter(name => name.endsWith('.log'))
    for (const name of logNames) {
      const stamp = new Date(name.replace('.log', '')).getTime()
      // NaN (unparseable name) compares false and is therefore kept
      if (!(stamp < cutoff)) continue

      unlinkSync(join(dir, name))
      hostLog(`Rotated old log: ${appName}/logs/${name}`)
    }
  }
}
// Appends one "[ISO timestamp] [stream] text" line to today's log file of
// the app, creating the log directory first when necessary.
function writeLogLine(appName: string, streamType: 'stdout' | 'stderr' | 'system', text: string) {
  ensureLogDir(appName)
  const stamp = new Date().toISOString()
  appendFileSync(logFile(appName), '[' + stamp + '] [' + streamType + '] ' + text + '\n')
}
// Launches an app on the given port and supervises it:
//   1. marks it 'starting' and arms the startup watchdog,
//   2. runs `bun install` in the resolved `current` directory,
//   3. spawns `bun run toes` with the app's env files, PORT and host paths,
//   4. polls until /ok answers (HTTP app) or the process survived 5s,
//   5. streams stdout/stderr into the in-memory and on-disk logs,
//   6. on exit: cleans up, releases the port and maybe schedules a restart.
// `port` is expected to have been taken from the pool by the caller.
async function runApp(dir: string, port: number) {
  const { error } = loadApp(dir)
  if (error) {
    // The caller already took the port out of the pool; hand it back
    releasePort(port)
    return
  }

  const app = _apps.get(dir)
  if (!app) {
    releasePort(port)
    return
  }

  // Set state to starting
  app.state = 'starting'
  app.port = port
  app.logs = []
  app.consecutiveHealthFailures = 0
  update()

  // Start startup timeout
  app.startupTimer = setTimeout(() => {
    if (app.state === 'starting') {
      info(app, 'Startup timeout, killing process...')
      app.proc?.kill()
    }
  }, STARTUP_TIMEOUT)

  // Resolve symlink to actual timestamp directory
  const currentLink = join(APPS_DIR, dir, 'current')
  const cwd = realpathSync(currentLink)

  // NOTE(review): the install runs on every start (only the log line is
  // conditional) and its exit code is not inspected — confirm this is intended.
  const needsInstall = !existsSync(join(cwd, 'node_modules'))
  if (needsInstall) info(app, 'Installing dependencies...')

  const install = Bun.spawn(['bun', 'install'], { cwd, stdout: 'pipe', stderr: 'pipe' })
  await install.exited

  info(app, `Starting on port ${port}...`)

  // Load env vars from TOES_DIR/env/
  const appEnv = loadAppEnv(dir)

  const proc = Bun.spawn(['bun', 'run', 'toes'], {
    cwd,
    env: { ...process.env, ...appEnv, PORT: String(port), NO_AUTOPORT: 'true', APPS_DIR, TOES_DIR, TOES_URL },
    stdout: 'pipe',
    stderr: 'pipe',
  })

  app.proc = proc

  // Check if process is alive using ps(1) - more reliable than Bun's API
  const isProcessAlive = async (pid: number): Promise<boolean> => {
    try {
      const ps = Bun.spawn(['ps', '-p', String(pid)], { stdout: 'pipe', stderr: 'pipe' })
      const code = await ps.exited
      return code === 0
    } catch {
      return false
    }
  }

  // Poll to verify app started - tries /ok for HTTP apps, falls back to survival check
  const pollStartup = async () => {
    const pollInterval = 500
    const survivalThreshold = 5000 // Consider non-HTTP apps running after 5s
    const startTime = Date.now()
    const pid = proc.pid

    while (app.state === 'starting' && app.proc === proc) {
      // First check if process is still alive
      const alive = await isProcessAlive(pid)
      if (!alive) {
        info(app, 'Process died during startup')
        // proc.exited handler will clean up
        return
      }

      // Try /ok endpoint for HTTP apps
      try {
        const controller = new AbortController()
        const timeout = setTimeout(() => controller.abort(), 2000)

        const response = await fetch(`http://localhost:${port}/ok`, {
          signal: controller.signal,
        })

        clearTimeout(timeout)

        if (response.ok) {
          // HTTP app is running and healthy
          markAsRunning(app, port, true)
          return
        }

        // App responded but /ok returned error - mark as error and kill
        info(app, `/ok returned ${response.status}`)
        app.error = `Health check failed: /ok returned ${response.status}`
        app.proc?.kill()
        return
      } catch {
        // Connection failed - app not ready yet or not an HTTP app
      }

      // If process survived long enough, consider it running (non-HTTP app)
      if (Date.now() - startTime >= survivalThreshold) {
        info(app, 'No /ok endpoint, marking as running (process survived 5s)')
        markAsRunning(app, port, false)
        return
      }

      await new Promise(resolve => setTimeout(resolve, pollInterval))
    }
  }

  pollStartup()

  const streamOutput = async (stream: ReadableStream<Uint8Array> | null, streamType: 'stdout' | 'stderr') => {
    if (!stream) return

    const reader = stream.getReader()
    const decoder = new TextDecoder()
    // Text after the last newline seen so far; completed by a later chunk
    let pending = ''

    // Logs a batch of complete lines and publishes once if anything was seen
    const emit = (rawLines: string[]) => {
      const lines = rawLines.map(l => l.trimEnd()).filter(Boolean)
      for (const text of lines) {
        // Skip health check logs (e.g., "200 GET http://localhost:3001/ok (0ms)")
        if (/\bGET\b.*\/ok\b/.test(text)) continue
        info(app, text)
        writeLogLine(dir, streamType, text)
        app.logs = (app.logs ?? []).slice(-MAX_LOGS)
      }
      if (lines.length) update()
    }

    while (true) {
      const { done, value } = await reader.read()
      if (done) break

      // stream: true keeps a multi-byte character that straddles two chunks
      // intact; buffering up to the last newline keeps a line that straddles
      // two chunks from being logged as two entries.
      pending += decoder.decode(value, { stream: true })
      const parts = pending.split('\n')
      pending = parts.pop() ?? ''
      emit(parts)
    }

    // Flush the decoder and whatever was left without a trailing newline
    pending += decoder.decode()
    if (pending) emit([pending])
  }

  streamOutput(proc.stdout, 'stdout')
  streamOutput(proc.stderr, 'stderr')

  // Handle process exit
  proc.exited.then(code => {
    // Clear all timers
    clearTimers(app)

    // Check if app was stable before crashing (for backoff reset)
    maybeResetBackoff(app)

    if (code !== 0) {
      const msg = `Exited with code ${code}`
      app.logs?.push({ time: Date.now(), text: msg })
      writeLogLine(dir, 'system', msg)
    } else {
      app.logs?.push({ time: Date.now(), text: 'Stopped' })
      writeLogLine(dir, 'system', 'Stopped')
    }

    // Release port back to pool
    if (app.port) {
      releasePort(app.port)
    }

    // Reset to stopped state (or invalid if error or no longer valid)
    app.state = (isApp(dir) && !app.error) ? 'stopped' : 'invalid'
    app.proc = undefined
    app.port = undefined
    app.started = undefined
    update()

    // Schedule restart if appropriate
    if (shouldAutoRestart(app, code)) {
      scheduleRestart(app, dir)
    }
  })
}
// Writes the manifest back to <APPS_DIR>/<dir>/current/package.json as
// 2-space-indented JSON with a trailing newline.
function saveApp(dir: string, pkg: any) {
  const serialized = JSON.stringify(pkg, null, 2) + '\n'
  writeFileSync(join(APPS_DIR, dir, 'current', 'package.json'), serialized)
}
// Schedules an automatic restart using the RESTART_DELAYS backoff (the last
// delay is reused once attempts exceed the list). The conditions are
// re-checked when the timer fires, since anything may have changed meanwhile.
function scheduleRestart(app: App, dir: string) {
  const attempts = app.restartAttempts ?? 0
  const delayIndex = Math.min(attempts, RESTART_DELAYS.length - 1)
  const delay = RESTART_DELAYS[delayIndex]!

  app.restartAttempts = attempts + 1
  app.lastRestartTime = Date.now()

  info(app, `Scheduling restart in ${delay / 1000}s (attempt ${app.restartAttempts})...`)

  setTimeout(() => {
    // Double-check conditions before restarting
    if (_shuttingDown) return
    // The app may have been removed or re-keyed while the timer was pending.
    // Bail out before getPort(), which would otherwise take a port from the
    // pool for a launch that never happens.
    if (_apps.get(dir) !== app) return
    if (app.manuallyStopped) return
    if (app.state !== 'stopped') return
    if (!isApp(dir)) return

    info(app, 'Restarting...')
    runApp(dir, getPort(dir))
  }, delay)
}
// Routes SIGTERM and SIGINT to gracefulShutdown().
function setupShutdownHandlers() {
  for (const signal of ['SIGTERM', 'SIGINT'] as const) {
    process.on(signal, () => gracefulShutdown(signal))
  }
}
// Decides whether an exited app should be restarted automatically. A restart
// is refused during host shutdown, after a manual stop, when the app became
// invalid, or when the process exited cleanly (code 0).
function shouldAutoRestart(app: App, exitCode: number | null): boolean {
  const blocked =
    _shuttingDown ||
    app.manuallyStopped ||
    app.state === 'invalid' ||
    exitCode === 0
  return !blocked
}
// Starts the periodic HTTP health check for an app: every
// HEALTH_CHECK_INTERVAL it requests http://localhost:<port>/ok with a
// HEALTH_CHECK_TIMEOUT abort. A successful response resets the failure
// streak; a non-ok response or request error counts as a failure.
function startHealthChecks(app: App, port: number) {
  const timer = setInterval(async () => {
    if (app.state !== 'running') {
      if (app.healthCheckTimer) {
        clearInterval(app.healthCheckTimer)
        app.healthCheckTimer = undefined
      }
      return
    }

    const controller = new AbortController()
    const timeout = setTimeout(() => controller.abort(), HEALTH_CHECK_TIMEOUT)
    let healthy = false

    try {
      const response = await fetch(`http://localhost:${port}/ok`, {
        signal: controller.signal,
      })
      healthy = response.ok
    } catch {
      healthy = false
    } finally {
      // Always disarm the abort timer, also when the request failed
      clearTimeout(timeout)
    }

    // The app may have been stopped or restarted while the request was in
    // flight; a result from a cancelled check must not count against it.
    if (app.healthCheckTimer !== timer || app.state !== 'running') return

    if (healthy) {
      // Reset consecutive failures on success
      app.consecutiveHealthFailures = 0
    } else {
      handleHealthCheckFailure(app)
    }
  }, HEALTH_CHECK_INTERVAL)

  app.healthCheckTimer = timer
}
// Starts the periodic health check for non-HTTP apps: every
// HEALTH_CHECK_INTERVAL it verifies with ps(1) that the app's process id
// still exists. A missing pid or dead process counts as a failure.
function startProcessHealthChecks(app: App) {
  const timer = setInterval(async () => {
    if (app.state !== 'running') {
      if (app.healthCheckTimer) {
        clearInterval(app.healthCheckTimer)
        app.healthCheckTimer = undefined
      }
      return
    }

    const pid = app.proc?.pid
    if (!pid) {
      handleHealthCheckFailure(app)
      return
    }

    let alive = false
    try {
      const ps = Bun.spawn(['ps', '-p', String(pid)], { stdout: 'pipe', stderr: 'pipe' })
      alive = (await ps.exited) === 0
    } catch {
      alive = false
    }

    // The app may have been stopped or restarted while ps was running; a
    // result for a previous process must not count against the current one.
    if (app.healthCheckTimer !== timer || app.state !== 'running' || app.proc?.pid !== pid) return

    if (alive) {
      // Process is alive
      app.consecutiveHealthFailures = 0
    } else {
      handleHealthCheckFailure(app)
    }
  }, HEALTH_CHECK_INTERVAL)

  app.healthCheckTimer = timer
}
// Arms the escalation timer: if the app still has a process and is still
// 'stopping' (or 'running') after SHUTDOWN_TIMEOUT, the process gets SIGKILL.
function startShutdownTimeout(app: App) {
  const escalate = () => {
    const proc = app.proc
    if (!proc) return
    if (app.state !== 'stopping' && app.state !== 'running') return

    info(app, 'Shutdown timeout, sending SIGKILL...')
    proc.kill(9) // SIGKILL
  }

  app.shutdownTimer = setTimeout(escalate, SHUTDOWN_TIMEOUT)
}