import type { ToesEvent, ToesEventType } from '../shared/events' export type { ToesEvent, ToesEventType } type EventCallback = (event: ToesEvent) => void interface Listener { types: ToesEventType[] callback: EventCallback } const _listeners = new Set() let _abort: AbortController | undefined let _connected = false function ensureConnection() { if (_connected) return _connected = true const url = `${process.env.TOES_URL}/api/events/stream` _abort = new AbortController() fetch(url, { signal: _abort.signal }) .then(async (res) => { const reader = res.body!.getReader() const decoder = new TextDecoder() let buf = '' while (true) { const { done, value } = await reader.read() if (done) break buf += decoder.decode(value, { stream: true }) const lines = buf.split('\n') buf = lines.pop()! for (const line of lines) { if (!line.startsWith('data: ')) continue try { const event: ToesEvent = JSON.parse(line.slice(6)) _listeners.forEach(l => { if (l.types.includes(event.type)) l.callback(event) }) } catch (e) { console.warn('[toes] Failed to parse event:', e) } } } }) .catch((e) => { if (e.name === 'AbortError') return console.warn('[toes] Event stream error, reconnecting...', e.message) }) .finally(() => { _connected = false if (_listeners.size > 0) { setTimeout(ensureConnection, 1000) } }) } function closeConnection() { if (_abort) { _abort.abort() _abort = undefined } _connected = false } export function on(type: ToesEventType | ToesEventType[], callback: EventCallback): () => void { const listener: Listener = { types: Array.isArray(type) ? type : [type], callback, } _listeners.add(listener) ensureConnection() return () => { _listeners.delete(listener) if (_listeners.size === 0) closeConnection() } }