Compare commits

...

3 Commits

Author SHA1 Message Date
2c726c5bd2 nicer retry error message 2025-09-26 21:32:16 -07:00
d6638263a7 output handles that 2025-09-26 21:14:44 -07:00
97a6b952d5 streaming 2025-09-26 21:12:48 -07:00
9 changed files with 181 additions and 23 deletions

14
app/nose/bin/counter.tsx Normal file
View File

@ -0,0 +1,14 @@
import { stream } from "@/stream"
export default async function () {
let count = 1
return stream(<p>{count}</p>, async ({ replace }) => {
for (let i = 1; i < 10; i++) {
await Bun.sleep(250)
count++
replace(<p>{count}</p>)
}
})
}

View File

@ -20,7 +20,7 @@ export async function dispatchMessage(ws: any, msg: Message) {
}
async function inputMessage(ws: any, msg: Message) {
const result = await runCommand(msg.session || "", msg.id || "", msg.data as string)
const result = await runCommand(msg.session || "", msg.id || "", msg.data as string, ws)
send(ws, { id: msg.id, type: "output", data: result })
}

View File

@ -3,6 +3,7 @@
// input, output, etc
import { scrollback, $$ } from "./dom.js"
import { randomId } from "../shared/utils.js"
import type { CommandOutput } from "../shared/types.js"
type InputStatus = "waiting" | "streaming" | "ok" | "error"
@ -48,7 +49,7 @@ export function setStatus(id: string, status: InputStatus) {
export function addOutput(id: string, output: CommandOutput) {
const item = $$("li")
item.classList.add("output")
item.dataset.id = id
item.dataset.id = id || randomId()
const [format, content] = processOutput(output)
@ -61,12 +62,55 @@ export function addOutput(id: string, output: CommandOutput) {
autoScroll()
}
export function addErrorMessage(message: string) {
addOutput("", { html: `<span class="red">${message}</span>` })
}
export function appendOutput(id: string, output: CommandOutput) {
const item = document.querySelector(`[data-id="${id}"].output`)
if (!item) {
console.error(`output id ${id} not found`)
return
}
const [format, content] = processOutput(output)
if (format === "html")
item.innerHTML += content
else
item.textContent += content
autoScroll()
}
export function replaceOutput(id: string, output: CommandOutput) {
const item = document.querySelector(`[data-id="${id}"].output`)
if (!item) {
console.error(`output id ${id} not found`)
return
}
const [format, content] = processOutput(output)
if (format === "html")
item.innerHTML = content
else
item.textContent = content
autoScroll()
}
function processOutput(output: CommandOutput): ["html" | "text", string] {
let content = ""
let html = false
if (typeof output === "string") {
content = output
} else if (Array.isArray(output)) {
content = output.join(" ")
} else if (output.html !== undefined) {
html = true
content = output.html

View File

@ -1,8 +1,8 @@
////
// The shell runs on the server and processes input, returning output.
import type { Message, CommandResult } from "../shared/types.js"
import { addInput, setStatus, addOutput } from "./scrollback.js"
import type { Message, CommandResult, CommandOutput } from "../shared/types.js"
import { addInput, setStatus, addOutput, appendOutput, replaceOutput } from "./scrollback.js"
import { send } from "./websocket.js"
import { randomId } from "../shared/utils.js"
import { addToHistory } from "./history.js"
@ -16,7 +16,7 @@ export function runCommand(input: string) {
addToHistory(input)
addInput(id, input)
const [cmd = "", ...args] = input.split(" ")
const [cmd = "", ..._args] = input.split(" ")
if (browserCommands[cmd]) {
const result = browserCommands[cmd]()
@ -30,13 +30,22 @@ export function runCommand(input: string) {
// message received from server
export function handleMessage(msg: Message) {
if (msg.type === "output") {
handleOutput(msg)
} else if (msg.type === "commands") {
cacheCommands(msg.data as string[])
} else if (msg.type === "error") {
console.error(msg.data)
} else {
switch (msg.type) {
case "output":
handleOutput(msg); break
case "commands":
cacheCommands(msg.data as string[]); break
case "error":
console.error(msg.data); break
case "stream:start":
handleStreamStart(msg); break
case "stream:end":
handleStreamEnd(msg); break
case "stream:append":
handleStreamAppend(msg); break
case "stream:replace":
handleStreamReplace(msg); break
default:
console.error("unknown message type", msg)
}
}
@ -46,3 +55,26 @@ function handleOutput(msg: Message) {
setStatus(msg.id!, result.status)
addOutput(msg.id!, result.output)
}
function handleStreamStart(msg: Message) {
const id = msg.id!
const status = document.querySelector(`[data-id="${id}"].input .status`)
if (!status) return
addOutput(id, msg.data as CommandOutput)
status.classList.remove("yellow")
status.classList.add("purple")
}
function handleStreamAppend(msg: Message) {
appendOutput(msg.id!, msg.data as CommandOutput)
}
function handleStreamReplace(msg: Message) {
replaceOutput(msg.id!, msg.data as CommandOutput)
}
function handleStreamEnd(_msg: Message) {
}

View File

@ -4,6 +4,10 @@
import type { Message } from "../shared/types.js"
import { sessionID } from "./session.js"
import { handleMessage } from "./shell.js"
import { addErrorMessage } from "./scrollback.js"
const MAX_RETRIES = 5
let retries = 0
let ws: WebSocket | null = null
@ -13,9 +17,8 @@ export function startConnection() {
url.protocol = url.protocol.replace('http', 'ws')
ws = new WebSocket(url)
ws.onopen = () => console.log('WS connected')
ws.onmessage = receive
ws.onclose = () => setTimeout(startConnection, 1000) // simple retry
ws.onclose = retryConnection
ws.onerror = () => ws?.close()
}
@ -37,3 +40,13 @@ export function close() {
ws?.close(1000, 'bye')
}
function retryConnection() {
if (retries >= MAX_RETRIES) {
addErrorMessage(`!! Failed to reconnect ${retries} times. Server is down.`)
if (ws) ws.onclose = () => { }
return
}
retries++
addErrorMessage(`!! Connection lost. Retrying...`)
setTimeout(startConnection, 1000)
}

View File

@ -7,6 +7,7 @@ export type Session = {
taskId?: string
sessionId?: string
project?: string
ws?: any
}
// Ensure "ALS" lives between bun's hot reloads

View File

@ -2,12 +2,13 @@ export type Message = {
session?: string
id?: string
type: MessageType
data: CommandResult | string | string[]
data?: CommandResult | CommandOutput
}
export type MessageType = "error" | "input" | "output" | "commands" | "save-file"
| "stream:start" | "stream:end" | "stream:append" | "stream:replace"
export type CommandOutput = string | { html: string }
export type CommandOutput = string | string[] | { html: string }
export type CommandResult = {
status: "ok" | "error"

View File

@ -11,16 +11,15 @@ import { ALS } from "./session"
const sessions: Map<string, Session> = new Map()
export async function runCommand(sessionId: string, taskId: string, input: string): Promise<CommandResult> {
export async function runCommand(sessionId: string, taskId: string, input: string, ws?: any): Promise<CommandResult> {
const [cmd = "", ...args] = input.split(" ")
if (!commandExists(cmd)) {
if (!commandExists(cmd))
return { status: "error", output: `${cmd} not found` }
}
let status: "ok" | "error" = "ok"
let output: CommandOutput = ""
const state = getState(sessionId, taskId)
const state = getState(sessionId, taskId, ws)
try {
[status, output] = await ALS.run(state, async () => await exec(cmd, args))
@ -41,7 +40,7 @@ async function exec(cmd: string, args: string[]): Promise<["ok" | "error", Comma
return processExecOutput(await module.default(...args))
}
function processExecOutput(output: string | any): ["ok" | "error", CommandOutput] {
export function processExecOutput(output: string | any): ["ok" | "error", CommandOutput] {
if (typeof output === "string") {
return ["ok", output]
} else if (typeof output === "object") {
@ -52,18 +51,21 @@ function processExecOutput(output: string | any): ["ok" | "error", CommandOutput
} else {
return ["ok", output]
}
} else if (output === undefined) {
return ["ok", ""]
} else {
return ["ok", String(output)]
}
}
function getState(sessionId: string, taskId: string): Session {
function getState(sessionId: string, taskId: string, ws?: any): Session {
let state = sessions.get(sessionId)
if (!state) {
state = { sessionId: sessionId, project: "" }
sessions.set(sessionId, state)
}
state.taskId = taskId
if (ws) state.ws = ws
return state
}

51
app/src/stream.ts Normal file
View File

@ -0,0 +1,51 @@
import { send as sendWs } from "./websocket"
import { getState } from "./session"
import { processExecOutput } from "./shell"
import type { Child } from "hono/jsx"
import type { CommandOutput, Message } from "./shared/types"
type StreamFn = (output: Child) => Promise<void>
type StreamFns = { replace: StreamFn, append: StreamFn }
type StreamParamFn = (fns: StreamFns) => Promise<void>
export async function stream(initOrFn: StreamParamFn | string | any, fn?: StreamParamFn) {
const state = getState()
if (!state) throw "stream() called outside runCommand()"
let error = false
const taskId = state.taskId
const session = state.sessionId
const send = (msg: Message) => {
sendWs(state.ws, { ...msg, id: taskId, session })
}
const append = async (output: string | any) => {
const [status, data] = processExecOutput(output)
if (status === "error") error = true
send({ type: "stream:append", data })
}
const replace = async (output: string | any) => {
const [status, data] = processExecOutput(output)
if (status === "error") error = true
send({ type: "stream:replace", data })
}
let init: CommandOutput = ""
let func
if (typeof initOrFn === "function") {
func = initOrFn
} else {
init = processExecOutput(initOrFn)[1]
func = fn
}
return new Promise<void>(async resolve => {
send({ type: "stream:start", data: init })
await func({ replace, append })
send({ type: "stream:end", data: error ? "error" : "ok" })
resolve()
})
}