import { join } from 'path'
import { loadAppEnv } from '@because/toes/tools'
import type { CronJob } from './schedules'
import { getNextRun } from './scheduler'

const APPS_DIR = process.env.APPS_DIR!
const TOES_DIR = process.env.TOES_DIR!
const TOES_URL = process.env.TOES_URL!
const RUNNER = join(import.meta.dir, 'runner.ts')

/**
 * POSTs a single log entry to `${TOES_URL}/api/apps/${app}/logs`.
 *
 * Fire-and-forget: the request is not awaited and any rejection is
 * deliberately ignored so that a logging failure never affects the job.
 *
 * @param app    App name used in the log endpoint path.
 * @param text   Log text to send.
 * @param stream Which stream the text is attributed to (defaults to `stdout`).
 */
function forwardLog(app: string, text: string, stream: 'stdout' | 'stderr' = 'stdout') {
  void fetch(`${TOES_URL}/api/apps/${app}/logs`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ text, stream }),
  }).catch(() => {})
}

/**
 * Reads `stream` to completion, decoding each chunk as text and passing the
 * decoded text to `append`.
 *
 * @param stream Byte stream to drain.
 * @param append Callback invoked with each decoded piece of text.
 */
async function readStream(stream: ReadableStream, append: (text: string) => void) {
  const reader = stream.getReader()
  const decoder = new TextDecoder()
  try {
    while (true) {
      const { done, value } = await reader.read()
      if (done) break
      append(decoder.decode(value, { stream: true }))
    }
    // Flush the decoder: bytes of an incomplete multi-byte sequence held back
    // by `stream: true` would otherwise be dropped at end of stream.
    const tail = decoder.decode()
    if (tail) append(tail)
  } finally {
    reader.releaseLock()
  }
}

/**
 * Builds a line-buffered forwarder on top of `forwardLog`.
 *
 * Chunks read from a pipe do not align with line boundaries, so text is
 * accumulated until a newline is seen; only complete, non-empty lines are
 * forwarded. `flush` forwards whatever unterminated text remains.
 */
function createLineForwarder(app: string, prefix: string, stream: 'stdout' | 'stderr') {
  let pending = ''
  return {
    push(text: string) {
      const parts = (pending + text).split('\n')
      // The last element is the (possibly empty) unterminated remainder.
      pending = parts.pop() ?? ''
      for (const line of parts) {
        if (line) forwardLog(app, `${prefix} ${line}`, stream)
      }
    },
    flush() {
      if (pending) forwardLog(app, `${prefix} ${pending}`, stream)
      pending = ''
    },
  }
}

/**
 * Runs a cron job by spawning `bun run RUNNER <job.file>` in the app's
 * `current` directory, and records the outcome on the `job` object.
 *
 * Does nothing when the job is disabled. Otherwise the job is marked
 * `running`, its previous result fields are cleared, and `onUpdate` is called
 * once before the run and once after it finishes (successfully or not).
 * stdout/stderr are accumulated into `job.lastOutput` / `job.lastError` while
 * the process runs and are forwarded line by line via `forwardLog`.
 *
 * Errors thrown while spawning or reading are caught and recorded on the job
 * (`lastExitCode = 1`, `lastError` = message).
 *
 * @param job      Job to execute; mutated in place.
 * @param onUpdate Called after the job's fields change.
 */
export async function executeJob(job: CronJob, onUpdate: () => void): Promise<void> {
  if (job.state === 'disabled') return

  // Captured locally so the duration does not depend on `job.lastRun` staying
  // untouched while the process runs.
  const startedAt = Date.now()

  job.state = 'running'
  job.lastRun = startedAt
  job.lastOutput = undefined
  job.lastError = undefined
  job.lastExitCode = undefined
  job.lastDuration = undefined
  onUpdate()

  const cwd = join(APPS_DIR, job.app, 'current')

  forwardLog(job.app, `[cron] Running ${job.name}`)

  try {
    const proc = Bun.spawn(['bun', 'run', RUNNER, job.file], {
      cwd,
      env: { ...process.env, ...loadAppEnv(job.app), DATA_DIR: join(TOES_DIR, job.app) },
      stdout: 'pipe',
      stderr: 'pipe',
    })

    const prefix = `[cron:${job.name}]`
    const outLines = createLineForwarder(job.app, prefix, 'stdout')
    const errLines = createLineForwarder(job.app, prefix, 'stderr')

    // Stream output incrementally into job fields
    try {
      await Promise.all([
        readStream(proc.stdout as ReadableStream, text => {
          job.lastOutput = (job.lastOutput ?? '') + text
          outLines.push(text)
        }),
        readStream(proc.stderr as ReadableStream, text => {
          job.lastError = (job.lastError ?? '') + text
          errLines.push(text)
        }),
      ])
    } finally {
      // Forward any final line that was not newline-terminated.
      outLines.flush()
      errLines.flush()
    }

    const code = await proc.exited

    job.lastDuration = Date.now() - startedAt
    job.lastExitCode = code
    // A failing exit with nothing on stderr still gets an error message;
    // a successful exit discards whatever was written to stderr.
    if (!job.lastError && code !== 0) job.lastError = 'Non-zero exit'
    if (code === 0) job.lastError = undefined
    // Normalise empty output to undefined.
    if (!job.lastOutput) job.lastOutput = undefined
    job.state = 'idle'
    job.nextRun = getNextRun(job.id)

    // Log result
    const status = code === 0 ? 'ok' : `failed (code=${code})`
    const summary = `[cron] ${job.name} finished: ${status} duration=${job.lastDuration}ms`
    console.log(summary)
    forwardLog(job.app, summary, code === 0 ? 'stdout' : 'stderr')

    if (job.lastOutput) console.log(job.lastOutput)
    if (job.lastError) console.error(job.lastError)
  } catch (e) {
    job.lastDuration = Date.now() - startedAt
    job.lastExitCode = 1
    job.lastError = e instanceof Error ? e.message : String(e)
    job.state = 'idle'
    console.error(`[cron] ${job.id} failed:`, e)
    forwardLog(job.app, `[cron] ${job.name} failed: ${job.lastError}`, 'stderr')
  }

  onUpdate()
}