import { join } from 'path'
import { loadAppEnv } from '@because/toes/tools'
import type { CronJob } from './schedules'
import { getNextRun } from './scheduler'

// Base directory for apps; a job runs from `<APPS_DIR>/<app>/current`.
// NOTE: the `!` only silences the type checker — nothing validates at load
// time that the variable is actually set.
const APPS_DIR = process.env.APPS_DIR!

// Base directory used to build each job's DATA_DIR (`<TOES_DIR>/<app>`).
const TOES_DIR = process.env.TOES_DIR!

// Script passed to `bun run` for every job; lives next to this module.
const RUNNER = join(import.meta.dir, 'runner.ts')

/**
 * Drains a byte stream, decoding it as UTF-8 and handing each decoded piece
 * to `append` as soon as it arrives.
 *
 * @param stream - Byte stream to consume until it reports `done`.
 * @param append - Called with every non-final decoded chunk (may be an empty
 *   string when a chunk ends in the middle of a multi-byte character).
 */
async function readStream(stream: ReadableStream<Uint8Array>, append: (text: string) => void) {
  const reader = stream.getReader()
  const decoder = new TextDecoder()

  try {
    while (true) {
      const { done, value } = await reader.read()
      if (done) break
      // `stream: true` buffers an incomplete trailing byte sequence so a
      // character split across two chunks is decoded correctly.
      append(decoder.decode(value, { stream: true }))
    }

    // Flush whatever the decoder is still buffering; without this a stream
    // that ends mid-character silently loses its tail.
    const tail = decoder.decode()
    if (tail) append(tail)
  } finally {
    // Always give the stream back, even if `read()` or `append` threw.
    reader.releaseLock()
  }
}

/**
 * Runs a cron job in a child `bun` process and records the outcome on `job`.
 *
 * The job is marked `running`, its previous result fields are cleared, and
 * stdout/stderr are streamed into `job.lastOutput` / `job.lastError` while the
 * process runs. When the process ends (or launching/reading it fails) the job
 * goes back to `idle` with `lastDuration`, `lastExitCode` and `lastError` set.
 * Failures are recorded on the job rather than thrown to the caller.
 *
 * @param job - Job to execute; mutated in place. Jobs in state `disabled` are
 *   skipped without touching any field.
 * @param onUpdate - Invoked once after the job is marked `running` and once
 *   after the final result has been recorded.
 */
export async function executeJob(job: CronJob, onUpdate: () => void): Promise<void> {
  if (job.state === 'disabled') return

  // Keep the start time locally so the duration does not depend on
  // `job.lastRun` staying untouched while the job is running.
  const startedAt = Date.now()

  job.state = 'running'
  job.lastRun = startedAt
  job.lastOutput = undefined
  job.lastError = undefined
  job.lastExitCode = undefined
  job.lastDuration = undefined
  onUpdate()

  try {
    // Built inside the try block: `join` throws when APPS_DIR (or job.app) is
    // not a string, and that must not leave the job stuck in `running`.
    const cwd = join(APPS_DIR, job.app, 'current')

    const proc = Bun.spawn(['bun', 'run', RUNNER, job.file], {
      cwd,
      // Later spreads win: app-specific variables override the inherited
      // environment, and DATA_DIR overrides both.
      env: { ...process.env, ...loadAppEnv(job.app), DATA_DIR: join(TOES_DIR, job.app) },
      stdout: 'pipe',
      stderr: 'pipe',
    })

    // Stream output incrementally into job fields
    await Promise.all([
      readStream(proc.stdout as ReadableStream, text => {
        job.lastOutput = (job.lastOutput || '') + text
      }),
      readStream(proc.stderr as ReadableStream, text => {
        job.lastError = (job.lastError || '') + text
      }),
    ])

    const code = await proc.exited

    job.lastDuration = Date.now() - startedAt
    job.lastExitCode = code

    // A failing process always gets an error message, even with empty stderr.
    if (!job.lastError && code !== 0) job.lastError = 'Non-zero exit'
    // A successful run reports no error; anything written to stderr is dropped.
    if (code === 0) job.lastError = undefined
    // Normalize an empty string to "no output".
    if (!job.lastOutput) job.lastOutput = undefined

    job.state = 'idle'
    job.nextRun = getNextRun(job.id)

    // Log result
    console.log(`[cron] ${job.id} finished: code=${code} duration=${job.lastDuration}ms`)
    if (job.lastOutput) console.log(job.lastOutput)
    if (job.lastError) console.error(job.lastError)
  } catch (e) {
    job.lastDuration = Date.now() - startedAt
    job.lastExitCode = 1
    job.lastError = e instanceof Error ? e.message : String(e)
    job.state = 'idle'
    // NOTE(review): nextRun is not recomputed on this path — confirm the
    // scheduler refreshes it after a failed run.
    console.error(`[cron] ${job.id} failed:`, e)
  }

  onUpdate()
}