226 lines
4.9 KiB
Bash
Executable File
226 lines
4.9 KiB
Bash
Executable File
#!/bin/bash
|
|
# Live per-channel mic level meter for the barepi capture path.
|
|
# Usage:
|
|
# ./tap-mics.sh
|
|
# ./tap-mics.sh --thsel-high
|
|
# ./tap-mics.sh --thsel-low
|
|
# ./tap-mics.sh --window-ms 125
|
|
# Press Ctrl-C to stop.
|
|
|
|
set -euo pipefail
|
|
|
|
RATE=48000
|
|
WINDOW_MS=125
|
|
THSEL=""
|
|
CAPTURE_PCM="${CAPTURE_PCM:-barepi_capture}"
|
|
|
|
usage() {
|
|
cat <<'EOF'
|
|
Usage:
|
|
./tap-mics.sh
|
|
./tap-mics.sh --thsel-high
|
|
./tap-mics.sh --thsel-low
|
|
./tap-mics.sh --window-ms 125
|
|
|
|
Environment:
|
|
CAPTURE_PCM=... Override the ALSA capture device.
|
|
EOF
|
|
}
|
|
|
|
set_thsel() {
|
|
local level="$1"
|
|
command -v pinctrl >/dev/null 2>&1 || return 0
|
|
if [ "$level" = "high" ]; then
|
|
sudo pinctrl set 8 op pn dh >/dev/null
|
|
else
|
|
sudo pinctrl set 8 op pn dl >/dev/null
|
|
fi
|
|
}
|
|
|
|
prepare_wake_pin() {
|
|
command -v pinctrl >/dev/null 2>&1 || return 0
|
|
sudo pinctrl set 13 ip pn >/dev/null 2>&1 || true
|
|
}
|
|
|
|
show_pins() {
|
|
command -v pinctrl >/dev/null 2>&1 || return 0
|
|
sudo pinctrl get 8,13 || true
|
|
}
|
|
|
|
while [ $# -gt 0 ]; do
|
|
case "$1" in
|
|
--thsel-high)
|
|
THSEL="high"
|
|
shift
|
|
;;
|
|
--thsel-low)
|
|
THSEL="low"
|
|
shift
|
|
;;
|
|
--window-ms)
|
|
WINDOW_MS="${2:-}"
|
|
shift 2
|
|
;;
|
|
-h|--help)
|
|
usage
|
|
exit 0
|
|
;;
|
|
*)
|
|
echo "Unknown argument: $1" >&2
|
|
usage >&2
|
|
exit 1
|
|
;;
|
|
esac
|
|
done
|
|
|
|
if ! [[ "$WINDOW_MS" =~ ^[0-9]+$ ]] || [ "$WINDOW_MS" -le 0 ]; then
|
|
echo "--window-ms must be a positive integer" >&2
|
|
exit 1
|
|
fi
|
|
|
|
if ! arecord -L 2>/dev/null | grep -qx 'barepi_capture'; then
|
|
CAPTURE_PCM="plughw:CARD=barepiaudio,DEV=0"
|
|
fi
|
|
|
|
if [ -n "$THSEL" ]; then
|
|
set_thsel "$THSEL"
|
|
fi
|
|
prepare_wake_pin
|
|
show_pins
|
|
|
|
echo "Live tap test from ${CAPTURE_PCM} (${RATE} Hz, window ${WINDOW_MS} ms). Press Ctrl-C to stop."
|
|
python3 - <<'PY' "$CAPTURE_PCM" "$RATE" "$WINDOW_MS"
|
|
import math
|
|
import signal
|
|
import struct
|
|
import subprocess
|
|
import sys
|
|
|
|
pcm = sys.argv[1]
|
|
rate = int(sys.argv[2])
|
|
window_ms = int(sys.argv[3])
|
|
channels = 2
|
|
sample_bytes = 4
|
|
frame_bytes = channels * sample_bytes
|
|
frames_per_chunk = max(1, rate * window_ms // 1000)
|
|
chunk_bytes = frames_per_chunk * frame_bytes
|
|
full_scale = float(2**31 - 1)
|
|
|
|
cmd = [
|
|
"arecord", "-q",
|
|
"-D", pcm,
|
|
"-t", "raw",
|
|
"-f", "S32_LE",
|
|
"-c", str(channels),
|
|
"-r", str(rate),
|
|
"-",
|
|
]
|
|
|
|
proc = subprocess.Popen(
|
|
cmd,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.DEVNULL,
|
|
)
|
|
assert proc.stdout is not None
|
|
|
|
stopped = False
|
|
|
|
def handle_signal(signum, frame):
|
|
raise KeyboardInterrupt
|
|
|
|
signal.signal(signal.SIGINT, handle_signal)
|
|
signal.signal(signal.SIGTERM, handle_signal)
|
|
|
|
def dbfs(value: float) -> float:
|
|
if value <= 0:
|
|
return -186.6
|
|
return 20.0 * math.log10(value / full_scale)
|
|
|
|
def bar(db: float, width: int = 24, floor: float = -72.0) -> str:
|
|
if db <= floor:
|
|
filled = 0
|
|
elif db >= 0:
|
|
filled = width
|
|
else:
|
|
filled = int(round((db - floor) / (0 - floor) * width))
|
|
filled = max(0, min(width, filled))
|
|
return "#" * filled + "." * (width - filled)
|
|
|
|
def read_exact(n: int) -> bytes:
|
|
parts = []
|
|
got = 0
|
|
while got < n:
|
|
chunk = proc.stdout.read(n - got)
|
|
if not chunk:
|
|
raise EOFError
|
|
parts.append(chunk)
|
|
got += len(chunk)
|
|
return b"".join(parts)
|
|
|
|
max_l_peak = 0
|
|
max_r_peak = 0
|
|
last_len = 0
|
|
|
|
try:
|
|
while True:
|
|
data = read_exact(chunk_bytes)
|
|
l_sum = 0
|
|
r_sum = 0
|
|
l_peak = 0
|
|
r_peak = 0
|
|
frames = 0
|
|
|
|
for l, r in struct.iter_unpack("<ii", data):
|
|
frames += 1
|
|
l_abs = abs(l)
|
|
r_abs = abs(r)
|
|
if l_abs > l_peak:
|
|
l_peak = l_abs
|
|
if r_abs > r_peak:
|
|
r_peak = r_abs
|
|
l_sum += l * l
|
|
r_sum += r * r
|
|
|
|
l_rms = math.sqrt(l_sum / frames) if frames else 0.0
|
|
r_rms = math.sqrt(r_sum / frames) if frames else 0.0
|
|
max_l_peak = max(max_l_peak, l_peak)
|
|
max_r_peak = max(max_r_peak, r_peak)
|
|
|
|
l_rms_db = dbfs(l_rms)
|
|
r_rms_db = dbfs(r_rms)
|
|
l_peak_db = dbfs(l_peak)
|
|
r_peak_db = dbfs(r_peak)
|
|
|
|
line = (
|
|
f"L rms {l_rms_db:6.1f} peak {l_peak_db:6.1f} [{bar(l_peak_db)}] "
|
|
f"R rms {r_rms_db:6.1f} peak {r_peak_db:6.1f} [{bar(r_peak_db)}]"
|
|
)
|
|
pad = " " * max(0, last_len - len(line))
|
|
sys.stdout.write("\r" + line + pad)
|
|
sys.stdout.flush()
|
|
last_len = len(line)
|
|
except KeyboardInterrupt:
|
|
pass
|
|
except EOFError:
|
|
pass
|
|
finally:
|
|
if proc.poll() is None:
|
|
proc.terminate()
|
|
try:
|
|
proc.wait(timeout=1.0)
|
|
except subprocess.TimeoutExpired:
|
|
proc.kill()
|
|
proc.wait()
|
|
|
|
print()
|
|
print(
|
|
"Max peaks seen: "
|
|
f"L={dbfs(max_l_peak):.1f} dBFS "
|
|
f"R={dbfs(max_r_peak):.1f} dBFS"
|
|
)
|
|
if max_l_peak == 0 and max_r_peak > 0:
|
|
print("Only the right channel showed activity during this run.")
|
|
elif max_r_peak == 0 and max_l_peak > 0:
|
|
print("Only the left channel showed activity during this run.")
|
|
PY
|