97 lines
3.1 KiB
TypeScript
97 lines
3.1 KiB
TypeScript
import { join } from 'path'
|
|
import { loadAppEnv } from '@because/toes/tools'
|
|
import type { CronJob } from './schedules'
|
|
import { getNextRun } from './scheduler'
|
|
|
|
const APPS_DIR = process.env.APPS_DIR!
|
|
const TOES_DIR = process.env.TOES_DIR!
|
|
const TOES_URL = process.env.TOES_URL!
|
|
const RUNNER = join(import.meta.dir, 'runner.ts')
|
|
|
|
function forwardLog(app: string, text: string, stream: 'stdout' | 'stderr' = 'stdout') {
|
|
fetch(`${TOES_URL}/api/apps/${app}/logs`, {
|
|
method: 'POST',
|
|
headers: { 'Content-Type': 'application/json' },
|
|
body: JSON.stringify({ text, stream }),
|
|
}).catch(() => {})
|
|
}
|
|
|
|
async function readStream(stream: ReadableStream<Uint8Array>, append: (text: string) => void) {
|
|
const reader = stream.getReader()
|
|
const decoder = new TextDecoder()
|
|
while (true) {
|
|
const { done, value } = await reader.read()
|
|
if (done) break
|
|
append(decoder.decode(value, { stream: true }))
|
|
}
|
|
}
|
|
|
|
export async function executeJob(job: CronJob, onUpdate: () => void): Promise<void> {
|
|
if (job.state === 'disabled') return
|
|
|
|
job.state = 'running'
|
|
job.lastRun = Date.now()
|
|
job.lastOutput = undefined
|
|
job.lastError = undefined
|
|
job.lastExitCode = undefined
|
|
job.lastDuration = undefined
|
|
onUpdate()
|
|
|
|
const cwd = join(APPS_DIR, job.app, 'current')
|
|
|
|
forwardLog(job.app, `[cron] Running ${job.name}`)
|
|
|
|
try {
|
|
const proc = Bun.spawn(['bun', 'run', RUNNER, job.file], {
|
|
cwd,
|
|
env: { ...process.env, ...loadAppEnv(job.app), DATA_DIR: join(TOES_DIR, job.app) },
|
|
stdout: 'pipe',
|
|
stderr: 'pipe',
|
|
})
|
|
|
|
// Stream output incrementally into job fields
|
|
await Promise.all([
|
|
readStream(proc.stdout as ReadableStream<Uint8Array>, text => {
|
|
job.lastOutput = (job.lastOutput || '') + text
|
|
for (const line of text.split('\n').filter(Boolean)) {
|
|
forwardLog(job.app, `[cron:${job.name}] ${line}`)
|
|
}
|
|
}),
|
|
readStream(proc.stderr as ReadableStream<Uint8Array>, text => {
|
|
job.lastError = (job.lastError || '') + text
|
|
for (const line of text.split('\n').filter(Boolean)) {
|
|
forwardLog(job.app, `[cron:${job.name}] ${line}`, 'stderr')
|
|
}
|
|
}),
|
|
])
|
|
|
|
const code = await proc.exited
|
|
|
|
job.lastDuration = Date.now() - job.lastRun
|
|
job.lastExitCode = code
|
|
if (!job.lastError && code !== 0) job.lastError = 'Non-zero exit'
|
|
if (code === 0) job.lastError = undefined
|
|
if (!job.lastOutput) job.lastOutput = undefined
|
|
job.state = 'idle'
|
|
job.nextRun = getNextRun(job.id)
|
|
|
|
// Log result
|
|
const status = code === 0 ? 'ok' : `failed (code=${code})`
|
|
const summary = `[cron] ${job.name} finished: ${status} duration=${job.lastDuration}ms`
|
|
console.log(summary)
|
|
forwardLog(job.app, summary, code === 0 ? 'stdout' : 'stderr')
|
|
|
|
if (job.lastOutput) console.log(job.lastOutput)
|
|
if (job.lastError) console.error(job.lastError)
|
|
} catch (e) {
|
|
job.lastDuration = Date.now() - job.lastRun
|
|
job.lastExitCode = 1
|
|
job.lastError = e instanceof Error ? e.message : String(e)
|
|
job.state = 'idle'
|
|
console.error(`[cron] ${job.id} failed:`, e)
|
|
forwardLog(job.app, `[cron] ${job.name} failed: ${job.lastError}`, 'stderr')
|
|
}
|
|
|
|
onUpdate()
|
|
}
|