Compare commits
3 Commits
a545c3e056
...
2c726c5bd2
| Author | SHA1 | Date | |
|---|---|---|---|
| 2c726c5bd2 | |||
| d6638263a7 | |||
| 97a6b952d5 |
14
app/nose/bin/counter.tsx
Normal file
14
app/nose/bin/counter.tsx
Normal file
|
|
@ -0,0 +1,14 @@
|
|||
import { stream } from "@/stream"
|
||||
|
||||
export default async function () {
|
||||
let count = 1
|
||||
|
||||
return stream(<p>{count}</p>, async ({ replace }) => {
|
||||
for (let i = 1; i < 10; i++) {
|
||||
await Bun.sleep(250)
|
||||
|
||||
count++
|
||||
replace(<p>{count}</p>)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
@ -20,7 +20,7 @@ export async function dispatchMessage(ws: any, msg: Message) {
|
|||
}
|
||||
|
||||
async function inputMessage(ws: any, msg: Message) {
|
||||
const result = await runCommand(msg.session || "", msg.id || "", msg.data as string)
|
||||
const result = await runCommand(msg.session || "", msg.id || "", msg.data as string, ws)
|
||||
send(ws, { id: msg.id, type: "output", data: result })
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@
|
|||
// input, output, etc
|
||||
|
||||
import { scrollback, $$ } from "./dom.js"
|
||||
import { randomId } from "../shared/utils.js"
|
||||
import type { CommandOutput } from "../shared/types.js"
|
||||
|
||||
type InputStatus = "waiting" | "streaming" | "ok" | "error"
|
||||
|
|
@ -48,7 +49,7 @@ export function setStatus(id: string, status: InputStatus) {
|
|||
export function addOutput(id: string, output: CommandOutput) {
|
||||
const item = $$("li")
|
||||
item.classList.add("output")
|
||||
item.dataset.id = id
|
||||
item.dataset.id = id || randomId()
|
||||
|
||||
const [format, content] = processOutput(output)
|
||||
|
||||
|
|
@ -61,12 +62,55 @@ export function addOutput(id: string, output: CommandOutput) {
|
|||
autoScroll()
|
||||
}
|
||||
|
||||
export function addErrorMessage(message: string) {
|
||||
addOutput("", { html: `<span class="red">${message}</span>` })
|
||||
}
|
||||
|
||||
|
||||
export function appendOutput(id: string, output: CommandOutput) {
|
||||
const item = document.querySelector(`[data-id="${id}"].output`)
|
||||
|
||||
if (!item) {
|
||||
console.error(`output id ${id} not found`)
|
||||
return
|
||||
}
|
||||
|
||||
const [format, content] = processOutput(output)
|
||||
|
||||
if (format === "html")
|
||||
item.innerHTML += content
|
||||
else
|
||||
item.textContent += content
|
||||
|
||||
autoScroll()
|
||||
}
|
||||
|
||||
export function replaceOutput(id: string, output: CommandOutput) {
|
||||
const item = document.querySelector(`[data-id="${id}"].output`)
|
||||
|
||||
if (!item) {
|
||||
console.error(`output id ${id} not found`)
|
||||
return
|
||||
}
|
||||
|
||||
const [format, content] = processOutput(output)
|
||||
|
||||
if (format === "html")
|
||||
item.innerHTML = content
|
||||
else
|
||||
item.textContent = content
|
||||
|
||||
autoScroll()
|
||||
}
|
||||
|
||||
function processOutput(output: CommandOutput): ["html" | "text", string] {
|
||||
let content = ""
|
||||
let html = false
|
||||
|
||||
if (typeof output === "string") {
|
||||
content = output
|
||||
} else if (Array.isArray(output)) {
|
||||
content = output.join(" ")
|
||||
} else if (output.html !== undefined) {
|
||||
html = true
|
||||
content = output.html
|
||||
|
|
|
|||
|
|
@ -1,8 +1,8 @@
|
|||
////
|
||||
// The shell runs on the server and processes input, returning output.
|
||||
|
||||
import type { Message, CommandResult } from "../shared/types.js"
|
||||
import { addInput, setStatus, addOutput } from "./scrollback.js"
|
||||
import type { Message, CommandResult, CommandOutput } from "../shared/types.js"
|
||||
import { addInput, setStatus, addOutput, appendOutput, replaceOutput } from "./scrollback.js"
|
||||
import { send } from "./websocket.js"
|
||||
import { randomId } from "../shared/utils.js"
|
||||
import { addToHistory } from "./history.js"
|
||||
|
|
@ -16,7 +16,7 @@ export function runCommand(input: string) {
|
|||
addToHistory(input)
|
||||
addInput(id, input)
|
||||
|
||||
const [cmd = "", ...args] = input.split(" ")
|
||||
const [cmd = "", ..._args] = input.split(" ")
|
||||
|
||||
if (browserCommands[cmd]) {
|
||||
const result = browserCommands[cmd]()
|
||||
|
|
@ -30,14 +30,23 @@ export function runCommand(input: string) {
|
|||
|
||||
// message received from server
|
||||
export function handleMessage(msg: Message) {
|
||||
if (msg.type === "output") {
|
||||
handleOutput(msg)
|
||||
} else if (msg.type === "commands") {
|
||||
cacheCommands(msg.data as string[])
|
||||
} else if (msg.type === "error") {
|
||||
console.error(msg.data)
|
||||
} else {
|
||||
console.error("unknown message type", msg)
|
||||
switch (msg.type) {
|
||||
case "output":
|
||||
handleOutput(msg); break
|
||||
case "commands":
|
||||
cacheCommands(msg.data as string[]); break
|
||||
case "error":
|
||||
console.error(msg.data); break
|
||||
case "stream:start":
|
||||
handleStreamStart(msg); break
|
||||
case "stream:end":
|
||||
handleStreamEnd(msg); break
|
||||
case "stream:append":
|
||||
handleStreamAppend(msg); break
|
||||
case "stream:replace":
|
||||
handleStreamReplace(msg); break
|
||||
default:
|
||||
console.error("unknown message type", msg)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -46,3 +55,26 @@ function handleOutput(msg: Message) {
|
|||
setStatus(msg.id!, result.status)
|
||||
addOutput(msg.id!, result.output)
|
||||
}
|
||||
|
||||
function handleStreamStart(msg: Message) {
|
||||
const id = msg.id!
|
||||
|
||||
const status = document.querySelector(`[data-id="${id}"].input .status`)
|
||||
if (!status) return
|
||||
|
||||
addOutput(id, msg.data as CommandOutput)
|
||||
|
||||
status.classList.remove("yellow")
|
||||
status.classList.add("purple")
|
||||
}
|
||||
|
||||
function handleStreamAppend(msg: Message) {
|
||||
appendOutput(msg.id!, msg.data as CommandOutput)
|
||||
}
|
||||
|
||||
function handleStreamReplace(msg: Message) {
|
||||
replaceOutput(msg.id!, msg.data as CommandOutput)
|
||||
}
|
||||
|
||||
function handleStreamEnd(_msg: Message) {
|
||||
}
|
||||
|
|
@ -4,6 +4,10 @@
|
|||
import type { Message } from "../shared/types.js"
|
||||
import { sessionID } from "./session.js"
|
||||
import { handleMessage } from "./shell.js"
|
||||
import { addErrorMessage } from "./scrollback.js"
|
||||
|
||||
const MAX_RETRIES = 5
|
||||
let retries = 0
|
||||
|
||||
let ws: WebSocket | null = null
|
||||
|
||||
|
|
@ -13,9 +17,8 @@ export function startConnection() {
|
|||
url.protocol = url.protocol.replace('http', 'ws')
|
||||
ws = new WebSocket(url)
|
||||
|
||||
ws.onopen = () => console.log('WS connected')
|
||||
ws.onmessage = receive
|
||||
ws.onclose = () => setTimeout(startConnection, 1000) // simple retry
|
||||
ws.onclose = retryConnection
|
||||
ws.onerror = () => ws?.close()
|
||||
}
|
||||
|
||||
|
|
@ -37,3 +40,13 @@ export function close() {
|
|||
ws?.close(1000, 'bye')
|
||||
}
|
||||
|
||||
function retryConnection() {
|
||||
if (retries >= MAX_RETRIES) {
|
||||
addErrorMessage(`!! Failed to reconnect ${retries} times. Server is down.`)
|
||||
if (ws) ws.onclose = () => { }
|
||||
return
|
||||
}
|
||||
retries++
|
||||
addErrorMessage(`!! Connection lost. Retrying...`)
|
||||
setTimeout(startConnection, 1000)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ export type Session = {
|
|||
taskId?: string
|
||||
sessionId?: string
|
||||
project?: string
|
||||
ws?: any
|
||||
}
|
||||
|
||||
// Ensure "ALS" lives between bun's hot reloads
|
||||
|
|
|
|||
|
|
@ -2,12 +2,13 @@ export type Message = {
|
|||
session?: string
|
||||
id?: string
|
||||
type: MessageType
|
||||
data: CommandResult | string | string[]
|
||||
data?: CommandResult | CommandOutput
|
||||
}
|
||||
|
||||
export type MessageType = "error" | "input" | "output" | "commands" | "save-file"
|
||||
| "stream:start" | "stream:end" | "stream:append" | "stream:replace"
|
||||
|
||||
export type CommandOutput = string | { html: string }
|
||||
export type CommandOutput = string | string[] | { html: string }
|
||||
|
||||
export type CommandResult = {
|
||||
status: "ok" | "error"
|
||||
|
|
|
|||
|
|
@ -11,16 +11,15 @@ import { ALS } from "./session"
|
|||
|
||||
const sessions: Map<string, Session> = new Map()
|
||||
|
||||
export async function runCommand(sessionId: string, taskId: string, input: string): Promise<CommandResult> {
|
||||
export async function runCommand(sessionId: string, taskId: string, input: string, ws?: any): Promise<CommandResult> {
|
||||
const [cmd = "", ...args] = input.split(" ")
|
||||
|
||||
if (!commandExists(cmd)) {
|
||||
if (!commandExists(cmd))
|
||||
return { status: "error", output: `${cmd} not found` }
|
||||
}
|
||||
|
||||
let status: "ok" | "error" = "ok"
|
||||
let output: CommandOutput = ""
|
||||
const state = getState(sessionId, taskId)
|
||||
const state = getState(sessionId, taskId, ws)
|
||||
|
||||
try {
|
||||
[status, output] = await ALS.run(state, async () => await exec(cmd, args))
|
||||
|
|
@ -41,7 +40,7 @@ async function exec(cmd: string, args: string[]): Promise<["ok" | "error", Comma
|
|||
return processExecOutput(await module.default(...args))
|
||||
}
|
||||
|
||||
function processExecOutput(output: string | any): ["ok" | "error", CommandOutput] {
|
||||
export function processExecOutput(output: string | any): ["ok" | "error", CommandOutput] {
|
||||
if (typeof output === "string") {
|
||||
return ["ok", output]
|
||||
} else if (typeof output === "object") {
|
||||
|
|
@ -52,18 +51,21 @@ function processExecOutput(output: string | any): ["ok" | "error", CommandOutput
|
|||
} else {
|
||||
return ["ok", output]
|
||||
}
|
||||
} else if (output === undefined) {
|
||||
return ["ok", ""]
|
||||
} else {
|
||||
return ["ok", String(output)]
|
||||
}
|
||||
}
|
||||
|
||||
function getState(sessionId: string, taskId: string): Session {
|
||||
function getState(sessionId: string, taskId: string, ws?: any): Session {
|
||||
let state = sessions.get(sessionId)
|
||||
if (!state) {
|
||||
state = { sessionId: sessionId, project: "" }
|
||||
sessions.set(sessionId, state)
|
||||
}
|
||||
state.taskId = taskId
|
||||
if (ws) state.ws = ws
|
||||
return state
|
||||
}
|
||||
|
||||
|
|
|
|||
51
app/src/stream.ts
Normal file
51
app/src/stream.ts
Normal file
|
|
@ -0,0 +1,51 @@
|
|||
import { send as sendWs } from "./websocket"
|
||||
import { getState } from "./session"
|
||||
import { processExecOutput } from "./shell"
|
||||
import type { Child } from "hono/jsx"
|
||||
import type { CommandOutput, Message } from "./shared/types"
|
||||
|
||||
type StreamFn = (output: Child) => Promise<void>
|
||||
type StreamFns = { replace: StreamFn, append: StreamFn }
|
||||
type StreamParamFn = (fns: StreamFns) => Promise<void>
|
||||
|
||||
export async function stream(initOrFn: StreamParamFn | string | any, fn?: StreamParamFn) {
|
||||
const state = getState()
|
||||
if (!state) throw "stream() called outside runCommand()"
|
||||
|
||||
let error = false
|
||||
const taskId = state.taskId
|
||||
const session = state.sessionId
|
||||
|
||||
const send = (msg: Message) => {
|
||||
sendWs(state.ws, { ...msg, id: taskId, session })
|
||||
}
|
||||
|
||||
const append = async (output: string | any) => {
|
||||
const [status, data] = processExecOutput(output)
|
||||
if (status === "error") error = true
|
||||
send({ type: "stream:append", data })
|
||||
}
|
||||
|
||||
const replace = async (output: string | any) => {
|
||||
const [status, data] = processExecOutput(output)
|
||||
if (status === "error") error = true
|
||||
send({ type: "stream:replace", data })
|
||||
}
|
||||
|
||||
let init: CommandOutput = ""
|
||||
let func
|
||||
|
||||
if (typeof initOrFn === "function") {
|
||||
func = initOrFn
|
||||
} else {
|
||||
init = processExecOutput(initOrFn)[1]
|
||||
func = fn
|
||||
}
|
||||
|
||||
return new Promise<void>(async resolve => {
|
||||
send({ type: "stream:start", data: init })
|
||||
await func({ replace, append })
|
||||
send({ type: "stream:end", data: error ? "error" : "ok" })
|
||||
resolve()
|
||||
})
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user