toes/apps/cron/20260201-000000/lib/executor.ts

74 lines
2.2 KiB
TypeScript

import { join } from 'path'
import type { CronJob } from './schedules'
import { getNextRun } from './scheduler'
const APPS_DIR = process.env.APPS_DIR!
const TOES_DIR = process.env.TOES_DIR!
const RUNNER = join(import.meta.dir, 'runner.ts')
async function readStream(stream: ReadableStream<Uint8Array>, append: (text: string) => void) {
const reader = stream.getReader()
const decoder = new TextDecoder()
while (true) {
const { done, value } = await reader.read()
if (done) break
append(decoder.decode(value, { stream: true }))
}
}
export async function executeJob(job: CronJob, onUpdate: () => void): Promise<void> {
if (job.state === 'disabled') return
job.state = 'running'
job.lastRun = Date.now()
job.lastOutput = undefined
job.lastError = undefined
job.lastExitCode = undefined
job.lastDuration = undefined
onUpdate()
const cwd = join(APPS_DIR, job.app, 'current')
try {
const proc = Bun.spawn(['bun', 'run', RUNNER, job.file], {
cwd,
env: { ...process.env, DATA_DIR: join(TOES_DIR, job.app) },
stdout: 'pipe',
stderr: 'pipe',
})
// Stream output incrementally into job fields
await Promise.all([
readStream(proc.stdout as ReadableStream<Uint8Array>, text => {
job.lastOutput = (job.lastOutput || '') + text
}),
readStream(proc.stderr as ReadableStream<Uint8Array>, text => {
job.lastError = (job.lastError || '') + text
}),
])
const code = await proc.exited
job.lastDuration = Date.now() - job.lastRun
job.lastExitCode = code
if (!job.lastError && code !== 0) job.lastError = 'Non-zero exit'
if (code === 0) job.lastError = undefined
if (!job.lastOutput) job.lastOutput = undefined
job.state = 'idle'
job.nextRun = getNextRun(job.id)
// Log result
console.log(`[cron] ${job.id} finished: code=${code} duration=${job.lastDuration}ms`)
if (job.lastOutput) console.log(job.lastOutput)
if (job.lastError) console.error(job.lastError)
} catch (e) {
job.lastDuration = Date.now() - job.lastRun
job.lastExitCode = 1
job.lastError = e instanceof Error ? e.message : String(e)
job.state = 'idle'
console.error(`[cron] ${job.id} failed:`, e)
}
onUpdate()
}