Refactor event stream to use EventSource API
This commit is contained in:
parent
002f0a64ef
commit
0aba9bde63
|
|
@ -11,61 +11,37 @@ interface Listener {
|
||||||
|
|
||||||
const _listeners = new Set<Listener>()
|
const _listeners = new Set<Listener>()
|
||||||
|
|
||||||
let _abort: AbortController | undefined
|
let _source: EventSource | undefined
|
||||||
let _connected = false
|
|
||||||
|
|
||||||
function ensureConnection() {
|
function ensureConnection() {
|
||||||
if (_connected) return
|
if (_source) return
|
||||||
_connected = true
|
|
||||||
const url = `${process.env.TOES_URL}/api/events/stream`
|
const url = `${process.env.TOES_URL}/api/events/stream`
|
||||||
_abort = new AbortController()
|
_source = new EventSource(url)
|
||||||
|
|
||||||
fetch(url, { signal: _abort.signal })
|
_source.onerror = () => {
|
||||||
.then(async (res) => {
|
if (_source?.readyState === EventSource.CLOSED) {
|
||||||
const reader = res.body!.getReader()
|
console.warn('[toes] Event stream closed unexpectedly')
|
||||||
const decoder = new TextDecoder()
|
_source = undefined
|
||||||
let buf = ''
|
}
|
||||||
|
}
|
||||||
|
|
||||||
while (true) {
|
_source.onmessage = (e) => {
|
||||||
const { done, value } = await reader.read()
|
|
||||||
if (done) break
|
|
||||||
buf += decoder.decode(value, { stream: true })
|
|
||||||
|
|
||||||
const lines = buf.split('\n')
|
|
||||||
buf = lines.pop()!
|
|
||||||
for (const line of lines) {
|
|
||||||
if (!line.startsWith('data: ')) continue
|
|
||||||
const payload = line.slice(6)
|
|
||||||
if (!payload) continue
|
|
||||||
try {
|
try {
|
||||||
const event: ToesEvent = JSON.parse(payload)
|
const event: ToesEvent = JSON.parse(e.data)
|
||||||
_listeners.forEach(l => {
|
_listeners.forEach(l => {
|
||||||
if (l.types.includes(event.type)) l.callback(event)
|
if (l.types.includes(event.type)) l.callback(event)
|
||||||
})
|
})
|
||||||
} catch (e) {
|
} catch {
|
||||||
console.warn('[toes] Failed to parse event:', e)
|
// Ignore malformed events
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
})
|
|
||||||
.catch((e) => {
|
|
||||||
if (e.name === 'AbortError') return
|
|
||||||
console.warn('[toes] Event stream error, reconnecting...', e.message)
|
|
||||||
})
|
|
||||||
.finally(() => {
|
|
||||||
_connected = false
|
|
||||||
if (_listeners.size > 0) {
|
|
||||||
setTimeout(ensureConnection, 1000)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function closeConnection() {
|
function closeConnection() {
|
||||||
if (_abort) {
|
if (_source) {
|
||||||
_abort.abort()
|
_source.close()
|
||||||
_abort = undefined
|
_source = undefined
|
||||||
}
|
}
|
||||||
_connected = false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export function on(type: ToesEventType | ToesEventType[], callback: EventCallback): () => void {
|
export function on(type: ToesEventType | ToesEventType[], callback: EventCallback): () => void {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user