diff --git a/CLAUDE.md b/CLAUDE.md index 4d6ac86..60b2954 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -138,7 +138,10 @@ The server sets these on each app process: `PORT`, `APPS_DIR`, `TOES_URL`, `TOES ### SSE Streaming -`/api/apps/stream` pushes the full app list on every state change. Client reconnects automatically. The `onChange()` callback system in `apps.ts` notifies listeners. +Two SSE endpoints serve different consumers: + +- `/api/apps/stream` -- Full app state snapshots on every change. Used by the dashboard UI. Driven by `onChange()` in `apps.ts`. +- `/api/events/stream` -- Discrete lifecycle events (`app:start`, `app:stop`, `app:activate`, `app:create`, `app:delete`). Used by app processes to react to other apps' lifecycle changes. Driven by `emit()`/`onEvent()` in `apps.ts`. Apps subscribe via `on()` from `@because/toes/tools`. ## Coding Guidelines diff --git a/src/server/api/apps.ts b/src/server/api/apps.ts index c951513..f1e99f2 100644 --- a/src/server/api/apps.ts +++ b/src/server/api/apps.ts @@ -21,7 +21,8 @@ function convert(app: BackendApp): SharedApp { return { ...rest, pid: proc?.pid } } -// SSE endpoint for real-time app state updates +// SSE: full app state snapshots for the dashboard UI (every state change) +// For discrete lifecycle events consumed by app processes, see /api/events/stream router.sse('/stream', (send) => { const broadcast = () => { const apps: SharedApp[] = allApps().map(({ diff --git a/src/server/api/events.ts b/src/server/api/events.ts new file mode 100644 index 0000000..526039c --- /dev/null +++ b/src/server/api/events.ts @@ -0,0 +1,14 @@ +import { onEvent } from '$apps' +import { Hype } from '@because/hype' + +const router = Hype.router() + +// SSE: discrete lifecycle events for app processes (start, stop, deploy, etc.) +// Unlike /api/apps/stream (full state snapshots for the dashboard), this sends +// individual events so apps can react to specific lifecycle changes. +router.sse('/stream', (send) => { + const unsub = onEvent(event => send(event)) + return unsub +}) + +export default router diff --git a/src/server/api/sync.ts b/src/server/api/sync.ts index 4ba269e..09c0b45 100644 --- a/src/server/api/sync.ts +++ b/src/server/api/sync.ts @@ -1,4 +1,4 @@ -import { APPS_DIR, allApps, registerApp, removeApp, restartApp, startApp } from '$apps' +import { APPS_DIR, allApps, emit, registerApp, removeApp, restartApp, startApp } from '$apps' import { computeHash, generateManifest } from '../sync' import { loadGitignore } from '@gitignore' import { cpSync, existsSync, mkdirSync, readdirSync, readFileSync, realpathSync, renameSync, rmSync, symlinkSync, unlinkSync, watch, writeFileSync } from 'fs' @@ -330,6 +330,8 @@ router.post('/apps/:app/activate', async c => { console.error(`Failed to clean up old versions: ${e}`) } + emit({ type: 'app:activate', app: appName, version }) + // Register new app or restart existing const app = allApps().find(a => a.name === appName) if (!app) { diff --git a/src/server/apps.ts b/src/server/apps.ts index 1224134..22a10c8 100644 --- a/src/server/apps.ts +++ b/src/server/apps.ts @@ -1,4 +1,5 @@ import type { App as SharedApp, AppState } from '@types' +import type { ToesEvent, ToesEventInput, ToesEventType } from '../shared/events' import type { Subprocess } from 'bun' import { DEFAULT_EMOJI } from '@types' import { appendFileSync, existsSync, mkdirSync, readdirSync, readFileSync, realpathSync, renameSync, symlinkSync, unlinkSync, writeFileSync } from 'fs' @@ -31,6 +32,7 @@ const STARTUP_TIMEOUT = 30000 const _appPorts = new Map() const _apps = new Map() const _availablePorts: number[] = [] +const _eventListeners = new Set<(event: ToesEvent) => void>() const _listeners = new Set<() => void>() let _shuttingDown = false @@ -106,11 +108,22 @@ export async function initApps() { runApps() } +export function emit(event: ToesEventInput) { + // Cast: ToesEventInput is DistributiveOmit, so adding time + // back produces ToesEvent. TS can't prove this because spreads don't distribute. + _eventListeners.forEach(cb => cb({ ...event, time: Date.now() } as ToesEvent)) +} + export function onChange(cb: () => void) { _listeners.add(cb) return () => _listeners.delete(cb) } +export function onEvent(cb: (event: ToesEvent) => void) { + _eventListeners.add(cb) + return () => _eventListeners.delete(cb) +} + export function removeApp(dir: string) { const app = _apps.get(dir) if (!app) return @@ -130,6 +143,7 @@ export function removeApp(dir: string) { _apps.delete(dir) update() + emit({ type: 'app:delete', app: dir }) } export function registerApp(dir: string) { @@ -141,6 +155,7 @@ export function registerApp(dir: string) { const tool = pkg.toes?.tool _apps.set(dir, { name: dir, state, icon, error, tool }) update() + emit({ type: 'app:create', app: dir }) if (!error) { runApp(dir, getPort(dir)) } @@ -502,6 +517,7 @@ function markAsRunning(app: App, port: number, isHttpApp: boolean) { app.started = Date.now() app.isHttpApp = isHttpApp update() + emit({ type: 'app:start', app: app.name }) openTunnelIfEnabled(app.name, port) if (isHttpApp) { @@ -737,6 +753,7 @@ async function runApp(dir: string, port: number) { app.port = undefined app.started = undefined update() + if (!_shuttingDown) emit({ type: 'app:stop', app: dir }) // Schedule restart if appropriate if (shouldAutoRestart(app, code)) { diff --git a/src/server/index.tsx b/src/server/index.tsx index 248cb78..a2d0b20 100644 --- a/src/server/index.tsx +++ b/src/server/index.tsx @@ -1,5 +1,6 @@ import { allApps, initApps, TOES_URL } from '$apps' import appsRouter from './api/apps' +import eventsRouter from './api/events' import syncRouter from './api/sync' import systemRouter from './api/system' import { Hype } from '@because/hype' @@ -7,6 +8,7 @@ import { Hype } from '@because/hype' const app = new Hype({ layout: false, logging: !!process.env.DEBUG }) app.route('/api/apps', appsRouter) +app.route('/api/events', eventsRouter) app.route('/api/sync', syncRouter) app.route('/api/system', systemRouter) diff --git a/src/shared/events.ts b/src/shared/events.ts new file mode 100644 index 0000000..744caee --- /dev/null +++ b/src/shared/events.ts @@ -0,0 +1,17 @@ +export type ToesEventType = 'app:activate' | 'app:create' | 'app:delete' | 'app:start' | 'app:stop' + +interface BaseEvent { + app: string + time: number +} + +export type ToesEvent = + | BaseEvent & { type: 'app:activate'; version: string } + | BaseEvent & { type: 'app:create' } + | BaseEvent & { type: 'app:delete' } + | BaseEvent & { type: 'app:start' } + | BaseEvent & { type: 'app:stop' } + +type DistributiveOmit = T extends any ? Omit : never + +export type ToesEventInput = DistributiveOmit diff --git a/src/tools/events.ts b/src/tools/events.ts new file mode 100644 index 0000000..30e6f0b --- /dev/null +++ b/src/tools/events.ts @@ -0,0 +1,57 @@ +import type { ToesEvent, ToesEventType } from '../shared/events' + +export type { ToesEvent, ToesEventType } + +type EventCallback = (event: ToesEvent) => void + +interface Listener { + types: ToesEventType[] + callback: EventCallback +} + +const _listeners = new Set() + +let _es: EventSource | undefined + +function ensureConnection() { + if (_es && _es.readyState !== EventSource.CLOSED) return + if (_es) _es.close() + const url = `${process.env.TOES_URL}/api/events/stream` + _es = new EventSource(url) + + _es.onerror = () => { + if (_es?.readyState === EventSource.CLOSED) { + console.warn('[toes] Event stream closed, reconnecting...') + _es = undefined + if (_listeners.size > 0) ensureConnection() + } + } + + _es.onmessage = (msg) => { + try { + const event: ToesEvent = JSON.parse(msg.data) + _listeners.forEach(l => { + if (l.types.includes(event.type)) l.callback(event) + }) + } catch (e) { + console.warn('[toes] Failed to parse event:', e) + } + } +} + +export function on(type: ToesEventType | ToesEventType[], callback: EventCallback): () => void { + const listener: Listener = { + types: Array.isArray(type) ? type : [type], + callback, + } + _listeners.add(listener) + ensureConnection() + + return () => { + _listeners.delete(listener) + if (_listeners.size === 0 && _es) { + _es.close() + _es = undefined + } + } +} diff --git a/src/tools/index.ts b/src/tools/index.ts index 3057967..d2a98e2 100644 --- a/src/tools/index.ts +++ b/src/tools/index.ts @@ -1,3 +1,5 @@ export { theme } from '../client/themes' export { loadAppEnv } from './env' +export type { ToesEvent, ToesEventType } from './events' +export { on } from './events' export { baseStyles, ToolScript } from './scripts.tsx'