Move from polling to worker thread #3

Merged
probablycorey merged 7 commits from worker-test into main 2025-11-19 21:21:05 +00:00
8 changed files with 293 additions and 153 deletions
Showing only changes of commit 1527ce7d13 - Show all commits

1
baresip/accounts Normal file
View File

@ -0,0 +1 @@
<sip:yellow@probablycorey.sip.twilio.com;transport=tls>;auth_pass=zgm-kwx2bug5hwf3YGF;unregister_on_exit=yes;regint=300

71
baresip/config Normal file
View File

@ -0,0 +1,71 @@
#
# baresip configuration
#
#------------------------------------------------------------------------------
# Core
poll_method epoll # poll, select, epoll ..
ring_aufile none
# Call
call_local_timeout 120
call_max_calls 4
# Audio
audio_player alsa,default
audio_source alsa,default
audio_alert none
audio_alert_enable no
audio_level no
ausrc_format s16 # s16, float, ..
auplay_format s16 # s16, float, ..
auenc_format s16 # s16, float, ..
audec_format s16 # s16, float, ..
audio_buffer 20-160 # ms
# AVT - Audio/Video Transport
rtp_tos 184
rtcp_mux no
jitter_buffer_delay 5-10 # frames
rtp_stats no
#------------------------------------------------------------------------------
# Modules
module_path /usr/lib/baresip/modules
# UI Modules
#module stdio.so
# Audio codec Modules (in order)
module g711.so
# Audio driver Modules
module alsa.so
# Media NAT modules
module stun.so
module turn.so
module ice.so
module httpd.so
#------------------------------------------------------------------------------
# Temporary Modules (loaded then unloaded)
module_tmp uuid.so
module_tmp account.so
#------------------------------------------------------------------------------
# Application Modules
module_app contact.so
module_app debug_cmd.so
module_app menu.so
http_listen 0.0.0.0:8000 # httpd - HTTP Serve

View File

@ -6,6 +6,7 @@
"dependencies": {
"hono": "^4.10.4",
"openai": "^6.9.0",
"robot3": "^1.2.0",
},
"devDependencies": {
"@types/bun": "latest",
@ -30,6 +31,8 @@
"openai": ["openai@6.9.0", "", { "peerDependencies": { "ws": "^8.18.0", "zod": "^3.25 || ^4.0" }, "optionalPeers": ["ws", "zod"], "bin": { "openai": "bin/cli" } }, "sha512-n2sJRYmM+xfJ0l3OfH8eNnIyv3nQY7L08gZQu3dw6wSdfPtKAk92L83M2NIP5SS8Cl/bsBBG3yKzEOjkx0O+7A=="],
"robot3": ["robot3@1.2.0", "", {}, "sha512-Xin8KHqCKrD9Rqk1ZzZQYjsb6S9DRggcfwBqnVPeM3DLtNCJLxWWTrPJDYm3E+ZiTO7H3VMdgyPSkIbuYnYP2Q=="],
"typescript": ["typescript@5.9.3", "", { "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" } }, "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw=="],
"undici-types": ["undici-types@7.16.0", "", {}, "sha512-Zz+aZWSj8LE6zoxD+xrjh4VfkIG8Ya6LvYkZqtUQGJPZjYl53ypCaUwWqo7eI0x66KBGeRo+mlBEkMSeSZ38Nw=="],

View File

@ -14,7 +14,8 @@
},
"dependencies": {
"hono": "^4.10.4",
"openai": "^6.9.0"
"openai": "^6.9.0",
"robot3": "^1.2.0"
},
"prettier": {
"semi": false,

View File

@ -1,12 +1,10 @@
import { d, reduce, createMachine, state, transition, interpret, guard } from "robot3"
import { Baresip } from "./sip"
import { log } from "./log"
import { gpio } from "./gpio"
import { log } from "./utils/log"
import { sleep } from "bun"
import { ToneGenerator } from "./tone"
import { ring } from "./utils"
import { pins } from "./pins"
import { processStderr, processStdout } from "./stdio"
import { processStderr, processStdout } from "./utils/stdio"
import Buzz from "./buzz"
import { join } from "path"
type CancelableTask = () => void
@ -21,150 +19,19 @@ type PhoneContext = {
cancelAgent?: CancelableTask
}
const incomingCallRing = (): CancelableTask => {
let abortController = new AbortController()
const playRingtone = async () => {
while (!abortController.signal.aborted) {
await ring(2000, abortController.signal)
await sleep(4000)
}
}
playRingtone().catch((error) => log.error("Ringer error:", error))
return () => abortController.abort()
}
const handleError = (ctx: PhoneContext, event: { type: "error"; message?: string }) => {
ctx.lastError = event.message
log.error(`Phone error: ${event.message}`)
return ctx
}
const incomingCall = (ctx: PhoneContext, event: { type: "incoming_call"; from?: string }) => {
ctx.peer = event.from
ctx.cancelRinger = incomingCallRing()
log.info(`Incoming call from ${event.from}`)
return ctx
}
const stopRinger = (ctx: PhoneContext) => {
ctx.cancelRinger?.()
ctx.cancelRinger = undefined
return ctx
}
const playDialTone = (ctx: PhoneContext) => {
const tone = new ToneGenerator()
tone.loopTone([350, 440])
ctx.cancelDialTone = () => {
tone.stop()
}
return ctx
}
const playOutgoingTone = () => {
const tone = new ToneGenerator()
let canceled = false
const play = async () => {
while (!canceled) {
await tone.playTone([440, 480], 2000)
await sleep(4000)
}
}
play().catch((error) => log.error("Outgoing tone error:", error))
return () => {
tone.stop()
canceled = true
}
}
const dialStart = (ctx: PhoneContext) => {
ctx.numberDialed = 0
ctx = stopDialTone(ctx)
return ctx
}
const makeCall = (ctx: PhoneContext) => {
log.info(`Dialing number: ${ctx.numberDialed}`)
if (ctx.numberDialed === 1) {
ctx.baresip.dial("+13476229543")
} else if (ctx.numberDialed === 2) {
ctx.baresip.dial("+18109643563")
} else {
const playTone = async () => {
const tone = new ToneGenerator()
await tone.playTone([900], 200)
await tone.playTone([1350], 200)
await tone.playTone([1750], 200)
}
playTone().catch((error) => log.error("Error playing tone:", error))
}
return ctx
}
const makeAgentCall = (ctx: PhoneContext) => {
log.info(`Calling agent`)
ctx.cancelAgent = ctx.startAgent()
return ctx
}
const callAgentGuard = (ctx: PhoneContext) => {
return ctx.numberDialed === 10
}
const callAnswered = (ctx: PhoneContext) => {
ctx.baresip.accept()
ctx.cancelDialTone?.()
ctx.cancelDialTone = undefined
ctx.cancelRinger?.()
ctx.cancelRinger = undefined
return ctx
}
const stopCall = (ctx: PhoneContext) => {
ctx.baresip.hangUp()
return ctx
}
const stopAgent = (ctx: PhoneContext) => {
log.info("🛑 Stopping agent")
ctx.cancelAgent?.()
ctx.cancelAgent = undefined
return ctx
}
const stopDialTone = (ctx: PhoneContext) => {
ctx.cancelDialTone?.()
ctx.cancelDialTone = undefined
return ctx
}
const digitIncrement = (ctx: PhoneContext) => {
ctx.numberDialed += 1
return ctx
export const pins = {
ringer: 17,
hook: 27,
rotaryInUse: 22,
rotaryNumber: 23,
}
export const startPhone = async () => {
Bun.spawn({
cmd: "amixer -c 0 set PCM 20%".split(" "),
})
await Buzz.setVolume(0.4)
const baresipConfig = join(import.meta.dir, "..", "baresip")
const baresip = new Baresip(["/usr/bin/baresip", "-v", "-f", baresipConfig])
const baresip = new Baresip(["/usr/bin/baresip", "-v", "-f", "/home/corey/code/tone/baresip"])
baresip.registrationSuccess.connect(async () => {
log.info("🐻 server connected")
const result = await gpio.get(pins.hook)
@ -338,3 +205,141 @@ export const startPhone = async () => {
process.on("SIGTERM", cleanup)
process.on("exit", cleanup)
}
const incomingCallRing = (): CancelableTask => {
let abortController = new AbortController()
const playRingtone = async () => {
while (!abortController.signal.aborted) {
await ring(2000, abortController.signal)
await sleep(4000)
}
}
playRingtone().catch((error) => log.error("Ringer error:", error))
return () => abortController.abort()
}
const handleError = (ctx: PhoneContext, event: { type: "error"; message?: string }) => {
ctx.lastError = event.message
log.error(`Phone error: ${event.message}`)
return ctx
}
const incomingCall = (ctx: PhoneContext, event: { type: "incoming_call"; from?: string }) => {
ctx.peer = event.from
ctx.cancelRinger = incomingCallRing()
log.info(`Incoming call from ${event.from}`)
return ctx
}
const stopRinger = (ctx: PhoneContext) => {
ctx.cancelRinger?.()
ctx.cancelRinger = undefined
return ctx
}
const playDialTone = (ctx: PhoneContext) => {
const tone = new ToneGenerator()
tone.loopTone([350, 440])
ctx.cancelDialTone = () => {
tone.stop()
}
return ctx
}
const playOutgoingTone = () => {
const tone = new ToneGenerator()
let canceled = false
const play = async () => {
while (!canceled) {
await tone.playTone([440, 480], 2000)
await sleep(4000)
}
}
play().catch((error) => log.error("Outgoing tone error:", error))
return () => {
tone.stop()
canceled = true
}
}
const dialStart = (ctx: PhoneContext) => {
ctx.numberDialed = 0
ctx = stopDialTone(ctx)
return ctx
}
const makeCall = (ctx: PhoneContext) => {
log.info(`Dialing number: ${ctx.numberDialed}`)
if (ctx.numberDialed === 1) {
ctx.baresip.dial("+13476229543")
} else if (ctx.numberDialed === 2) {
ctx.baresip.dial("+18109643563")
} else {
const playTone = async () => {
const tone = new ToneGenerator()
await tone.playTone([900], 200)
await tone.playTone([1350], 200)
await tone.playTone([1750], 200)
}
playTone().catch((error) => log.error("Error playing tone:", error))
}
return ctx
}
const makeAgentCall = (ctx: PhoneContext) => {
log.info(`Calling agent`)
ctx.cancelAgent = ctx.startAgent()
return ctx
}
const callAgentGuard = (ctx: PhoneContext) => {
return ctx.numberDialed === 10
}
const callAnswered = (ctx: PhoneContext) => {
ctx.baresip.accept()
ctx.cancelDialTone?.()
ctx.cancelDialTone = undefined
ctx.cancelRinger?.()
ctx.cancelRinger = undefined
return ctx
}
const stopCall = (ctx: PhoneContext) => {
ctx.baresip.hangUp()
return ctx
}
const stopAgent = (ctx: PhoneContext) => {
log.info("🛑 Stopping agent")
ctx.cancelAgent?.()
ctx.cancelAgent = undefined
return ctx
}
const stopDialTone = (ctx: PhoneContext) => {
ctx.cancelDialTone?.()
ctx.cancelDialTone = undefined
return ctx
}
const digitIncrement = (ctx: PhoneContext) => {
ctx.numberDialed += 1
return ctx
}

View File

@ -1,6 +1,6 @@
import { log } from "./log.ts"
import { Signal } from "./signal.ts"
import { processStdout, processStderr } from "./stdio.ts"
import { log } from "./utils/log.ts"
import { Signal } from "./utils/signal.ts"
import { processStdout, processStderr } from "./utils/stdio.ts"
export class Baresip {
baresipArgs: string[]
@ -13,9 +13,6 @@ export class Baresip {
constructor(baresipArgs: string[]) {
this.baresipArgs = baresipArgs
process.on("SIGINT", () => this.kill())
process.on("SIGTERM", () => this.kill())
}
async connect() {

21
src/utils/log.ts Normal file
View File

@ -0,0 +1,21 @@
let showDebug = true
let showInfo = true
let showError = true
export function setLogLevel(level: "debug" | "info" | "error" | "none") {
showDebug = level === "debug"
showInfo = level === "debug" || level === "info"
showError = level !== "none"
}
export const log = {
debug: (...args: any[]) => {
if (showDebug) console.debug("DEBUG: ", ...args)
},
info: (...args: any[]) => {
if (showInfo) console.log("INFO: ", ...args)
},
error: (...args: any[]) => {
if (showError) console.error("ERROR: ", ...args)
},
}

41
src/utils/stdio.ts Normal file
View File

@ -0,0 +1,41 @@
import { log } from "./log.ts"
export const LineSplitter = () => {
let buffer = ""
return new TransformStream({
transform(chunk, controller) {
buffer += chunk
const parts = buffer.split(/\n/)
const lines = parts.slice(0, -1)
buffer = parts.at(-1) || ""
for (const line of lines) {
controller.enqueue(line)
}
},
flush(controller) {
if (buffer.length > 0) {
controller.enqueue(buffer)
}
},
})
}
export async function processStdout(
process: Bun.ReadableSubprocess,
onLine: (line: string) => void
) {
for await (const line of process.stdout
.pipeThrough(new TextDecoderStream())
.pipeThrough(LineSplitter())) {
onLine(line)
}
}
export async function processStderr(process: Bun.ReadableSubprocess, prefix: string = "") {
for await (const line of process.stderr
.pipeThrough(new TextDecoderStream())
.pipeThrough(LineSplitter())) {
log.error(`${prefix}${line}`)
}
}