new event API

This commit is contained in:
Chris Wanstrath 2026-02-15 08:36:58 -08:00
parent 271ff151a1
commit bf14ba4ba1
9 changed files with 118 additions and 3 deletions

View File

@ -138,7 +138,10 @@ The server sets these on each app process: `PORT`, `APPS_DIR`, `TOES_URL`, `TOES
### SSE Streaming ### SSE Streaming
`/api/apps/stream` pushes the full app list on every state change. Client reconnects automatically. The `onChange()` callback system in `apps.ts` notifies listeners. Two SSE endpoints serve different consumers:
- `/api/apps/stream` -- Full app state snapshots on every change. Used by the dashboard UI. Driven by `onChange()` in `apps.ts`.
- `/api/events/stream` -- Discrete lifecycle events (`app:start`, `app:stop`, `app:activate`, `app:create`, `app:delete`). Used by app processes to react to other apps' lifecycle changes. Driven by `emit()`/`onEvent()` in `apps.ts`. Apps subscribe via `on()` from `@because/toes/tools`.
## Coding Guidelines ## Coding Guidelines

View File

@ -21,7 +21,8 @@ function convert(app: BackendApp): SharedApp {
return { ...rest, pid: proc?.pid } return { ...rest, pid: proc?.pid }
} }
// SSE endpoint for real-time app state updates // SSE: full app state snapshots for the dashboard UI (every state change)
// For discrete lifecycle events consumed by app processes, see /api/events/stream
router.sse('/stream', (send) => { router.sse('/stream', (send) => {
const broadcast = () => { const broadcast = () => {
const apps: SharedApp[] = allApps().map(({ const apps: SharedApp[] = allApps().map(({

14
src/server/api/events.ts Normal file
View File

@ -0,0 +1,14 @@
import { onEvent } from '$apps'
import { Hype } from '@because/hype'
const router = Hype.router()
// SSE: discrete lifecycle events for app processes (start, stop, deploy, etc.)
// Unlike /api/apps/stream (full state snapshots for the dashboard), this sends
// individual events so apps can react to specific lifecycle changes.
router.sse('/stream', (send) => {
const unsub = onEvent(event => send(event))
return unsub
})
export default router

View File

@ -1,4 +1,4 @@
import { APPS_DIR, allApps, registerApp, removeApp, restartApp, startApp } from '$apps' import { APPS_DIR, allApps, emit, registerApp, removeApp, restartApp, startApp } from '$apps'
import { computeHash, generateManifest } from '../sync' import { computeHash, generateManifest } from '../sync'
import { loadGitignore } from '@gitignore' import { loadGitignore } from '@gitignore'
import { cpSync, existsSync, mkdirSync, readdirSync, readFileSync, realpathSync, renameSync, rmSync, symlinkSync, unlinkSync, watch, writeFileSync } from 'fs' import { cpSync, existsSync, mkdirSync, readdirSync, readFileSync, realpathSync, renameSync, rmSync, symlinkSync, unlinkSync, watch, writeFileSync } from 'fs'
@ -330,6 +330,8 @@ router.post('/apps/:app/activate', async c => {
console.error(`Failed to clean up old versions: ${e}`) console.error(`Failed to clean up old versions: ${e}`)
} }
emit({ type: 'app:activate', app: appName, version })
// Register new app or restart existing // Register new app or restart existing
const app = allApps().find(a => a.name === appName) const app = allApps().find(a => a.name === appName)
if (!app) { if (!app) {

View File

@ -1,4 +1,5 @@
import type { App as SharedApp, AppState } from '@types' import type { App as SharedApp, AppState } from '@types'
import type { ToesEvent, ToesEventInput, ToesEventType } from '../shared/events'
import type { Subprocess } from 'bun' import type { Subprocess } from 'bun'
import { DEFAULT_EMOJI } from '@types' import { DEFAULT_EMOJI } from '@types'
import { appendFileSync, existsSync, mkdirSync, readdirSync, readFileSync, realpathSync, renameSync, symlinkSync, unlinkSync, writeFileSync } from 'fs' import { appendFileSync, existsSync, mkdirSync, readdirSync, readFileSync, realpathSync, renameSync, symlinkSync, unlinkSync, writeFileSync } from 'fs'
@ -31,6 +32,7 @@ const STARTUP_TIMEOUT = 30000
const _appPorts = new Map<string, number>() const _appPorts = new Map<string, number>()
const _apps = new Map<string, App>() const _apps = new Map<string, App>()
const _availablePorts: number[] = [] const _availablePorts: number[] = []
const _eventListeners = new Set<(event: ToesEvent) => void>()
const _listeners = new Set<() => void>() const _listeners = new Set<() => void>()
let _shuttingDown = false let _shuttingDown = false
@ -106,11 +108,22 @@ export async function initApps() {
runApps() runApps()
} }
export function emit(event: ToesEventInput) {
// Cast: ToesEventInput is DistributiveOmit<ToesEvent, 'time'>, so adding time
// back produces ToesEvent. TS can't prove this because spreads don't distribute.
_eventListeners.forEach(cb => cb({ ...event, time: Date.now() } as ToesEvent))
}
export function onChange(cb: () => void) { export function onChange(cb: () => void) {
_listeners.add(cb) _listeners.add(cb)
return () => _listeners.delete(cb) return () => _listeners.delete(cb)
} }
export function onEvent(cb: (event: ToesEvent) => void) {
_eventListeners.add(cb)
return () => _eventListeners.delete(cb)
}
export function removeApp(dir: string) { export function removeApp(dir: string) {
const app = _apps.get(dir) const app = _apps.get(dir)
if (!app) return if (!app) return
@ -130,6 +143,7 @@ export function removeApp(dir: string) {
_apps.delete(dir) _apps.delete(dir)
update() update()
emit({ type: 'app:delete', app: dir })
} }
export function registerApp(dir: string) { export function registerApp(dir: string) {
@ -141,6 +155,7 @@ export function registerApp(dir: string) {
const tool = pkg.toes?.tool const tool = pkg.toes?.tool
_apps.set(dir, { name: dir, state, icon, error, tool }) _apps.set(dir, { name: dir, state, icon, error, tool })
update() update()
emit({ type: 'app:create', app: dir })
if (!error) { if (!error) {
runApp(dir, getPort(dir)) runApp(dir, getPort(dir))
} }
@ -502,6 +517,7 @@ function markAsRunning(app: App, port: number, isHttpApp: boolean) {
app.started = Date.now() app.started = Date.now()
app.isHttpApp = isHttpApp app.isHttpApp = isHttpApp
update() update()
emit({ type: 'app:start', app: app.name })
openTunnelIfEnabled(app.name, port) openTunnelIfEnabled(app.name, port)
if (isHttpApp) { if (isHttpApp) {
@ -737,6 +753,7 @@ async function runApp(dir: string, port: number) {
app.port = undefined app.port = undefined
app.started = undefined app.started = undefined
update() update()
if (!_shuttingDown) emit({ type: 'app:stop', app: dir })
// Schedule restart if appropriate // Schedule restart if appropriate
if (shouldAutoRestart(app, code)) { if (shouldAutoRestart(app, code)) {

View File

@ -1,5 +1,6 @@
import { allApps, initApps, TOES_URL } from '$apps' import { allApps, initApps, TOES_URL } from '$apps'
import appsRouter from './api/apps' import appsRouter from './api/apps'
import eventsRouter from './api/events'
import syncRouter from './api/sync' import syncRouter from './api/sync'
import systemRouter from './api/system' import systemRouter from './api/system'
import { Hype } from '@because/hype' import { Hype } from '@because/hype'
@ -7,6 +8,7 @@ import { Hype } from '@because/hype'
const app = new Hype({ layout: false, logging: !!process.env.DEBUG }) const app = new Hype({ layout: false, logging: !!process.env.DEBUG })
app.route('/api/apps', appsRouter) app.route('/api/apps', appsRouter)
app.route('/api/events', eventsRouter)
app.route('/api/sync', syncRouter) app.route('/api/sync', syncRouter)
app.route('/api/system', systemRouter) app.route('/api/system', systemRouter)

17
src/shared/events.ts Normal file
View File

@ -0,0 +1,17 @@
export type ToesEventType = 'app:activate' | 'app:create' | 'app:delete' | 'app:start' | 'app:stop'
interface BaseEvent {
app: string
time: number
}
export type ToesEvent =
| BaseEvent & { type: 'app:activate'; version: string }
| BaseEvent & { type: 'app:create' }
| BaseEvent & { type: 'app:delete' }
| BaseEvent & { type: 'app:start' }
| BaseEvent & { type: 'app:stop' }
type DistributiveOmit<T, K extends keyof any> = T extends any ? Omit<T, K> : never
export type ToesEventInput = DistributiveOmit<ToesEvent, 'time'>

57
src/tools/events.ts Normal file
View File

@ -0,0 +1,57 @@
import type { ToesEvent, ToesEventType } from '../shared/events'
export type { ToesEvent, ToesEventType }
type EventCallback = (event: ToesEvent) => void
interface Listener {
types: ToesEventType[]
callback: EventCallback
}
const _listeners = new Set<Listener>()
let _es: EventSource | undefined
function ensureConnection() {
if (_es && _es.readyState !== EventSource.CLOSED) return
if (_es) _es.close()
const url = `${process.env.TOES_URL}/api/events/stream`
_es = new EventSource(url)
_es.onerror = () => {
if (_es?.readyState === EventSource.CLOSED) {
console.warn('[toes] Event stream closed, reconnecting...')
_es = undefined
if (_listeners.size > 0) ensureConnection()
}
}
_es.onmessage = (msg) => {
try {
const event: ToesEvent = JSON.parse(msg.data)
_listeners.forEach(l => {
if (l.types.includes(event.type)) l.callback(event)
})
} catch (e) {
console.warn('[toes] Failed to parse event:', e)
}
}
}
export function on(type: ToesEventType | ToesEventType[], callback: EventCallback): () => void {
const listener: Listener = {
types: Array.isArray(type) ? type : [type],
callback,
}
_listeners.add(listener)
ensureConnection()
return () => {
_listeners.delete(listener)
if (_listeners.size === 0 && _es) {
_es.close()
_es = undefined
}
}
}

View File

@ -1,3 +1,5 @@
export { theme } from '../client/themes' export { theme } from '../client/themes'
export { loadAppEnv } from './env' export { loadAppEnv } from './env'
export type { ToesEvent, ToesEventType } from './events'
export { on } from './events'
export { baseStyles, ToolScript } from './scripts.tsx' export { baseStyles, ToolScript } from './scripts.tsx'