forked from defunkt/toes
Replace EventSource with fetch-based SSE with reconnect
This commit is contained in:
parent
758ad67fd4
commit
f16201114e
|
|
@ -11,23 +11,41 @@ interface Listener {
|
||||||
|
|
||||||
const _listeners = new Set<Listener>()
|
const _listeners = new Set<Listener>()
|
||||||
|
|
||||||
let _source: EventSource | undefined
|
let _abort: AbortController | undefined
|
||||||
|
|
||||||
function ensureConnection() {
|
function ensureConnection() {
|
||||||
if (_source) return
|
if (_abort) return
|
||||||
const url = `${process.env.TOES_URL}/api/events/stream`
|
const url = `${process.env.TOES_URL}/api/events/stream`
|
||||||
_source = new EventSource(url)
|
_abort = new AbortController()
|
||||||
|
connect(url, _abort.signal)
|
||||||
|
}
|
||||||
|
|
||||||
_source.onerror = () => {
|
function closeConnection() {
|
||||||
if (_source?.readyState === EventSource.CLOSED) {
|
if (_abort) {
|
||||||
console.warn('[toes] Event stream closed unexpectedly')
|
_abort.abort()
|
||||||
_source = undefined
|
_abort = undefined
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
_source.onmessage = (e) => {
|
async function connect(url: string, signal: AbortSignal) {
|
||||||
|
while (!signal.aborted) {
|
||||||
try {
|
try {
|
||||||
const event: ToesEvent = JSON.parse(e.data)
|
const res = await fetch(url, { signal })
|
||||||
|
if (!res.ok || !res.body) throw new Error(`SSE ${res.status}`)
|
||||||
|
const reader = res.body.getReader()
|
||||||
|
const decoder = new TextDecoder()
|
||||||
|
let buf = ''
|
||||||
|
while (true) {
|
||||||
|
const { done, value } = await reader.read()
|
||||||
|
if (done) break
|
||||||
|
buf += decoder.decode(value, { stream: true })
|
||||||
|
const parts = buf.split('\n\n')
|
||||||
|
buf = parts.pop()!
|
||||||
|
for (const part of parts) {
|
||||||
|
const line = part.split('\n').find(l => l.startsWith('data:'))
|
||||||
|
if (!line) continue
|
||||||
|
try {
|
||||||
|
const event: ToesEvent = JSON.parse(line.slice(5).trim())
|
||||||
_listeners.forEach(l => {
|
_listeners.forEach(l => {
|
||||||
if (l.types.includes(event.type)) l.callback(event)
|
if (l.types.includes(event.type)) l.callback(event)
|
||||||
})
|
})
|
||||||
|
|
@ -35,12 +53,12 @@ function ensureConnection() {
|
||||||
// Ignore malformed events
|
// Ignore malformed events
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} catch (err) {
|
||||||
function closeConnection() {
|
if (signal.aborted) return
|
||||||
if (_source) {
|
console.warn('[toes] Event stream error, reconnecting...')
|
||||||
_source.close()
|
}
|
||||||
_source = undefined
|
if (!signal.aborted) await new Promise(r => setTimeout(r, 2000))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user