toes/src/tools/events.ts

78 lines
2.0 KiB
TypeScript

import type { ToesEvent, ToesEventType } from '../shared/events'
export type { ToesEvent, ToesEventType }
/** Function a subscriber supplies; it is called with each event delivered to it. */
type EventCallback = (event: ToesEvent) => void
/** One registered subscription: which event types it wants and who to call. */
interface Listener {
// Event types this subscription receives; an event is delivered when its `type` is in this list.
types: ToesEventType[]
// Invoked once per matching event.
callback: EventCallback
}
// Every currently registered subscription, shared by the whole module.
const _listeners = new Set<Listener>()
// Controller used to cancel the event stream; `undefined` whenever no stream has been started
// (or after it has been closed).
let _abort: AbortController | undefined
/**
 * Starts the shared event stream unless one has already been started.
 *
 * The endpoint is built from the `TOES_URL` environment variable. A fresh
 * `AbortController` is stored in `_abort` so the stream can be cancelled
 * later, and the connect loop is launched in the background (not awaited).
 */
function ensureConnection() {
  if (_abort === undefined) {
    const streamUrl = `${process.env.TOES_URL}/api/events/stream`
    const controller = new AbortController()
    _abort = controller
    // Fire-and-forget: the loop runs until the controller is aborted.
    void connect(streamUrl, controller.signal)
  }
}
/**
 * Cancels the shared event stream, if one is running, and forgets its
 * controller so that a later `ensureConnection()` starts a new stream.
 * Does nothing when no stream has been started.
 */
function closeConnection() {
  const controller = _abort
  if (controller === undefined) return
  controller.abort()
  _abort = undefined
}
/**
 * Parses a single SSE frame and delivers the resulting event to every
 * listener in `_listeners` whose `types` include the event's `type`.
 *
 * Frames without a `data:` line, with a payload that is not valid JSON, or
 * whose payload is not a JSON object are ignored. Each listener is invoked in
 * its own try/catch so that one throwing listener neither prevents the others
 * from receiving the event nor gets mistaken for a malformed payload.
 *
 * @param frame - Text of one frame (the lines between two blank-line separators).
 */
function dispatchFrame(frame: string): void {
  const dataLine = frame.split('\n').find(line => line.startsWith('data:'))
  if (!dataLine) return

  let parsed: unknown
  try {
    parsed = JSON.parse(dataLine.slice(5).trim())
  } catch {
    // Ignore malformed events
    return
  }
  // `null` and primitives are valid JSON but cannot be events.
  if (typeof parsed !== 'object' || parsed === null) return
  const event = parsed as ToesEvent

  // Iterate the live set (not a copy) so a listener removed by an earlier
  // callback during this dispatch is not called afterwards.
  for (const listener of _listeners) {
    if (!listener.types.includes(event.type)) continue
    try {
      listener.callback(event)
    } catch (err) {
      console.error('[toes] Event listener threw', err)
    }
  }
}

/**
 * Keeps a server-sent-events connection to `url` alive until `signal` is aborted.
 *
 * Each iteration fetches `url`, reads the response body as text, splits it
 * into frames on blank lines and passes every complete frame to
 * `dispatchFrame`. When the request fails or the stream ends, it waits
 * 2 seconds and reconnects. Errors are caught inside the loop, so the
 * returned promise settles only by resolving once the loop stops.
 *
 * @param url - Endpoint to stream events from.
 * @param signal - Aborting it cancels the in-flight request and ends the loop.
 * @returns A promise that resolves when the loop has stopped.
 */
async function connect(url: string, signal: AbortSignal): Promise<void> {
  while (!signal.aborted) {
    try {
      const res = await fetch(url, { signal })
      if (!res.ok || !res.body) {
        // Release the unread body so failed attempts do not pile up open streams.
        void res.body?.cancel().catch(() => {})
        throw new Error(`SSE ${res.status}`)
      }
      const reader = res.body.getReader()
      const decoder = new TextDecoder()
      let buf = ''
      while (true) {
        const { done, value } = await reader.read()
        if (done) break
        // SSE permits CRLF line endings; normalise so the blank-line split below
        // works. Running this over the whole buffer also repairs a CRLF pair
        // that was split across two chunks.
        buf = (buf + decoder.decode(value, { stream: true })).replace(/\r\n/g, '\n')
        const frames = buf.split('\n\n')
        // The last piece is an incomplete frame (or ''); keep it for the next chunk.
        buf = frames.pop() ?? ''
        for (const frame of frames) dispatchFrame(frame)
      }
    } catch (err) {
      // An abort is a deliberate shutdown, not an error worth reporting.
      if (signal.aborted) return
      console.warn('[toes] Event stream error, reconnecting...', err)
    }
    if (!signal.aborted) await new Promise(r => setTimeout(r, 2000))
  }
}
/**
 * Subscribes `callback` to one or more event types.
 *
 * Registering a listener starts the shared event stream if it is not already
 * running.
 *
 * @param type - A single event type, or an array of types, to listen for.
 * @param callback - Called with each event whose type matches.
 * @returns A function that removes this subscription; once no subscriptions
 *   remain, the shared event stream is closed.
 */
export function on(type: ToesEventType | ToesEventType[], callback: EventCallback): () => void {
  const types = Array.isArray(type) ? type : [type]
  const listener: Listener = { types, callback }

  _listeners.add(listener)
  ensureConnection()

  const unsubscribe = (): void => {
    _listeners.delete(listener)
    if (_listeners.size > 0) return
    // Last subscriber gone: tear the stream down.
    closeConnection()
  }
  return unsubscribe
}