From f16201114ef7e3924ffb044d53fdace6717ee7cc Mon Sep 17 00:00:00 2001 From: Chris Wanstrath Date: Sun, 8 Mar 2026 23:03:00 -0700 Subject: [PATCH] Replace EventSource with fetch-based SSE with reconnect --- src/tools/events.ts | 66 ++++++++++++++++++++++++++++----------------- 1 file changed, 42 insertions(+), 24 deletions(-) diff --git a/src/tools/events.ts b/src/tools/events.ts index a91740c..6804dfb 100644 --- a/src/tools/events.ts +++ b/src/tools/events.ts @@ -11,36 +11,54 @@ interface Listener { const _listeners = new Set() -let _source: EventSource | undefined +let _abort: AbortController | undefined function ensureConnection() { - if (_source) return + if (_abort) return const url = `${process.env.TOES_URL}/api/events/stream` - _source = new EventSource(url) - - _source.onerror = () => { - if (_source?.readyState === EventSource.CLOSED) { - console.warn('[toes] Event stream closed unexpectedly') - _source = undefined - } - } - - _source.onmessage = (e) => { - try { - const event: ToesEvent = JSON.parse(e.data) - _listeners.forEach(l => { - if (l.types.includes(event.type)) l.callback(event) - }) - } catch { - // Ignore malformed events - } - } + _abort = new AbortController() + connect(url, _abort.signal) } function closeConnection() { - if (_source) { - _source.close() - _source = undefined + if (_abort) { + _abort.abort() + _abort = undefined + } +} + +async function connect(url: string, signal: AbortSignal) { + while (!signal.aborted) { + try { + const res = await fetch(url, { signal }) + if (!res.ok || !res.body) throw new Error(`SSE ${res.status}`) + const reader = res.body.getReader() + const decoder = new TextDecoder() + let buf = '' + while (true) { + const { done, value } = await reader.read() + if (done) break + buf += decoder.decode(value, { stream: true }) + const parts = buf.split('\n\n') + buf = parts.pop()! + for (const part of parts) { + const line = part.split('\n').find(l => l.startsWith('data:')) + if (!line) continue + try { + const event: ToesEvent = JSON.parse(line.slice(5).trim()) + _listeners.forEach(l => { + if (l.types.includes(event.type)) l.callback(event) + }) + } catch { + // Ignore malformed events + } + } + } + } catch (err) { + if (signal.aborted) return + console.warn('[toes] Event stream error, reconnecting...') + } + if (!signal.aborted) await new Promise(r => setTimeout(r, 2000)) } }