diff --git a/app/nose/bin/counter.tsx b/app/nose/bin/counter.tsx new file mode 100644 index 0000000..307aef3 --- /dev/null +++ b/app/nose/bin/counter.tsx @@ -0,0 +1,14 @@ +import { stream } from "@/stream" + +export default async function () { + let count = 1 + + return stream(

{count}

, async ({ replace }) => { + for (let i = 1; i < 10; i++) { + await Bun.sleep(250) + + count++ + replace(

{count}

) + } + }) +} diff --git a/app/src/dispatch.ts b/app/src/dispatch.ts index 13da7e0..8cfa8f8 100644 --- a/app/src/dispatch.ts +++ b/app/src/dispatch.ts @@ -20,7 +20,7 @@ export async function dispatchMessage(ws: any, msg: Message) { } async function inputMessage(ws: any, msg: Message) { - const result = await runCommand(msg.session || "", msg.id || "", msg.data as string) + const result = await runCommand(msg.session || "", msg.id || "", msg.data as string, ws) send(ws, { id: msg.id, type: "output", data: result }) } diff --git a/app/src/js/scrollback.ts b/app/src/js/scrollback.ts index a934a77..de3607f 100644 --- a/app/src/js/scrollback.ts +++ b/app/src/js/scrollback.ts @@ -61,12 +61,51 @@ export function addOutput(id: string, output: CommandOutput) { autoScroll() } + +export function appendOutput(id: string, output: CommandOutput) { + const item = document.querySelector(`[data-id="${id}"].output`) + + if (!item) { + console.error(`output id ${id} not found`) + return + } + + const [format, content] = processOutput(output) + + if (format === "html") + item.innerHTML += content + else + item.textContent += content + + autoScroll() +} + +export function replaceOutput(id: string, output: CommandOutput) { + const item = document.querySelector(`[data-id="${id}"].output`) + + if (!item) { + console.error(`output id ${id} not found`) + return + } + + const [format, content] = processOutput(output) + + if (format === "html") + item.innerHTML = content + else + item.textContent = content + + autoScroll() +} + function processOutput(output: CommandOutput): ["html" | "text", string] { let content = "" let html = false if (typeof output === "string") { content = output + } else if (Array.isArray(output)) { + content = output.join(" ") } else if (output.html !== undefined) { html = true content = output.html diff --git a/app/src/js/shell.ts b/app/src/js/shell.ts index 8717bc0..b245654 100644 --- a/app/src/js/shell.ts +++ b/app/src/js/shell.ts @@ -1,8 +1,8 @@ //// // The shell runs on the server and processes input, returning output. -import type { Message, CommandResult } from "../shared/types.js" -import { addInput, setStatus, addOutput } from "./scrollback.js" +import type { Message, CommandResult, CommandOutput } from "../shared/types.js" +import { addInput, setStatus, addOutput, appendOutput, replaceOutput } from "./scrollback.js" import { send } from "./websocket.js" import { randomId } from "../shared/utils.js" import { addToHistory } from "./history.js" @@ -30,14 +30,23 @@ export function runCommand(input: string) { // message received from server export function handleMessage(msg: Message) { - if (msg.type === "output") { - handleOutput(msg) - } else if (msg.type === "commands") { - cacheCommands(msg.data as string[]) - } else if (msg.type === "error") { - console.error(msg.data) - } else { - console.error("unknown message type", msg) + switch (msg.type) { + case "output": + handleOutput(msg); break + case "commands": + cacheCommands(msg.data as string[]); break + case "error": + console.error(msg.data); break + case "stream:start": + handleStreamStart(msg); break + case "stream:end": + handleStreamEnd(msg); break + case "stream:append": + handleStreamAppend(msg); break + case "stream:replace": + handleStreamReplace(msg); break + default: + console.error("unknown message type", msg) } } @@ -46,3 +55,33 @@ function handleOutput(msg: Message) { setStatus(msg.id!, result.status) addOutput(msg.id!, result.output) } + +function handleStreamStart(msg: Message) { + const id = msg.id! + + const status = document.querySelector(`[data-id="${id}"].input .status`) + if (!status) return + + addOutput(id, msg.data as CommandOutput) + + status.classList.remove("yellow") + status.classList.add("purple") +} + +function handleStreamAppend(msg: Message) { + appendOutput(msg.id!, msg.data as CommandOutput) +} + +function handleStreamReplace(msg: Message) { + replaceOutput(msg.id!, msg.data as CommandOutput) +} + +function handleStreamEnd(msg: Message) { + const id = msg.id! + + const status = document.querySelector(`[data-id="${id}"].input .status`) + if (!status) return + + status.classList.remove("purple") + status.classList.remove("green") +} \ No newline at end of file diff --git a/app/src/js/websocket.ts b/app/src/js/websocket.ts index 2986bcb..3cd018e 100644 --- a/app/src/js/websocket.ts +++ b/app/src/js/websocket.ts @@ -13,7 +13,6 @@ export function startConnection() { url.protocol = url.protocol.replace('http', 'ws') ws = new WebSocket(url) - ws.onopen = () => console.log('WS connected') ws.onmessage = receive ws.onclose = () => setTimeout(startConnection, 1000) // simple retry ws.onerror = () => ws?.close() diff --git a/app/src/session.ts b/app/src/session.ts index 023f616..e6caa8a 100644 --- a/app/src/session.ts +++ b/app/src/session.ts @@ -7,6 +7,7 @@ export type Session = { taskId?: string sessionId?: string project?: string + ws?: any } // Ensure "ALS" lives between bun's hot reloads diff --git a/app/src/shared/types.ts b/app/src/shared/types.ts index 3ae2236..39443ce 100644 --- a/app/src/shared/types.ts +++ b/app/src/shared/types.ts @@ -2,12 +2,13 @@ export type Message = { session?: string id?: string type: MessageType - data: CommandResult | string | string[] + data?: CommandResult | CommandOutput } export type MessageType = "error" | "input" | "output" | "commands" | "save-file" + | "stream:start" | "stream:end" | "stream:append" | "stream:replace" -export type CommandOutput = string | { html: string } +export type CommandOutput = string | string[] | { html: string } export type CommandResult = { status: "ok" | "error" diff --git a/app/src/shell.ts b/app/src/shell.ts index 2b363bb..8cc4eab 100644 --- a/app/src/shell.ts +++ b/app/src/shell.ts @@ -11,16 +11,15 @@ import { ALS } from "./session" const sessions: Map = new Map() -export async function runCommand(sessionId: string, taskId: string, input: string): Promise { +export async function runCommand(sessionId: string, taskId: string, input: string, ws?: any): Promise { const [cmd = "", ...args] = input.split(" ") - if (!commandExists(cmd)) { + if (!commandExists(cmd)) return { status: "error", output: `${cmd} not found` } - } let status: "ok" | "error" = "ok" let output: CommandOutput = "" - const state = getState(sessionId, taskId) + const state = getState(sessionId, taskId, ws) try { [status, output] = await ALS.run(state, async () => await exec(cmd, args)) @@ -41,7 +40,7 @@ async function exec(cmd: string, args: string[]): Promise<["ok" | "error", Comma return processExecOutput(await module.default(...args)) } -function processExecOutput(output: string | any): ["ok" | "error", CommandOutput] { +export function processExecOutput(output: string | any): ["ok" | "error", CommandOutput] { if (typeof output === "string") { return ["ok", output] } else if (typeof output === "object") { @@ -52,18 +51,21 @@ function processExecOutput(output: string | any): ["ok" | "error", CommandOutput } else { return ["ok", output] } + } else if (output === undefined) { + return ["ok", ""] } else { return ["ok", String(output)] } } -function getState(sessionId: string, taskId: string): Session { +function getState(sessionId: string, taskId: string, ws?: any): Session { let state = sessions.get(sessionId) if (!state) { state = { sessionId: sessionId, project: "" } sessions.set(sessionId, state) } state.taskId = taskId + if (ws) state.ws = ws return state } diff --git a/app/src/stream.ts b/app/src/stream.ts new file mode 100644 index 0000000..e4db35c --- /dev/null +++ b/app/src/stream.ts @@ -0,0 +1,51 @@ +import { send as sendWs } from "./websocket" +import { getState } from "./session" +import { processExecOutput } from "./shell" +import type { Child } from "hono/jsx" +import type { CommandOutput, Message } from "./shared/types" + +type StreamFn = (output: Child) => Promise +type StreamFns = { replace: StreamFn, append: StreamFn } +type StreamParamFn = (fns: StreamFns) => Promise + +export async function stream(initOrFn: StreamParamFn | string | any, fn?: StreamParamFn) { + const state = getState() + if (!state) throw "stream() called outside runCommand()" + + let error = false + const taskId = state.taskId + const session = state.sessionId + + const send = (msg: Message) => { + sendWs(state.ws, { ...msg, id: taskId, session }) + } + + const append = async (output: string | any) => { + const [status, data] = processExecOutput(output) + if (status === "error") error = true + send({ type: "stream:append", data }) + } + + const replace = async (output: string | any) => { + const [status, data] = processExecOutput(output) + if (status === "error") error = true + send({ type: "stream:replace", data }) + } + + let init: CommandOutput = "" + let func + + if (typeof initOrFn === "function") { + func = initOrFn + } else { + init = processExecOutput(initOrFn)[1] + func = fn + } + + return new Promise(async resolve => { + send({ type: "stream:start", data: init }) + await func({ replace, append }) + send({ type: "stream:end", data: error ? "error" : "ok" }) + resolve() + }) +}