nose-pluto/src/stream.ts
2025-10-01 15:28:16 -07:00

52 lines
1.5 KiB
TypeScript

import { send as sendWs } from "./websocket"
import { sessionGet } from "./session"
import { processExecOutput } from "./shell"
import type { Child } from "hono/jsx"
import type { CommandOutput, Message } from "./shared/types"
/** Pushes one piece of output to the client as part of the running stream. */
type StreamFn = (output: Child) => Promise<void>
/** The pair of callbacks handed to a streaming function. */
type StreamFns = { replace: StreamFn, append: StreamFn }
/** A user-supplied function that drives the stream through `replace`/`append`. */
type StreamParamFn = (fns: StreamFns) => Promise<void>

/**
 * Streams output for the current task over the session's websocket.
 *
 * Sends a `stream:start` message, runs the streaming function (which may call
 * `append` / `replace` any number of times, each producing a `stream:append` /
 * `stream:replace` message), and always finishes with a `stream:end` message
 * whose data is `"error"` if any processed output had status `"error"` or the
 * streaming function itself failed, and `"ok"` otherwise.
 *
 * Every message is tagged with the task id and session id read from
 * `sessionGet()`.
 *
 * @param initOrFn Either the streaming function, or an initial output value
 *   that is run through `processExecOutput` and sent with `stream:start`.
 * @param fn The streaming function when `initOrFn` is an initial value. If it
 *   is omitted, only the start and end messages are sent.
 * @returns A promise that resolves once `stream:end` has been sent.
 * @throws The string `"stream() called outside runCommand()"` when there is no
 *   session state; re-throws whatever the streaming function throws (after
 *   `stream:end` with `"error"` has been sent).
 */
export async function stream(initOrFn: StreamParamFn | string | any, fn?: StreamParamFn): Promise<void> {
  const state = sessionGet()
  // NOTE(review): a bare string is thrown here rather than an Error; kept
  // as-is because callers may rely on the rejection value being this string.
  if (!state) throw "stream() called outside runCommand()"

  let error = false
  const taskId = state.taskId
  const session = state.sessionId

  const send = (msg: Message) => {
    sendWs(state.ws, { ...msg, id: taskId, session })
  }

  // Normalizes one output value and remembers whether any chunk was an error,
  // so the final `stream:end` message can report it.
  const toData = (output: string | any) => {
    const { status, output: data } = processExecOutput(output)
    if (status === "error") error = true
    return data
  }

  const append = async (output: string | any) => {
    send({ type: "stream:append", data: toData(output) })
  }

  const replace = async (output: string | any) => {
    send({ type: "stream:replace", data: toData(output) })
  }

  let init: CommandOutput = ""
  let func: StreamParamFn | undefined
  if (typeof initOrFn === "function") {
    func = initOrFn
  } else {
    init = processExecOutput(initOrFn).output
    func = fn
  }

  send({ type: "stream:start", data: init })
  try {
    // No streaming function means there is nothing beyond the initial output.
    if (func) await func({ replace, append })
  } catch (err) {
    // A failing streaming function must still close the stream, as an error,
    // and the failure must reach the caller instead of being lost.
    error = true
    throw err
  } finally {
    send({ type: "stream:end", data: error ? "error" : "ok" })
  }
}