toes/src/tools/events.ts
Chris Wanstrath 55316027c0 heartbeat
2026-02-27 15:14:43 -08:00

84 lines
2.1 KiB
TypeScript

import type { ToesEvent, ToesEventType } from '../shared/events'
export type { ToesEvent, ToesEventType }
type EventCallback = (event: ToesEvent) => void
interface Listener {
types: ToesEventType[]
callback: EventCallback
}
const _listeners = new Set<Listener>()
let _abort: AbortController | undefined
let _connected = false
function ensureConnection() {
if (_connected) return
_connected = true
const url = `${process.env.TOES_URL}/api/events/stream`
_abort = new AbortController()
fetch(url, { signal: _abort.signal })
.then(async (res) => {
const reader = res.body!.getReader()
const decoder = new TextDecoder()
let buf = ''
while (true) {
const { done, value } = await reader.read()
if (done) break
buf += decoder.decode(value, { stream: true })
const lines = buf.split('\n')
buf = lines.pop()!
for (const line of lines) {
if (!line.startsWith('data: ')) continue
const payload = line.slice(6)
if (!payload) continue
try {
const event: ToesEvent = JSON.parse(payload)
_listeners.forEach(l => {
if (l.types.includes(event.type)) l.callback(event)
})
} catch (e) {
console.warn('[toes] Failed to parse event:', e)
}
}
}
})
.catch((e) => {
if (e.name === 'AbortError') return
console.warn('[toes] Event stream error, reconnecting...', e.message)
})
.finally(() => {
_connected = false
if (_listeners.size > 0) {
setTimeout(ensureConnection, 1000)
}
})
}
function closeConnection() {
if (_abort) {
_abort.abort()
_abort = undefined
}
_connected = false
}
export function on(type: ToesEventType | ToesEventType[], callback: EventCallback): () => void {
const listener: Listener = {
types: Array.isArray(type) ? type : [type],
callback,
}
_listeners.add(listener)
ensureConnection()
return () => {
_listeners.delete(listener)
if (_listeners.size === 0) closeConnection()
}
}