From 96d10c3df0ae69ce98a5bfa489428085579be8e3 Mon Sep 17 00:00:00 2001 From: Corey Johnson Date: Mon, 24 Nov 2025 09:09:15 -0800 Subject: [PATCH] ok --- scripts/cli.ts | 236 ++++++++++++++---------------------- src/agent/README.md | 35 +++--- src/agent/index.ts | 4 +- src/phone.ts | 27 +++-- src/sip.ts | 20 +-- src/test-operator.ts | 4 +- src/utils/emitter.ts | 42 +++++++ src/utils/log.ts | 2 +- src/utils/signal.ts | 96 --------------- src/utils/waiting-sounds.ts | 21 ++-- 10 files changed, 197 insertions(+), 290 deletions(-) create mode 100644 src/utils/emitter.ts delete mode 100644 src/utils/signal.ts diff --git a/scripts/cli.ts b/scripts/cli.ts index 8d70307..306ba5a 100644 --- a/scripts/cli.ts +++ b/scripts/cli.ts @@ -5,174 +5,126 @@ import {$} from "bun"; const SERVICES = { ap: "phone-ap", web: "phone-web", -}; + phone: "phone", +} as const; -const commands = { - status: "Show status of all services", - logs: "Show recent logs from all services (last 50 lines)", - tail: "Tail logs from all services in real-time", - restart: "Restart all services", - stop: "Stop all services", - start: "Start all services", - "ap-status": "Show status of AP service", - "ap-logs": "Show recent logs from AP service (last 50 lines)", - "ap-tail": "Tail logs from AP service in real-time", - "ap-restart": "Restart AP service", - "ap-stop": "Stop AP service", - "ap-start": "Start AP service", - "web-status": "Show status of web service", - "web-logs": "Show recent logs from web service (last 50 lines)", - "web-tail": "Tail logs from web service in real-time", - "web-restart": "Restart web service", - "web-stop": "Stop web service", - "web-start": "Start web service", - help: "Show this help message", -}; +const COMMANDS = { + status: "Show service status", + logs: "Show recent logs (last 50 lines)", + tail: "Tail logs in real-time", + restart: "Restart service (requires sudo)", + stop: "Stop service (requires sudo)", + start: "Start service (requires sudo)", +} as const; -const command = process.argv[2]; - -if (!command || command === "help") { +const showHelp = () => { console.log(` -Phone CLI - Service Management Tool +Phone CLI - Service Management -Usage: bun cli +Usage: cli SERVICE COMMAND [-v] -All Services: - status Show status of all services - logs Show recent logs from all services (last 50 lines) - tail Tail logs from all services in real-time - restart Restart all services - stop Stop all services - start Start all services +Services: + ap WiFi AP Monitor (phone-ap.service) + web Web Server (phone-web.service) + phone Phone Application (phone.service) -AP Service (phone-ap): - ap-status Show AP status - ap-logs Show AP logs (last 50 lines) - ap-tail Tail AP logs in real-time - ap-restart Restart AP service - ap-stop Stop AP service - ap-start Start AP service +Commands: + status Show service status + logs Show recent logs (last 50 lines) + tail Tail logs in real-time + restart Restart service (requires sudo) + stop Stop service (requires sudo) + start Start service (requires sudo) -Web Service (phone-web): - web-status Show web status - web-logs Show web logs (last 50 lines) - web-tail Tail web logs in real-time - web-restart Restart web service - web-stop Stop web service - web-start Start web service +Options: + -v Verbose mode - show actual systemd commands Examples: - bun cli status - bun cli ap-logs - bun cli web-tail - sudo bun cli ap-restart + cli ap status + cli web logs + cli phone tail + cli -v ap status + sudo cli ap restart `); +}; + +// Parse arguments +const args = process.argv.slice(2); + +// Check for help +if (args.length === 0 || args[0] === "help") { + showHelp(); process.exit(0); } -if (!Object.keys(commands).includes(command)) { - console.error(`āŒ Unknown command: ${command}`); - console.log(`Run 'bun cli.ts help' to see available commands`); +// Extract verbose flag and remaining args +const verbose = args.includes("-v"); +const [service, command] = args.filter(arg => arg !== "-v"); + +// Validate service +if (!service || !(service in SERVICES)) { + console.error(`āŒ Unknown service: ${service || "(missing)"}`); + console.log(`Available services: ${Object.keys(SERVICES).join(", ")}`); process.exit(1); } -console.log(`\nšŸ”§ Phone CLI - ${command}\n`); +// Validate command +if (!command || !(command in COMMANDS)) { + console.error(`āŒ Unknown command: ${command || "(missing)"}`); + console.log(`Available commands: ${Object.keys(COMMANDS).join(", ")}`); + process.exit(1); +} -// Parse service-specific commands -const match = command.match(/^(ap|web)-(.+)$/); -if (match) { - const [, prefix, action] = match; - const service = SERVICES[prefix as keyof typeof SERVICES]; +// Get systemd service name +const serviceName = SERVICES[service as keyof typeof SERVICES]; - switch (action) { - case "status": - console.log(`━━━ ${service}.service ━━━`); - await $`systemctl status ${service}.service --no-pager -l`.nothrow(); - break; +// Execute command +console.log(`\nšŸ”§ Phone CLI - ${service} ${command}\n`); - case "logs": - console.log(`šŸ“‹ Recent logs (last 50 lines):\n`); - await $`journalctl -u ${service}.service -n 50 --no-pager`.nothrow(); - break; - - case "tail": - console.log(`šŸ“” Tailing logs (Ctrl+C to stop)...\n`); - await $`journalctl -u ${service}.service -f --no-pager`.nothrow(); - break; - - case "restart": - console.log(`šŸ”„ Restarting ${service}.service...\n`); - await $`sudo systemctl restart ${service}.service`; - console.log(`āœ“ ${service}.service restarted!`); - break; - - case "stop": - console.log(`šŸ›‘ Stopping ${service}.service...\n`); - await $`sudo systemctl stop ${service}.service`; - console.log(`āœ“ ${service}.service stopped!`); - break; - - case "start": - console.log(`ā–¶ļø Starting ${service}.service...\n`); - await $`sudo systemctl start ${service}.service`; - console.log(`āœ“ ${service}.service started!`); - break; +const logCommand = (cmd: string) => { + if (verbose) { + console.log(`→ ${cmd}\n`); } -} else { - // All-services commands - const allServices = Object.values(SERVICES); +}; - switch (command) { - case "status": - for (const service of allServices) { - console.log(`━━━ ${service}.service ━━━`); - await $`systemctl status ${service}.service --no-pager -l`.nothrow(); - console.log(""); - } - break; +switch (command) { + case "status": + logCommand(`systemctl status ${serviceName}.service --no-pager -l`); + await $`systemctl status ${serviceName}.service --no-pager -l`.nothrow(); + break; - case "logs": - console.log("šŸ“‹ Recent logs (last 50 lines):\n"); - const serviceFlags = allServices.map(s => `-u ${s}.service`).join(" "); - await $`journalctl ${serviceFlags} -n 50 --no-pager`.nothrow(); - break; + case "logs": + console.log(`šŸ“‹ Recent logs (last 50 lines):\n`); + logCommand(`journalctl -u ${serviceName}.service -n 50 --no-pager`); + await $`journalctl -u ${serviceName}.service -n 50 --no-pager`.nothrow(); + break; - case "tail": - console.log("šŸ“” Tailing logs (Ctrl+C to stop)...\n"); - const tailFlags = allServices.map(s => `-u ${s}.service`).join(" "); - await $`journalctl ${tailFlags} -f --no-pager`.nothrow(); - break; + case "tail": + console.log(`šŸ“” Tailing logs (Ctrl+C to stop)...\n`); + logCommand(`journalctl -u ${serviceName}.service -f --no-pager`); + await $`journalctl -u ${serviceName}.service -f --no-pager`.nothrow(); + break; - case "restart": - console.log("šŸ”„ Restarting services...\n"); - for (const service of allServices) { - console.log(`Restarting ${service}.service...`); - await $`sudo systemctl restart ${service}.service`; - console.log(`āœ“ ${service}.service restarted`); - } - console.log("\nāœ“ All services restarted!"); - break; + case "restart": + console.log(`šŸ”„ Restarting ${serviceName}.service...\n`); + logCommand(`sudo systemctl restart ${serviceName}.service`); + await $`sudo systemctl restart ${serviceName}.service`; + console.log(`āœ“ ${serviceName}.service restarted!`); + break; - case "stop": - console.log("šŸ›‘ Stopping services...\n"); - for (const service of allServices) { - console.log(`Stopping ${service}.service...`); - await $`sudo systemctl stop ${service}.service`; - console.log(`āœ“ ${service}.service stopped`); - } - console.log("\nāœ“ All services stopped!"); - break; + case "stop": + console.log(`šŸ›‘ Stopping ${serviceName}.service...\n`); + logCommand(`sudo systemctl stop ${serviceName}.service`); + await $`sudo systemctl stop ${serviceName}.service`; + console.log(`āœ“ ${serviceName}.service stopped!`); + break; - case "start": - console.log("ā–¶ļø Starting services...\n"); - for (const service of allServices) { - console.log(`Starting ${service}.service...`); - await $`sudo systemctl start ${service}.service`; - console.log(`āœ“ ${service}.service started`); - } - console.log("\nāœ“ All services started!"); - break; - } + case "start": + console.log(`ā–¶ļø Starting ${serviceName}.service...\n`); + logCommand(`sudo systemctl start ${serviceName}.service`); + await $`sudo systemctl start ${serviceName}.service`; + console.log(`āœ“ ${serviceName}.service started!`); + break; } console.log(""); diff --git a/src/agent/README.md b/src/agent/README.md index 93a4020..f2514c3 100644 --- a/src/agent/README.md +++ b/src/agent/README.md @@ -1,12 +1,12 @@ # Agent -A clean, reusable wrapper for ElevenLabs conversational AI WebSocket protocol. Uses Signal-based events and provides simple tool registration. +A clean, reusable wrapper for ElevenLabs conversational AI WebSocket protocol. Uses events and provides simple tool registration. ## Basic Usage ```typescript -import { Agent } from './pi/agent' -import Buzz from './pi/buzz' +import { Agent } from "./pi/agent" +import Buzz from "./pi/buzz" const agent = new Agent({ agentId: process.env.ELEVEN_AGENT_ID!, @@ -14,27 +14,24 @@ const agent = new Agent({ tools: { search_web: async (args) => { return { results: [`Result for ${args.query}`] } - } - } + }, + }, }) // Set up event handlers const player = await Buzz.player() let playback = player.playStream() -agent.events.connect((event) => { - if (event.type === 'audio') { - const audioBuffer = Buffer.from(event.audioBase64, 'base64') +agent.events.on((event) => { + if (event.type === "audio") { + const audioBuffer = Buffer.from(event.audioBase64, "base64") if (!playback.isPlaying) playback = player.playStream() playback.write(audioBuffer) - } - else if (event.type === 'interruption') { + } else if (event.type === "interruption") { playback.stop() - } - else if (event.type === 'user_transcript') { + } else if (event.type === "user_transcript") { console.log(`User: ${event.transcript}`) - } - else if (event.type === 'agent_response') { + } else if (event.type === "agent_response") { console.log(`Agent: ${event.response}`) } }) @@ -68,7 +65,7 @@ for await (const chunk of recording.stream()) { if (rms > vadThreshold) { // Speech detected! Start conversation agent = new Agent({ agentId, apiKey, tools }) - agent.events.connect(eventHandler) + agent.events.on(eventHandler) await agent.start() // Send buffered audio @@ -112,7 +109,7 @@ new Agent({ ### Properties -- `agent.events: Signal` - Connect to receive all events +- `agent.events: Emitter` - Connect to receive all events - `agent.isConnected: boolean` - Current connection state - `agent.conversationId?: string` - Available after connected event @@ -121,11 +118,13 @@ new Agent({ All events are emitted through `agent.events`: ### Connection + - `{ type: 'connected', conversationId, audioFormat }` - `{ type: 'disconnected' }` - `{ type: 'error', error }` ### Conversation + - `{ type: 'user_transcript', transcript }` - `{ type: 'agent_response', response }` - `{ type: 'agent_response_correction', original, corrected }` @@ -134,11 +133,13 @@ All events are emitted through `agent.events`: - `{ type: 'interruption', eventId }` ### Tools + - `{ type: 'tool_call', name, args, callId }` - `{ type: 'tool_result', name, result, callId }` - `{ type: 'tool_error', name, error, callId }` ### Optional + - `{ type: 'vad_score', score }` - `{ type: 'ping', eventId, pingMs }` @@ -146,7 +147,7 @@ All events are emitted through `agent.events`: - **Generic**: Not tied to phone systems, works in any context - **Flexible audio**: You control when to send audio, Agent just handles WebSocket -- **Event-driven**: All communication through Signal events, no throws +- **Event-driven**: All communication through events, no throws - **Simple tools**: Just pass a function map to constructor - **Automatic buffering**: Sends buffered audio when connection opens - **Automatic chunking**: Handles 8000-byte chunking internally diff --git a/src/agent/index.ts b/src/agent/index.ts index 913f2a2..b081706 100644 --- a/src/agent/index.ts +++ b/src/agent/index.ts @@ -1,4 +1,4 @@ -import { Signal } from "../utils/signal" +import { Emitter } from "../utils/emitter" import type { AgentConfig, AgentEvent } from "./types" type AgentState = "disconnected" | "connecting" | "connected" @@ -11,7 +11,7 @@ export class Agent { #chunkBuffer = new Uint8Array(0) #chunkSize = 8000 - public readonly events = new Signal() + public readonly events = new Emitter() public conversationId?: string constructor(config: AgentConfig) { diff --git a/src/phone.ts b/src/phone.ts index a1f5ab3..4c54943 100644 --- a/src/phone.ts +++ b/src/phone.ts @@ -46,7 +46,7 @@ export const runPhone = async (agentId: string, agentKey: string) => { using rotaryInUse = gpio.input(22, { pull: "up", debounce: 3 }) using rotaryNumber = gpio.input(23, { pull: "up", debounce: 3 }) - await Buzz.setVolume(0.2) + await Buzz.setVolume(0.3) log(`šŸ“ž Phone is ${hook.value ? "off hook" : "on hook"}`) playStartRing(ringer) @@ -94,7 +94,7 @@ const startBaresip = async (phoneService: PhoneService, hook: GPIO.Input, ringer const baresipConfig = join(import.meta.dir, "..", "baresip") const baresip = new Baresip(["/usr/bin/baresip", "-v", "-f", baresipConfig]) - baresip.registrationSuccess.connect(async () => { + baresip.registrationSuccess.on(async () => { log("🐻 server connected") if (hook.value === 0) { phoneService.send({ type: "initialized" }) @@ -103,17 +103,17 @@ const startBaresip = async (phoneService: PhoneService, hook: GPIO.Input, ringer } }) - baresip.callReceived.connect(({ contact }) => { + baresip.callReceived.on(({ contact }) => { log(`🐻 incoming call from ${contact}`) phoneService.send({ type: "incoming-call", from: contact }) }) - baresip.callEstablished.connect(({ contact }) => { + baresip.callEstablished.on(({ contact }) => { log(`🐻 call established with ${contact}`) phoneService.send({ type: "answered" }) }) - baresip.hungUp.connect(() => { + baresip.hungUp.on(() => { log("🐻 call hung up") phoneService.send({ type: "remote-hang-up" }) }) @@ -123,7 +123,7 @@ const startBaresip = async (phoneService: PhoneService, hook: GPIO.Input, ringer phoneService.send({ type: "error", message: error.message }) }) - baresip.error.connect(async ({ message }) => { + baresip.error.on(async ({ message }) => { log.error("🐻 error:", message) phoneService.send({ type: "error", message }) for (let i = 0; i < 4; i++) { @@ -200,7 +200,7 @@ const startListening = (service: Service, agent: Agent) => let preConnectionBuffer: Uint8Array[] = [] - agent.events.connect(async (event) => { + agent.events.on(async (event) => { if (event.type === "disconnected") abortAgent.abort() }) @@ -250,7 +250,7 @@ const handleAgentEvents = ( ) => { const waitingIndicator = new WaitingSounds(player) - agent.events.connect(async (event) => { + agent.events.on(async (event) => { switch (event.type) { case "connected": log("šŸ¤– Connected to AI agent\n") @@ -271,9 +271,14 @@ const handleAgentEvents = ( break case "interruption": - log("šŸ¤– User interrupted") - streamPlayback?.stop() - streamPlayback = player.playStream() // Reset playback stream + if (waitingIndicator.isPlaying) { + log("šŸ¤– User interrupted but doing a tool lookup") + } else { + log("šŸ¤– User interrupted") + await waitingIndicator.stop() + streamPlayback?.stop() + streamPlayback = player.playStream() // Reset playback stream + } break case "tool_call": diff --git a/src/sip.ts b/src/sip.ts index 722a4fe..46c26de 100644 --- a/src/sip.ts +++ b/src/sip.ts @@ -1,15 +1,15 @@ import log from "./utils/log.ts" -import { Signal } from "./utils/signal.ts" +import { Emitter } from "./utils/emitter.ts" import { processStdout, processStderr } from "./utils/stdio.ts" export class Baresip { baresipArgs: string[] process?: Bun.PipedSubprocess - callEstablished = new Signal<{ contact: string }>() - callReceived = new Signal<{ contact: string }>() - hungUp = new Signal() - error = new Signal<{ message: string }>() - registrationSuccess = new Signal() + callEstablished = new Emitter<{ contact: string }>() + callReceived = new Emitter<{ contact: string }>() + hungUp = new Emitter() + error = new Emitter<{ message: string }>() + registrationSuccess = new Emitter() constructor(baresipArgs: string[]) { this.baresipArgs = baresipArgs @@ -48,10 +48,10 @@ export class Baresip { } disconnectAll() { - this.callEstablished.disconnect() - this.callReceived.disconnect() - this.hungUp.disconnect() - this.registrationSuccess.disconnect() + this.callEstablished.removeAllListeners() + this.callReceived.removeAllListeners() + this.hungUp.removeAllListeners() + this.registrationSuccess.removeAllListeners() } kill() { diff --git a/src/test-operator.ts b/src/test-operator.ts index 5a56b8a..4b2eb77 100755 --- a/src/test-operator.ts +++ b/src/test-operator.ts @@ -24,7 +24,7 @@ const runPhoneSystem = async (agentId: string, apiKey: string) => { const waitingIndicator = new WaitingSounds(player) // Set up agent event listeners - agent.events.connect(async (event) => { + agent.events.on(async (event) => { switch (event.type) { case "connected": console.log("āœ… Connected to AI agent\n") @@ -162,7 +162,7 @@ if (!apiKey) { if (!agentId) { console.error( - "āŒ Error: ELEVEN_AGENT_ID environELEVEN_AGENT_ID=agent_5601k4taw2cvfjzrz6snxpgeh7x8 ELEVEN_API_KEY=sk_0313740f112c5992cb62ed96c974ab19b5916f1ea172471fment variable is required" + "āŒ Error: ELEVEN_AGENT_ID environELEVEN_AGENT_ID=agent_5601k4taw2cvfjzrz6snxpgeh7x8 ELEVEN_API_KEY=sk_0313740f112c5992cb62ed96c974ab19b5916f1ea172471fment variable is required", ) console.error(" Create an agent at https://elevenlabs.io/app/conversational-ai") process.exit(1) diff --git a/src/utils/emitter.ts b/src/utils/emitter.ts new file mode 100644 index 0000000..3f91e76 --- /dev/null +++ b/src/utils/emitter.ts @@ -0,0 +1,42 @@ +/** + * How to use Emitter: + * + * Create an emitter: + * const chat = new Emitter<{ username: string, message: string }>() + * + * Listen to events: + * const off = chat.on((data) => { + * const {username, message} = data; + * console.log(`${username} said "${message}"`); + * }) + * + * Emit an event: + * chat.emit({ username: "Chad", message: "Hey everyone, how's it going?" }); + * + * Remove a specific listener: + * off(); // The off function is returned when you add a listener + * + * Remove all listeners: + * chat.removeAllListeners() + */ +export class Emitter { + private listeners: Array<(data: T) => void> = [] + + on(listener: (data: T) => void) { + this.listeners.push(listener) + + return () => { + this.listeners = this.listeners.filter((l) => l !== listener) + } + } + + emit(data: T) { + for (const listener of this.listeners) { + listener(data) + } + } + + removeAllListeners() { + this.listeners = [] + } +} diff --git a/src/utils/log.ts b/src/utils/log.ts index 33e1c00..d159ea0 100644 --- a/src/utils/log.ts +++ b/src/utils/log.ts @@ -1,4 +1,4 @@ -let showDebug = true +let showDebug = process.env.DEBUG ?? false let showInfo = true export function setLogLevel(level: "debug" | "info" | "error") { diff --git a/src/utils/signal.ts b/src/utils/signal.ts deleted file mode 100644 index e537562..0000000 --- a/src/utils/signal.ts +++ /dev/null @@ -1,96 +0,0 @@ -/** - * How to use a Signal: - * - * Create a signal: - * const chatSignal = new Signal<{ username: string, message: string }>() - * - * Connect to the signal: - * const disconnect = chatSignal.connect((data) => { - * const {username, message} = data; - * console.log(`${username} said "${message}"`); - * }) - * - * Emit a signal: - * chatSignal.emit({ username: "Chad", message: "Hey everyone, how's it going?" }); - * - * - * Disconnect a single listener: - * disconnect(); // The disconnect function is returned when you connect to a signal - * - * Disconnect all listeners: - * chatSignal.disconnect() - */ - -export class Signal { - private listeners: Array<(data: T) => void> = [] - - connect(listenerOrSignal: Signal | ((data: T) => void)) { - let listener: (data: T) => void - - // If it is a signal, forward the data to the signal - if (listenerOrSignal instanceof Signal) { - listener = (data: T) => listenerOrSignal.emit(data) - } else { - listener = listenerOrSignal - } - - this.listeners.push(listener) - - return () => { - this.listeners = this.listeners.filter((l) => l !== listener) - } - } - - emit(data: T) { - for (const listener of this.listeners) { - listener(data) - } - } - - disconnect() { - this.listeners = [] - } -} - -/** - * How to use Emitter: - * - * Create an emitter: - * const chat = new Emitter<{ username: string, message: string }>() - * - * Listen to events: - * const off = chat.on((data) => { - * const {username, message} = data; - * console.log(`${username} said "${message}"`); - * }) - * - * Emit an event: - * chat.emit({ username: "Chad", message: "Hey everyone, how's it going?" }); - * - * Remove a specific listener: - * off(); // The off function is returned when you add a listener - * - * Remove all listeners: - * chat.removeAllListeners() - */ -export class Emitter { - private listeners: Array<(data: T) => void> = [] - - on(listener: (data: T) => void) { - this.listeners.push(listener) - - return () => { - this.listeners = this.listeners.filter((l) => l !== listener) - } - } - - emit(data: T) { - for (const listener of this.listeners) { - listener(data) - } - } - - removeAllListeners() { - this.listeners = [] - } -} diff --git a/src/utils/waiting-sounds.ts b/src/utils/waiting-sounds.ts index 3b0c49a..28881a2 100644 --- a/src/utils/waiting-sounds.ts +++ b/src/utils/waiting-sounds.ts @@ -6,16 +6,22 @@ import { log } from "console" export class WaitingSounds { typingPlayback?: Buzz.Playback speakingPlayback?: Buzz.Playback + playing = false constructor(private player: Buzz.Player) {} async start(operatorStream: Buzz.StreamingPlayback) { - if (this.typingPlayback) return // Already playing + if (this.playing) return // Already playing + this.playing = true this.#startTypingSounds() this.#startSpeakingSounds(operatorStream) } + get isPlaying() { + return this.playing + } + async #startTypingSounds() { return new Promise(async (resolve) => { do { @@ -29,7 +35,7 @@ export class WaitingSounds { const typingSound = getSound(dir) this.typingPlayback = await this.player.play(typingSound) await this.typingPlayback.finished() - } while (this.typingPlayback) + } while (this.isPlaying) resolve() }) @@ -64,20 +70,17 @@ export class WaitingSounds { this.speakingPlayback = await this.player.play(speakingSound) playedSounds.add(speakingSound) await this.speakingPlayback.finished() - } while (this.typingPlayback) + } while (this.isPlaying) resolve() }) } async stop() { log(`šŸ›‘ Stopping waiting sounds. Has typingPlayback: ${!!this.typingPlayback}`) - if (!this.typingPlayback) return + if (!this.playing) return + this.playing = false - // Quicky undefine this to stop the loops - const typingPlayback = this.typingPlayback - this.typingPlayback = undefined - - await Promise.all([typingPlayback.stop(), this.speakingPlayback?.finished()]) + await Promise.all([this.typingPlayback?.stop(), this.speakingPlayback?.finished()]) log("šŸ›‘ Waiting sounds stopped") } }