52 lines
1.5 KiB
TypeScript
52 lines
1.5 KiB
TypeScript
import { send as sendWs } from "./websocket"
|
|
import { sessionGet } from "./session"
|
|
import { processExecOutput } from "./shell"
|
|
import type { Child } from "hono/jsx"
|
|
import type { CommandOutput, Message } from "./shared/types"
|
|
|
|
/** One emitter handed to a stream callback; takes a hono/jsx `Child` as output. */
type StreamFn = (output: Child) => Promise<void>

/** The pair of emitters a stream callback receives. */
type StreamFns = {
  replace: StreamFn
  append: StreamFn
}

/** A stream callback: receives the emitters and returns a promise for its completion. */
type StreamParamFn = (fns: StreamFns) => Promise<void>
|
|
|
|
/**
 * Streams incremental output for the current task over the session's websocket.
 *
 * Sends `stream:start` (carrying the optional initial output), runs the callback
 * with `replace` / `append` emitters, and then always sends `stream:end` with
 * `"ok"` or `"error"`. Every message is tagged with the task id and session id
 * read from `sessionGet()`.
 *
 * Call forms:
 * - `stream(fn)` starts with empty output.
 * - `stream(init, fn)` starts with `init`, passed through `processExecOutput`.
 * - `stream(init)` sends start and end only (no callback to run).
 *
 * @param initOrFn The stream callback, or the initial output when `fn` is given.
 * @param fn The stream callback when `initOrFn` is the initial output.
 * @returns A promise that settles after `stream:end` has been sent.
 * @throws The string `"stream() called outside runCommand()"` when `sessionGet()`
 *   returns nothing; otherwise rethrows whatever the callback throws, after
 *   `stream:end` has been sent with `"error"`.
 */
export async function stream(initOrFn: StreamParamFn | string | any, fn?: StreamParamFn): Promise<void> {
  const state = sessionGet()
  // Kept as a string throw so the value existing callers catch is unchanged.
  if (!state) throw "stream() called outside runCommand()"

  let error = false
  const taskId = state.taskId
  const session = state.sessionId

  // `state.ws` is read on every send rather than captured once, as in the original.
  const send = (msg: Message) => {
    sendWs(state.ws, { ...msg, id: taskId, session })
  }

  // Normalizes one piece of output and remembers whether any piece was an error.
  const track = (output: string | any) => {
    const [status, data] = processExecOutput(output)
    if (status === "error") error = true
    return data
  }

  const append = async (output: string | any) => {
    send({ type: "stream:append", data: track(output) })
  }

  const replace = async (output: string | any) => {
    send({ type: "stream:replace", data: track(output) })
  }

  let init: CommandOutput = ""
  let func: StreamParamFn | undefined

  if (typeof initOrFn === "function") {
    func = initOrFn
  } else {
    init = processExecOutput(initOrFn)[1]
    func = fn
  }

  send({ type: "stream:start", data: init })

  try {
    // A missing callback means there is nothing to stream beyond the initial output.
    if (func) await func({ replace, append })
  } catch (err) {
    // Report the failure to the client, then surface it to the caller instead of
    // leaving the returned promise pending.
    error = true
    throw err
  } finally {
    // Always close the stream so the client is never left waiting for `stream:end`.
    send({ type: "stream:end", data: error ? "error" : "ok" })
  }
}
|