streaming

This commit is contained in:
Chris Wanstrath 2025-09-26 21:12:48 -07:00
parent a545c3e056
commit 97a6b952d5
9 changed files with 166 additions and 20 deletions

14
app/nose/bin/counter.tsx Normal file
View File

@ -0,0 +1,14 @@
import { stream } from "@/stream"
export default async function () {
let count = 1
return stream(<p>{count}</p>, async ({ replace }) => {
for (let i = 1; i < 10; i++) {
await Bun.sleep(250)
count++
replace(<p>{count}</p>)
}
})
}

View File

@ -20,7 +20,7 @@ export async function dispatchMessage(ws: any, msg: Message) {
} }
async function inputMessage(ws: any, msg: Message) { async function inputMessage(ws: any, msg: Message) {
const result = await runCommand(msg.session || "", msg.id || "", msg.data as string) const result = await runCommand(msg.session || "", msg.id || "", msg.data as string, ws)
send(ws, { id: msg.id, type: "output", data: result }) send(ws, { id: msg.id, type: "output", data: result })
} }

View File

@ -61,12 +61,51 @@ export function addOutput(id: string, output: CommandOutput) {
autoScroll() autoScroll()
} }
export function appendOutput(id: string, output: CommandOutput) {
const item = document.querySelector(`[data-id="${id}"].output`)
if (!item) {
console.error(`output id ${id} not found`)
return
}
const [format, content] = processOutput(output)
if (format === "html")
item.innerHTML += content
else
item.textContent += content
autoScroll()
}
export function replaceOutput(id: string, output: CommandOutput) {
const item = document.querySelector(`[data-id="${id}"].output`)
if (!item) {
console.error(`output id ${id} not found`)
return
}
const [format, content] = processOutput(output)
if (format === "html")
item.innerHTML = content
else
item.textContent = content
autoScroll()
}
function processOutput(output: CommandOutput): ["html" | "text", string] { function processOutput(output: CommandOutput): ["html" | "text", string] {
let content = "" let content = ""
let html = false let html = false
if (typeof output === "string") { if (typeof output === "string") {
content = output content = output
} else if (Array.isArray(output)) {
content = output.join(" ")
} else if (output.html !== undefined) { } else if (output.html !== undefined) {
html = true html = true
content = output.html content = output.html

View File

@ -1,8 +1,8 @@
//// ////
// The shell runs on the server and processes input, returning output. // The shell runs on the server and processes input, returning output.
import type { Message, CommandResult } from "../shared/types.js" import type { Message, CommandResult, CommandOutput } from "../shared/types.js"
import { addInput, setStatus, addOutput } from "./scrollback.js" import { addInput, setStatus, addOutput, appendOutput, replaceOutput } from "./scrollback.js"
import { send } from "./websocket.js" import { send } from "./websocket.js"
import { randomId } from "../shared/utils.js" import { randomId } from "../shared/utils.js"
import { addToHistory } from "./history.js" import { addToHistory } from "./history.js"
@ -30,13 +30,22 @@ export function runCommand(input: string) {
// message received from server // message received from server
export function handleMessage(msg: Message) { export function handleMessage(msg: Message) {
if (msg.type === "output") { switch (msg.type) {
handleOutput(msg) case "output":
} else if (msg.type === "commands") { handleOutput(msg); break
cacheCommands(msg.data as string[]) case "commands":
} else if (msg.type === "error") { cacheCommands(msg.data as string[]); break
console.error(msg.data) case "error":
} else { console.error(msg.data); break
case "stream:start":
handleStreamStart(msg); break
case "stream:end":
handleStreamEnd(msg); break
case "stream:append":
handleStreamAppend(msg); break
case "stream:replace":
handleStreamReplace(msg); break
default:
console.error("unknown message type", msg) console.error("unknown message type", msg)
} }
} }
@ -46,3 +55,33 @@ function handleOutput(msg: Message) {
setStatus(msg.id!, result.status) setStatus(msg.id!, result.status)
addOutput(msg.id!, result.output) addOutput(msg.id!, result.output)
} }
function handleStreamStart(msg: Message) {
const id = msg.id!
const status = document.querySelector(`[data-id="${id}"].input .status`)
if (!status) return
addOutput(id, msg.data as CommandOutput)
status.classList.remove("yellow")
status.classList.add("purple")
}
function handleStreamAppend(msg: Message) {
appendOutput(msg.id!, msg.data as CommandOutput)
}
function handleStreamReplace(msg: Message) {
replaceOutput(msg.id!, msg.data as CommandOutput)
}
function handleStreamEnd(msg: Message) {
const id = msg.id!
const status = document.querySelector(`[data-id="${id}"].input .status`)
if (!status) return
status.classList.remove("purple")
status.classList.remove("green")
}

View File

@ -13,7 +13,6 @@ export function startConnection() {
url.protocol = url.protocol.replace('http', 'ws') url.protocol = url.protocol.replace('http', 'ws')
ws = new WebSocket(url) ws = new WebSocket(url)
ws.onopen = () => console.log('WS connected')
ws.onmessage = receive ws.onmessage = receive
ws.onclose = () => setTimeout(startConnection, 1000) // simple retry ws.onclose = () => setTimeout(startConnection, 1000) // simple retry
ws.onerror = () => ws?.close() ws.onerror = () => ws?.close()

View File

@ -7,6 +7,7 @@ export type Session = {
taskId?: string taskId?: string
sessionId?: string sessionId?: string
project?: string project?: string
ws?: any
} }
// Ensure "ALS" lives between bun's hot reloads // Ensure "ALS" lives between bun's hot reloads

View File

@ -2,12 +2,13 @@ export type Message = {
session?: string session?: string
id?: string id?: string
type: MessageType type: MessageType
data: CommandResult | string | string[] data?: CommandResult | CommandOutput
} }
export type MessageType = "error" | "input" | "output" | "commands" | "save-file" export type MessageType = "error" | "input" | "output" | "commands" | "save-file"
| "stream:start" | "stream:end" | "stream:append" | "stream:replace"
export type CommandOutput = string | { html: string } export type CommandOutput = string | string[] | { html: string }
export type CommandResult = { export type CommandResult = {
status: "ok" | "error" status: "ok" | "error"

View File

@ -11,16 +11,15 @@ import { ALS } from "./session"
const sessions: Map<string, Session> = new Map() const sessions: Map<string, Session> = new Map()
export async function runCommand(sessionId: string, taskId: string, input: string): Promise<CommandResult> { export async function runCommand(sessionId: string, taskId: string, input: string, ws?: any): Promise<CommandResult> {
const [cmd = "", ...args] = input.split(" ") const [cmd = "", ...args] = input.split(" ")
if (!commandExists(cmd)) { if (!commandExists(cmd))
return { status: "error", output: `${cmd} not found` } return { status: "error", output: `${cmd} not found` }
}
let status: "ok" | "error" = "ok" let status: "ok" | "error" = "ok"
let output: CommandOutput = "" let output: CommandOutput = ""
const state = getState(sessionId, taskId) const state = getState(sessionId, taskId, ws)
try { try {
[status, output] = await ALS.run(state, async () => await exec(cmd, args)) [status, output] = await ALS.run(state, async () => await exec(cmd, args))
@ -41,7 +40,7 @@ async function exec(cmd: string, args: string[]): Promise<["ok" | "error", Comma
return processExecOutput(await module.default(...args)) return processExecOutput(await module.default(...args))
} }
function processExecOutput(output: string | any): ["ok" | "error", CommandOutput] { export function processExecOutput(output: string | any): ["ok" | "error", CommandOutput] {
if (typeof output === "string") { if (typeof output === "string") {
return ["ok", output] return ["ok", output]
} else if (typeof output === "object") { } else if (typeof output === "object") {
@ -52,18 +51,21 @@ function processExecOutput(output: string | any): ["ok" | "error", CommandOutput
} else { } else {
return ["ok", output] return ["ok", output]
} }
} else if (output === undefined) {
return ["ok", ""]
} else { } else {
return ["ok", String(output)] return ["ok", String(output)]
} }
} }
function getState(sessionId: string, taskId: string): Session { function getState(sessionId: string, taskId: string, ws?: any): Session {
let state = sessions.get(sessionId) let state = sessions.get(sessionId)
if (!state) { if (!state) {
state = { sessionId: sessionId, project: "" } state = { sessionId: sessionId, project: "" }
sessions.set(sessionId, state) sessions.set(sessionId, state)
} }
state.taskId = taskId state.taskId = taskId
if (ws) state.ws = ws
return state return state
} }

51
app/src/stream.ts Normal file
View File

@ -0,0 +1,51 @@
import { send as sendWs } from "./websocket"
import { getState } from "./session"
import { processExecOutput } from "./shell"
import type { Child } from "hono/jsx"
import type { CommandOutput, Message } from "./shared/types"
type StreamFn = (output: Child) => Promise<void>
type StreamFns = { replace: StreamFn, append: StreamFn }
type StreamParamFn = (fns: StreamFns) => Promise<void>
export async function stream(initOrFn: StreamParamFn | string | any, fn?: StreamParamFn) {
const state = getState()
if (!state) throw "stream() called outside runCommand()"
let error = false
const taskId = state.taskId
const session = state.sessionId
const send = (msg: Message) => {
sendWs(state.ws, { ...msg, id: taskId, session })
}
const append = async (output: string | any) => {
const [status, data] = processExecOutput(output)
if (status === "error") error = true
send({ type: "stream:append", data })
}
const replace = async (output: string | any) => {
const [status, data] = processExecOutput(output)
if (status === "error") error = true
send({ type: "stream:replace", data })
}
let init: CommandOutput = ""
let func
if (typeof initOrFn === "function") {
func = initOrFn
} else {
init = processExecOutput(initOrFn)[1]
func = fn
}
return new Promise<void>(async resolve => {
send({ type: "stream:start", data: init })
await func({ replace, append })
send({ type: "stream:end", data: error ? "error" : "ok" })
resolve()
})
}