Add modular async pipeline server and ESP32 mDNS fallback

Pipeline: async voice pipeline replacing monolithic threaded server.
ASR, LLM, and TTS are independent pluggable services. ASR calls
external parakeet-asr-server, LLM uses any OpenAI-compatible
endpoint, TTS uses ElevenLabs with pluggable backend interface.

Firmware: add mDNS hostname resolution as fallback when multicast
discovery doesn't work. Resolves configured server_hostname via
MDNS.queryHost() on boot, falls back to multicast if resolution fails.

Also adds test_client.py that emulates an ESP32 device for testing
without hardware (TCP server, Opus decode, mic streaming).
This commit is contained in:
justLV 2026-02-07 15:04:12 -08:00
parent 7c531c90df
commit b3538493a6
16 changed files with 1065 additions and 1 deletions

2
.gitignore vendored
View file

@ -20,6 +20,8 @@ recording.wav
# Local development
server/local.py
pipeline/config.yaml
.venv/
# Claude
.claude/

View file

@ -1,5 +1,6 @@
#include <WiFi.h>
#include <WiFiUdp.h>
#include <ESPmDNS.h>
#include <driver/i2s.h>
#include <Adafruit_NeoPixel.h>
#include <Preferences.h>
@ -25,7 +26,7 @@
Preferences preferences;
// Define default values
#define DEFAULT_SERVER_HOSTNAME "default.server.com"
#define DEFAULT_SERVER_HOSTNAME "default.server.com" // Set via serial config: `c` then `server justins-mac-mini.local`
#define DEFAULT_MIC_TIMEOUT 30000
#define DEFAULT_SPEAKER_VOLUME 14
@ -238,6 +239,35 @@ void setup()
Serial.println("Starting TCP server");
tcpServer.begin();
// mDNS: register ourselves and try to resolve server hostname
MDNS.begin(desired_hostname);
if (server_hostname != DEFAULT_SERVER_HOSTNAME)
{
Serial.printf("Resolving server hostname: %s\n", server_hostname.c_str());
// Strip ".local" suffix if present — MDNS.queryHost expects bare hostname
String queryHost = server_hostname;
if (queryHost.endsWith(".local"))
{
queryHost = queryHost.substring(0, queryHost.length() - 6);
}
for (int i = 0; i < 10; i++)
{
IPAddress resolved = MDNS.queryHost(queryHost.c_str());
if (resolved != INADDR_NONE)
{
serverIP = resolved;
Serial.printf("Resolved %s -> %s\n", server_hostname.c_str(), serverIP.toString().c_str());
break;
}
delay(200);
}
if (serverIP == IPAddress(0, 0, 0, 0))
{
Serial.println("mDNS resolution failed, falling back to multicast discovery");
}
}
Serial.println("Sending multicast packet to announce presence");
udp.beginPacket(IPAddress(239, 0, 0, 1), 12345);
String mcast_string = String(hostname) + " " + String(GIT_HASH);

82
pipeline/README.md Normal file
View file

@ -0,0 +1,82 @@
# Pipeline Server
Async voice pipeline that connects ESP32 onju-voice devices to ASR, LLM, and TTS services.
```
ESP32 (mic) ──UDP/μ-law──▶ Pipeline ──HTTP──▶ ASR Service
├──▶ LLM (OpenAI-compatible)
├──▶ TTS (ElevenLabs)
ESP32 (speaker) ◀──TCP/Opus──┘
```
## Prerequisites
**ASR Service** — [parakeet-asr-server](https://github.com/justLV/parakeet-asr-server) running on port 8100.
**LLM** — Any OpenAI-compatible server. Examples:
```bash
# Local (mlx_lm)
mlx_lm.server --model mlx-community/gemma-3-4b-it-qat-4bit --port 8080
# Local (Ollama)
ollama serve # default port 11434
# Hosted — just set base_url and api_key in config.yaml
```
**TTS** — ElevenLabs API key (add to `config.yaml`).
## Setup
```bash
cd pipeline
uv venv && source .venv/bin/activate
uv pip install -r requirements.txt
```
## Configuration
```bash
cp config.yaml.example config.yaml
# Edit config.yaml with your API keys and preferences
```
## Running
Start the three services, then the pipeline:
```bash
# Terminal 1 — ASR
uvicorn main:app --port 8100
# Terminal 2 — LLM
mlx_lm.server --model mlx-community/gemma-3-4b-it-qat-4bit --port 8080
# Terminal 3 — Pipeline
python -m pipeline.main
```
## Test Client
A Python script that emulates an ESP32 device (TCP server, Opus decoding, mic streaming):
```bash
# From repo root
python test_client.py # localhost
python test_client.py 192.168.1.50 # remote server
python test_client.py --no-mic # playback only
```
## Config Reference
| Key | Description | Notes |
|-----|-------------|-------|
| `asr.url` | ASR service endpoint | Default: `http://localhost:8100` |
| `llm.base_url` | OpenAI-compatible API base | Ollama, mlx_lm, OpenRouter, OpenAI |
| `llm.model` | Model name | Passed to chat completions API |
| `tts.backend` | TTS provider | Currently: `elevenlabs` |
| `vad.*` | Voice activity detection | Tune thresholds for sensitivity |
| `network.*` | Ports | UDP 3000 (mic), TCP 3001 (speaker), multicast 239.0.0.1:12345 |

0
pipeline/__init__.py Normal file
View file

73
pipeline/audio.py Normal file
View file

@ -0,0 +1,73 @@
import io
import struct
import numpy as np
import opuslib
from scipy.io.wavfile import write as wav_write
# u-law decompression table (ITU-T G.711): maps each 8-bit u-law code point
# (0..255) to its 16-bit linear PCM value.  Entries 0..127 are the negative
# half of the range, 128..255 the positive half (mirror image).
ULAW_TABLE = np.array([
    -32124, -31100, -30076, -29052, -28028, -27004, -25980, -24956,
    -23932, -22908, -21884, -20860, -19836, -18812, -17788, -16764,
    -15996, -15484, -14972, -14460, -13948, -13436, -12924, -12412,
    -11900, -11388, -10876, -10364, -9852, -9340, -8828, -8316,
    -7932, -7676, -7420, -7164, -6908, -6652, -6396, -6140,
    -5884, -5628, -5372, -5116, -4860, -4604, -4348, -4092,
    -3900, -3772, -3644, -3516, -3388, -3260, -3132, -3004,
    -2876, -2748, -2620, -2492, -2364, -2236, -2108, -1980,
    -1884, -1820, -1756, -1692, -1628, -1564, -1500, -1436,
    -1372, -1308, -1244, -1180, -1116, -1052, -988, -924,
    -876, -844, -812, -780, -748, -716, -684, -652,
    -620, -588, -556, -524, -492, -460, -428, -396,
    -372, -356, -340, -324, -308, -292, -276, -260,
    -244, -228, -212, -196, -180, -164, -148, -132,
    -120, -112, -104, -96, -88, -80, -72, -64,
    -56, -48, -40, -32, -24, -16, -8, 0,
    32124, 31100, 30076, 29052, 28028, 27004, 25980, 24956,
    23932, 22908, 21884, 20860, 19836, 18812, 17788, 16764,
    15996, 15484, 14972, 14460, 13948, 13436, 12924, 12412,
    11900, 11388, 10876, 10364, 9852, 9340, 8828, 8316,
    7932, 7676, 7420, 7164, 6908, 6652, 6396, 6140,
    5884, 5628, 5372, 5116, 4860, 4604, 4348, 4092,
    3900, 3772, 3644, 3516, 3388, 3260, 3132, 3004,
    2876, 2748, 2620, 2492, 2364, 2236, 2108, 1980,
    1884, 1820, 1756, 1692, 1628, 1564, 1500, 1436,
    1372, 1308, 1244, 1180, 1116, 1052, 988, 924,
    876, 844, 812, 780, 748, 716, 684, 652,
    620, 588, 556, 524, 492, 460, 428, 396,
    372, 356, 340, 324, 308, 292, 276, 260,
    244, 228, 212, 196, 180, 164, 148, 132,
    120, 112, 104, 96, 88, 80, 72, 64,
    56, 48, 40, 32, 24, 16, 8, 0,
], dtype=np.int16)
def decode_ulaw(data: bytes) -> np.ndarray:
    """Decode G.711 u-law bytes to int16 linear PCM via vectorized table lookup."""
    return ULAW_TABLE[np.frombuffer(data, dtype=np.uint8)]
def pcm_to_wav(samples: np.ndarray, rate: int = 16000) -> bytes:
    """Wrap int16 PCM samples in a RIFF/WAV container and return the bytes."""
    with io.BytesIO() as buf:
        wav_write(buf, rate, samples.astype(np.int16))
        return buf.getvalue()
def opus_encode(pcm_data: bytes, sample_rate: int = 16000, frame_size: int = 320) -> list[bytes]:
    """Encode raw 16-bit mono PCM into a list of Opus frames.

    The trailing chunk is zero-padded to a full frame, since Opus requires
    fixed-size input frames.
    """
    enc = opuslib.Encoder(sample_rate, 1, opuslib.APPLICATION_VOIP)
    step = frame_size * 2  # bytes per frame: 2 bytes per 16-bit mono sample
    encoded: list[bytes] = []
    for start in range(0, len(pcm_data), step):
        frame = pcm_data[start:start + step].ljust(step, b'\x00')
        encoded.append(enc.encode(frame, frame_size))
    return encoded
def opus_frames_to_tcp_payload(opus_frames: list[bytes]) -> bytes:
    """Serialize Opus frames as length-prefixed records: big-endian uint16 length, then frame data."""
    return b''.join(struct.pack('>H', len(frame)) + frame for frame in opus_frames)

View file

@ -0,0 +1,50 @@
asr:
url: "http://localhost:8100"
llm:
base_url: "http://localhost:8080/v1" # mlx_lm.server (or Ollama, OpenRouter, OpenAI, etc.)
api_key: "none" # set if using a hosted API
model: "mlx-community/gemma-3-4b-it-qat-4bit"
max_messages: 20
max_tokens: 300
system_prompt: "You are a helpful voice assistant. Keep responses concise (under 2 sentences)."
tts:
backend: "elevenlabs"
default_voice: "Rachel"
elevenlabs:
api_key: "" # your ElevenLabs API key
default_voice: "Rachel"
voices:
Rachel: "21m00Tcm4TlvDq8ikWAM" # add your voice IDs here
vad:
aggressiveness: 3
start_ratio: 0.35
silence_ratio: 0.2
silence_time: 1.5
pre_buffer_s: 1.0
window_s: 0.8
network:
udp_port: 3000
tcp_port: 3001
multicast_group: "239.0.0.1"
multicast_port: 12345
audio:
sample_rate: 16000
chunk_size: 480 # 30ms at 16kHz
opus_frame_size: 320 # 20ms at 16kHz
device:
default_volume: 14
default_mic_timeout: 60
led_fade: 6
led_power: 35
led_update_period: 0.2
persist_file: "devices.json"
greeting_wav: "data/hello_imhere.wav"
logging:
level: "INFO"

91
pipeline/device.py Normal file
View file

@ -0,0 +1,91 @@
import json
import logging
import os
import time
from pipeline.vad import VAD
log = logging.getLogger(__name__)
class Device:
def __init__(self, hostname: str, ip: str, config: dict, messages: list | None = None, voice: str | None = None):
self.hostname = hostname
self.ip = ip
self.config = config
self.voice = voice or config["tts"].get("default_voice", "Rachel")
self.messages = messages or [{"role": "system", "content": config["llm"]["system_prompt"]}]
self.vad = VAD(config)
self.last_response: str | None = None
self.led_power = 0
self.led_update_time = 0.0
def prune_messages(self):
max_msgs = self.config["llm"]["max_messages"]
while len(self.messages) > max_msgs:
self.messages.pop(1) # keep system prompt at [0]
def to_dict(self) -> dict:
return {
"hostname": self.hostname,
"ip": self.ip,
"messages": self.messages,
"voice": self.voice,
}
@classmethod
def from_dict(cls, data: dict, config: dict) -> "Device":
return cls(
data["hostname"],
data["ip"],
config,
messages=data.get("messages"),
voice=data.get("voice"),
)
def __repr__(self):
return f"<Device {self.hostname} {self.ip} [{len(self.messages)-1} msgs]>"
class DeviceManager:
    """Registry of known devices, persisted to a JSON file across restarts."""

    def __init__(self, config: dict):
        self.config = config
        self.devices: dict[str, Device] = {}
        self.persist_path = config["device"].get("persist_file", "devices.json")
        self._load()

    def create_device(self, hostname: str, ip: str) -> Device:
        """Return the device for hostname, creating it or refreshing its IP."""
        existing = self.devices.get(hostname)
        if existing is None:
            existing = Device(hostname, ip, self.config)
            self.devices[hostname] = existing
            log.info(f"New device: {hostname} ({ip})")
            return existing
        if existing.ip != ip:
            existing.ip = ip
            log.info(f"Updated {hostname} IP to {ip}")
        else:
            log.info(f"Device {hostname} reconnected ({ip})")
        return existing

    def get_by_ip(self, ip: str) -> Device | None:
        """Look up a device by its current IP address; None if unknown."""
        return next((dev for dev in self.devices.values() if dev.ip == ip), None)

    def save(self):
        """Write every device to the persist file as pretty-printed JSON."""
        serialized = {name: dev.to_dict() for name, dev in self.devices.items()}
        with open(self.persist_path, "w") as f:
            json.dump(serialized, f, indent=2)
        log.info(f"Saved {len(self.devices)} devices to {self.persist_path}")

    def _load(self):
        """Restore devices from the persist file; a missing or corrupt file is non-fatal."""
        if not os.path.exists(self.persist_path):
            return
        try:
            with open(self.persist_path) as f:
                raw = json.load(f)
            self.devices = {name: Device.from_dict(entry, self.config) for name, entry in raw.items()}
            log.info(f"Loaded {len(self.devices)} devices from {self.persist_path}")
        except Exception as e:
            log.warning(f"Failed to load {self.persist_path}: {e}")

189
pipeline/main.py Normal file
View file

@ -0,0 +1,189 @@
import asyncio
import atexit
import logging
import os
import socket
import struct
import sys
import time
import numpy as np
import yaml
from pipeline.audio import decode_ulaw, opus_encode, opus_frames_to_tcp_payload, pcm_to_wav
from pipeline.device import Device, DeviceManager
from pipeline.protocol import send_audio, send_led_blink, send_stop_listening
from pipeline.services import asr, llm, tts
from pipeline.vad import VAD
log = logging.getLogger(__name__)
def load_config(path: str | None = None) -> dict:
    """Load the pipeline YAML configuration.

    Args:
        path: Optional path to a config file. Defaults to ``config.yaml``
            next to this module.  (Fixed annotation: the original declared
            ``path: str = None``, which is not a valid Optional.)

    Returns:
        The parsed configuration as a dict.
    """
    if path is None:
        path = os.path.join(os.path.dirname(__file__), "config.yaml")
    with open(path) as f:
        return yaml.safe_load(f)
async def udp_listener(config: dict, manager: DeviceManager, utterance_queue: asyncio.Queue):
    """Receive u-law audio from ESP32 devices, run VAD, queue complete utterances.

    Runs forever.  Each UDP datagram is one u-law mic chunk from a device;
    datagrams from IPs with no announced device are dropped.  Completed
    utterances are put on utterance_queue as (Device, int16 ndarray) tuples.
    """
    udp_port = config["network"]["udp_port"]
    chunk_bytes = config["audio"]["chunk_size"]  # u-law: 1 byte per sample
    loop = asyncio.get_event_loop()
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(("0.0.0.0", udp_port))
    sock.setblocking(False)  # required for loop.sock_recvfrom
    log.info(f"UDP listening on :{udp_port}")
    tcp_port = config["network"]["tcp_port"]
    dev_cfg = config["device"]
    while True:
        # NOTE(review): loop.sock_recvfrom requires Python 3.11+ — confirm target version.
        data, addr = await loop.sock_recvfrom(sock, chunk_bytes * 2)
        device = manager.get_by_ip(addr[0])
        if device is None:
            continue  # sender never announced itself via multicast
        pcm = decode_ulaw(data)
        # LED visualization — note this reads the speech flag set by the
        # *previous* frame, since process_frame for this chunk runs below.
        is_speech = device.vad.is_speech_now
        if is_speech:
            device.led_power = min(255, device.led_power + dev_cfg["led_power"])
        now = time.time()
        if now - device.led_update_time > dev_cfg["led_update_period"]:
            device.led_update_time = now
            if device.led_power > 0:
                # Fire-and-forget blink; accumulated intensity resets after send.
                asyncio.create_task(
                    send_led_blink(device.ip, tcp_port, device.led_power, fade=dev_cfg["led_fade"])
                )
                device.led_power = 0
        # VAD: returns the full int16 utterance once trailing silence is detected.
        utterance = device.vad.process_frame(pcm)
        if utterance is not None:
            log.info(f"Utterance from {device.hostname} ({len(utterance)/config['audio']['sample_rate']:.1f}s)")
            await utterance_queue.put((device, utterance))
async def multicast_listener(config: dict, manager: DeviceManager):
    """Listen for ESP32 device announcements and send greeting audio.

    Devices announce "<hostname> <git_hash>" on the multicast group at boot;
    each announcement registers (or refreshes) the device, then a greeting
    WAV is optionally transcoded to Opus and streamed back over TCP.
    """
    mcast_group = config["network"]["multicast_group"]
    mcast_port = config["network"]["multicast_port"]
    tcp_port = config["network"]["tcp_port"]
    dev_cfg = config["device"]
    loop = asyncio.get_event_loop()
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(("", mcast_port))
    # Join the multicast group on all interfaces (INADDR_ANY).
    group = socket.inet_aton(mcast_group)
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, group + socket.inet_aton("0.0.0.0"))
    sock.setblocking(False)
    log.info(f"Multicast listening on {mcast_group}:{mcast_port}")
    while True:
        data, addr = await loop.sock_recvfrom(sock, 1024)
        msg = data.decode("utf-8")
        hostname = msg.split()[0]  # announcement format: "<hostname> <git_hash>"
        log.info(f"Device announced: {hostname} from {addr[0]}")
        device = manager.create_device(hostname, addr[0])
        # Send greeting WAV as Opus
        greeting_path = dev_cfg.get("greeting_wav")
        if greeting_path and os.path.exists(greeting_path):
            try:
                # Local import: pydub is only needed when a greeting is configured.
                from pydub import AudioSegment
                audio = AudioSegment.from_file(greeting_path).set_channels(1).set_frame_rate(16000).set_sample_width(2)
                pcm_data = audio.raw_data
                frames = opus_encode(pcm_data, config["audio"]["sample_rate"], config["audio"]["opus_frame_size"])
                payload = opus_frames_to_tcp_payload(frames)
                await send_audio(device.ip, tcp_port, payload,
                                 mic_timeout=dev_cfg["default_mic_timeout"],
                                 volume=dev_cfg["default_volume"],
                                 fade=dev_cfg["led_fade"])
            except Exception as e:
                log.error(f"Failed to send greeting to {hostname}: {e}")
async def process_utterances(config: dict, manager: DeviceManager, utterance_queue: asyncio.Queue, llm_client):
    """Process complete utterances: ASR -> LLM -> TTS -> Opus -> TCP.

    Consumes (Device, int16 audio) tuples from utterance_queue forever.
    Per-utterance failures are logged and the loop continues.
    """
    tcp_port = config["network"]["tcp_port"]
    dev_cfg = config["device"]
    no_speech_threshold = 0.45  # ASR no_speech_prob above this is treated as noise
    while True:
        device, audio_int16 = await utterance_queue.get()
        try:
            # Stop the device's mic stream while we generate a response.
            await send_stop_listening(device.ip, tcp_port)
            # ASR
            pcm_bytes = audio_int16.astype(np.int16).tobytes()
            asr_result = await asr.transcribe(pcm_bytes, config)
            text = asr_result.get("text", "").strip()
            nsp = asr_result.get("no_speech_prob", 1.0)
            if not text or nsp > no_speech_threshold:
                log.debug(f"Ignoring non-speech from {device.hostname} (nsp={nsp:.2f})")
                continue
            log.info(f"[{device.hostname}] User: {text}")
            # LLM: append user turn; llm.chat appends the assistant reply itself.
            device.messages.append({"role": "user", "content": text})
            response_text = await llm.chat(llm_client, device, config)
            device.last_response = response_text
            device.prune_messages()
            log.info(f"[{device.hostname}] Assistant: {response_text}")
            # TTS -> 16kHz mono PCM
            pcm_response = await tts.synthesize(response_text, device.voice, config)
            # Opus encode and send
            frames = opus_encode(pcm_response, config["audio"]["sample_rate"], config["audio"]["opus_frame_size"])
            payload = opus_frames_to_tcp_payload(frames)
            await send_audio(device.ip, tcp_port, payload,
                             mic_timeout=dev_cfg["default_mic_timeout"],
                             volume=dev_cfg["default_volume"],
                             fade=dev_cfg["led_fade"])
        except Exception as e:
            log.error(f"Error processing utterance from {device.hostname}: {e}", exc_info=True)
        finally:
            # BUG FIX: the original called task_done() after the try/except, so
            # the `continue` on the non-speech path skipped it, breaking
            # utterance_queue.join() accounting.  finally guarantees exactly
            # one task_done() per get().
            utterance_queue.task_done()
async def main(config_path: str | None = None):
    """Load config, restore persisted devices, and run the pipeline tasks forever.

    Args:
        config_path: Optional path to an alternate config.yaml.
    """
    config = load_config(config_path)
    log_level = getattr(logging, config.get("logging", {}).get("level", "INFO"))
    logging.basicConfig(
        level=log_level,
        format="%(asctime)s %(levelname)-7s %(name)s %(message)s",
        datefmt="%H:%M:%S",
    )
    manager = DeviceManager(config)
    # Persist conversation state on interpreter exit (including Ctrl-C).
    atexit.register(manager.save)
    llm_client = llm.make_client(config)
    utterance_queue = asyncio.Queue()
    log.info("Starting pipeline server")
    # Three long-running coroutines: mic ingest, device discovery, utterance processing.
    await asyncio.gather(
        udp_listener(config, manager, utterance_queue),
        multicast_listener(config, manager),
        process_utterances(config, manager, utterance_queue, llm_client),
    )
if __name__ == "__main__":
    # Optional first CLI argument: path to an alternate config.yaml.
    config_path = sys.argv[1] if len(sys.argv) > 1 else None
    asyncio.run(main(config_path))

55
pipeline/protocol.py Normal file
View file

@ -0,0 +1,55 @@
import asyncio
import logging
import struct
log = logging.getLogger(__name__)
async def send_tcp(ip: str, port: int, header: bytes, data: bytes | None = None, timeout: float = 60):
    """Open a TCP connection to a device, send header (+ optional payload), and close.

    Errors are logged, never raised — device messaging is best-effort.

    Args:
        ip: Device IP address.
        port: Device TCP port.
        header: 6-byte command header.
        data: Optional payload appended after the header.
        timeout: Seconds allowed to establish the connection.

    BUG FIX: the original hard-coded a 5s connect timeout and ignored the
    `timeout` parameter, defeating callers that pass short timeouts for
    fire-and-forget messages (LED blink 0.1s, mic stop 0.2s).
    """
    try:
        reader, writer = await asyncio.wait_for(
            asyncio.open_connection(ip, port), timeout=timeout
        )
        writer.write(header)
        if data:
            writer.write(data)
        await writer.drain()
        writer.close()
        await writer.wait_closed()
    except asyncio.TimeoutError:
        log.error(f"TCP timeout to {ip}:{port}")
    except Exception as e:
        log.error(f"TCP error to {ip}:{port}: {e}")
async def send_audio(ip: str, port: int, opus_payload: bytes, mic_timeout: int = 60, volume: int = 14, fade: int = 6):
    """Send an audio playback command (0xAA) followed by the Opus payload.

    Header layout (6 bytes):
      [0]   0xAA command byte (audio)
      [1:3] mic timeout in seconds, big-endian uint16
      [3]   speaker volume
      [4]   LED fade rate
      [5]   compression type (2 = Opus)
    """
    header = struct.pack('>BHBBB', 0xAA, mic_timeout, volume, fade, 2)
    await send_tcp(ip, port, header, opus_payload)
async def send_led_blink(ip: str, port: int, intensity: int, r: int = 255, g: int = 255, b: int = 255, fade: int = 6):
    """Send an LED blink command (0xCC): starting intensity, RGB color, fade rate."""
    header = struct.pack('>BBBBBB', 0xCC, intensity, r, g, b, fade)
    await send_tcp(ip, port, header, timeout=0.1)
async def send_stop_listening(ip: str, port: int):
    """Send a mic-timeout command (0xDD) with timeout 0, telling the device to stop streaming."""
    await send_tcp(ip, port, bytes([0xDD]) + bytes(5), timeout=0.2)

View file

@ -0,0 +1,8 @@
httpx
numpy
openai
opuslib
pydub
PyYAML
scipy
webrtcvad

View file

28
pipeline/services/asr.py Normal file
View file

@ -0,0 +1,28 @@
import logging
import httpx
import numpy as np
from pipeline.audio import pcm_to_wav
log = logging.getLogger(__name__)
async def transcribe(pcm_int16_bytes: bytes, config: dict) -> dict:
    """Send PCM audio to the ASR service and return {"text": ..., "no_speech_prob": ...}."""
    samples = np.frombuffer(pcm_int16_bytes, dtype=np.int16)
    wav_bytes = pcm_to_wav(samples, rate=config["audio"]["sample_rate"])
    url = config["asr"]["url"].rstrip("/") + "/transcribe"
    # Upload as a multipart WAV file; the service replies with JSON.
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.post(
            url,
            files={"audio": ("audio.wav", wav_bytes, "audio/wav")},
        )
        resp.raise_for_status()
        data = resp.json()
    log.debug(f"ASR: \"{data['text']}\" ({data.get('transcribe_time_s', '?')}s, nsp={data.get('no_speech_prob', '?')})")
    return data

29
pipeline/services/llm.py Normal file
View file

@ -0,0 +1,29 @@
import logging
from openai import AsyncOpenAI
log = logging.getLogger(__name__)
def make_client(config: dict) -> AsyncOpenAI:
    """Build an AsyncOpenAI client pointed at the configured endpoint."""
    llm_cfg = config["llm"]
    return AsyncOpenAI(base_url=llm_cfg["base_url"], api_key=llm_cfg.get("api_key", "none"))
async def chat(client: AsyncOpenAI, device, config: dict) -> str:
    """Send conversation to LLM, append assistant reply to device.messages, return text."""
    llm_cfg = config["llm"]
    completion = await client.chat.completions.create(
        model=llm_cfg["model"],
        messages=device.messages,
        max_tokens=llm_cfg.get("max_tokens", 300),
    )
    # A null content (e.g. tool-only response) is normalized to empty string.
    text = completion.choices[0].message.content or ""
    device.messages.append({"role": "assistant", "content": text})
    log.info(f"LLM: {text}")
    return text

38
pipeline/services/tts.py Normal file
View file

@ -0,0 +1,38 @@
import io
import logging
import httpx
from pydub import AudioSegment
log = logging.getLogger(__name__)
async def synthesize(text: str, voice: str, config: dict) -> bytes:
    """Convert text to 16kHz mono PCM bytes using the configured TTS backend."""
    backend = config["tts"]["backend"]
    if backend != "elevenlabs":
        raise ValueError(f"Unknown TTS backend: {backend}")
    return await _elevenlabs(text, voice, config)
async def _elevenlabs(text: str, voice_name: str, config: dict) -> bytes:
    """Call the ElevenLabs TTS API and return 16kHz mono 16-bit PCM bytes."""
    el_cfg = config["tts"]["elevenlabs"]
    voices = el_cfg["voices"]
    # Unknown voice names fall back to the configured default voice's ID.
    voice_id = voices.get(voice_name, voices.get(el_cfg["default_voice"]))
    url = f"https://api.elevenlabs.io/v1/text-to-speech/{voice_id}"
    headers = {
        "xi-api-key": el_cfg["api_key"],
        "Content-Type": "application/json",
    }
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.post(url, headers=headers, json={"text": text})
        resp.raise_for_status()
        mp3_bytes = resp.content
    # ElevenLabs returns MP3; downmix/resample to the pipeline's PCM format.
    audio = AudioSegment.from_mp3(io.BytesIO(mp3_bytes)).set_channels(1).set_frame_rate(16000).set_sample_width(2)
    log.debug(f"TTS: {len(text)} chars -> {len(audio)}ms audio")
    return audio.raw_data

72
pipeline/vad.py Normal file
View file

@ -0,0 +1,72 @@
import numpy as np
import webrtcvad
from collections import deque
class VAD:
    """Streaming voice-activity detector built on webrtcvad.

    Feed fixed-size PCM frames via process_frame(); audio is buffered while
    speech is active (plus a pre-roll) and the full utterance is returned
    once enough trailing silence has accumulated.
    """

    def __init__(self, config: dict):
        vad_cfg = config["vad"]
        audio_cfg = config["audio"]
        self.sample_rate = audio_cfg["sample_rate"]
        self.chunk_size = audio_cfg["chunk_size"]
        frames_per_sec = self.sample_rate // self.chunk_size
        # Speech-ratio thresholds over the sliding window: start recording
        # above start_ratio; while recording, count silence below silence_ratio.
        self.start_ratio = vad_cfg["start_ratio"]
        self.silence_ratio = vad_cfg["silence_ratio"]
        self.silence_time = vad_cfg["silence_time"]  # seconds of silence that end an utterance
        window_frames = int(vad_cfg["window_s"] * frames_per_sec)
        prebuf_frames = int(vad_cfg["pre_buffer_s"] * frames_per_sec)
        self.vad = webrtcvad.Vad(vad_cfg["aggressiveness"])
        self.window: deque[bool] = deque(maxlen=window_frames)   # recent per-frame speech flags
        self.pre_buffer: deque[np.ndarray] = deque(maxlen=prebuf_frames)  # pre-roll audio kept before trigger
        self.buffer: list[np.ndarray] = []  # frames of the utterance currently being recorded
        self.recording = False
        self.silence_count = 0  # frames below silence_ratio since last speech while recording
        self.frames_per_sec = frames_per_sec

    def reset(self):
        """Clear all recording state; pre_buffer refills naturally on subsequent frames."""
        self.buffer = []
        self.recording = False
        self.silence_count = 0
        self.window.clear()

    def process_frame(self, pcm_int16: np.ndarray) -> np.ndarray | None:
        """Feed a 30ms PCM frame. Returns complete utterance audio (int16) when speech ends, else None."""
        raw = pcm_int16.tobytes()
        is_speech = self.vad.is_speech(raw, self.sample_rate)
        self.window.append(is_speech)
        # Don't trust the speech ratio until the sliding window is full.
        if len(self.window) < self.window.maxlen:
            return None
        ratio = sum(self.window) / len(self.window)
        if not self.recording:
            self.pre_buffer.append(pcm_int16)
            if ratio > self.start_ratio:
                # Trigger: seed the utterance with the pre-roll so the first
                # syllables before the threshold crossing aren't clipped.
                self.recording = True
                self.buffer.extend(self.pre_buffer)
                self.pre_buffer.clear()
            return None
        self.buffer.append(pcm_int16)
        if ratio < self.silence_ratio:
            self.silence_count += 1
            if self.silence_count > self.silence_time * self.frames_per_sec:
                # Enough trailing silence: finalize and return the utterance.
                audio = np.concatenate(self.buffer)
                self.reset()
                return audio
        else:
            self.silence_count = 0
        return None

    @property
    def is_speech_now(self) -> bool:
        """Whether the most recently processed frame was classified as speech."""
        if not self.window:
            return False
        return bool(self.window[-1])

317
test_client.py Normal file
View file

@ -0,0 +1,317 @@
"""
Test client that emulates an ESP32 onju-voice device.
Runs a TCP server (receives audio/commands from pipeline), records mic audio,
encodes as u-law, and streams over UDP to the pipeline server.
Usage:
python test_client.py [server_ip] # default: 127.0.0.1
python test_client.py --no-mic # playback only, no mic
"""
import argparse
import audioop
import io
import socket
import struct
import sys
import threading
import time
import numpy as np
import opuslib
import pyaudio
# Audio settings matching ESP32
SAMPLE_RATE = 16000
CHUNK_SIZE = 480 # 30ms at 16kHz
UDP_PORT = 3000
TCP_PORT = 3001
MULTICAST_GROUP = "239.0.0.1"
MULTICAST_PORT = 12345
HOSTNAME = "test-client"
# u-law compression (matching ESP32 audio_compression.h)
def encode_ulaw(pcm_int16: np.ndarray) -> bytes:
    """Compress int16 PCM to 8-bit G.711 u-law (one byte per sample).

    NOTE(review): the audioop module is deprecated and removed in Python 3.13 —
    confirm this client targets <= 3.12 or vendor a replacement.
    """
    return audioop.lin2ulaw(pcm_int16.tobytes(), 2)
# u-law decompression table (same as pipeline/audio.py): maps each 8-bit
# u-law code to its 16-bit linear PCM value (ITU-T G.711).
# NOTE(review): this table appears unused in this file — playback decodes
# Opus, not u-law.  Consider removing or importing from pipeline.audio.
ULAW_TABLE = np.array([
    -32124,-31100,-30076,-29052,-28028,-27004,-25980,-24956,
    -23932,-22908,-21884,-20860,-19836,-18812,-17788,-16764,
    -15996,-15484,-14972,-14460,-13948,-13436,-12924,-12412,
    -11900,-11388,-10876,-10364, -9852, -9340, -8828, -8316,
    -7932, -7676, -7420, -7164, -6908, -6652, -6396, -6140,
    -5884, -5628, -5372, -5116, -4860, -4604, -4348, -4092,
    -3900, -3772, -3644, -3516, -3388, -3260, -3132, -3004,
    -2876, -2748, -2620, -2492, -2364, -2236, -2108, -1980,
    -1884, -1820, -1756, -1692, -1628, -1564, -1500, -1436,
    -1372, -1308, -1244, -1180, -1116, -1052, -988, -924,
    -876, -844, -812, -780, -748, -716, -684, -652,
    -620, -588, -556, -524, -492, -460, -428, -396,
    -372, -356, -340, -324, -308, -292, -276, -260,
    -244, -228, -212, -196, -180, -164, -148, -132,
    -120, -112, -104, -96, -88, -80, -72, -64,
    -56, -48, -40, -32, -24, -16, -8, 0,
    32124, 31100, 30076, 29052, 28028, 27004, 25980, 24956,
    23932, 22908, 21884, 20860, 19836, 18812, 17788, 16764,
    15996, 15484, 14972, 14460, 13948, 13436, 12924, 12412,
    11900, 11388, 10876, 10364, 9852, 9340, 8828, 8316,
    7932, 7676, 7420, 7164, 6908, 6652, 6396, 6140,
    5884, 5628, 5372, 5116, 4860, 4604, 4348, 4092,
    3900, 3772, 3644, 3516, 3388, 3260, 3132, 3004,
    2876, 2748, 2620, 2492, 2364, 2236, 2108, 1980,
    1884, 1820, 1756, 1692, 1628, 1564, 1500, 1436,
    1372, 1308, 1244, 1180, 1116, 1052, 988, 924,
    876, 844, 812, 780, 748, 716, 684, 652,
    620, 588, 556, 524, 492, 460, 428, 396,
    372, 356, 340, 324, 308, 292, 276, 260,
    244, 228, 212, 196, 180, 164, 148, 132,
    120, 112, 104, 96, 88, 80, 72, 64,
    56, 48, 40, 32, 24, 16, 8, 0,
], dtype=np.int16)
class TestClient:
    """Emulates an ESP32 onju-voice device against the pipeline server.

    Runs a TCP server (receives audio/LED/mic commands, like the device's
    port 3001), plays received audio through the local speakers, and
    optionally streams mic audio to the server as u-law over UDP.
    """

    def __init__(self, server_ip: str, enable_mic: bool = True):
        self.server_ip = server_ip
        self.enable_mic = enable_mic
        self.mic_active = False      # whether mic packets are currently being sent
        self.mic_timeout = 0         # wall-clock deadline after which mic stops
        self.running = True
        self.pa = pyaudio.PyAudio()
        self.opus_decoder = opuslib.Decoder(SAMPLE_RATE, 1)
        # Output stream for playback
        self.out_stream = self.pa.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=SAMPLE_RATE,
            output=True,
            frames_per_buffer=CHUNK_SIZE,
        )

    def tcp_server(self):
        """Listen for incoming TCP connections from the pipeline server (like ESP32 port 3001)."""
        srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srv.bind(("0.0.0.0", TCP_PORT))
        srv.listen(1)
        srv.settimeout(1.0)  # periodic wakeup so self.running is re-checked
        print(f"TCP server listening on :{TCP_PORT}")
        while self.running:
            try:
                client, addr = srv.accept()
            except socket.timeout:
                continue
            print(f"TCP connection from {addr[0]}")
            # Read 6-byte header
            header = b""
            while len(header) < 6:
                chunk = client.recv(6 - len(header))
                if not chunk:
                    break
                header += chunk
            if len(header) < 6:
                client.close()
                continue
            cmd = header[0]
            if cmd == 0xAA:
                # Audio playback command; see pipeline/protocol.py send_audio.
                timeout = (header[1] << 8) | header[2]
                volume = header[3]
                fade = header[4]
                compression = header[5]
                print(f"  AUDIO: timeout={timeout}s vol={volume} compression={compression}")
                self._handle_audio(client, compression, volume)
                # Re-enable the mic after playback; minimum 60s window.
                self.mic_timeout = time.time() + max(timeout, 60)
                self.mic_active = True
                print(f"  Mic enabled for {max(timeout, 60)}s")
            elif cmd == 0xBB:
                # LED set command: bitmask of LEDs + RGB color.
                bitmask = header[1]
                r, g, b = header[2], header[3], header[4]
                print(f"  LED SET: mask={bitmask:#04x} rgb=({r},{g},{b})")
            elif cmd == 0xCC:
                # LED blink command; values parsed but not rendered here.
                intensity = header[1]
                r, g, b = header[2], header[3], header[4]
                fade = header[5]
                # Extend mic timeout if nearly expired (like ESP32)
                if self.mic_active and self.mic_timeout > time.time():
                    if self.mic_timeout < time.time() + 5:
                        self.mic_timeout = time.time() + 5
            elif cmd == 0xDD:
                # Mic timeout command; 0 means stop streaming immediately.
                timeout = (header[1] << 8) | header[2]
                self.mic_timeout = time.time() + timeout
                if timeout == 0:
                    self.mic_active = False
                    print("  MIC STOP (server processing)")
                else:
                    print(f"  MIC TIMEOUT: {timeout}s")
            else:
                print(f"  Unknown command: {cmd:#04x}")
            client.close()

    def _handle_audio(self, client: socket.socket, compression: int, volume: int):
        """Receive and play audio from TCP connection."""
        if compression == 2:
            self._play_opus(client, volume)
        elif compression == 0:
            self._play_pcm(client, volume)
        else:
            print(f"  Unsupported compression type: {compression}")

    def _play_opus(self, client: socket.socket, volume: int):
        """Decode Opus frames and play through speakers.

        Wire format: repeated [big-endian uint16 length][opus frame],
        matching pipeline/audio.py opus_frames_to_tcp_payload.
        """
        frame_count = 0
        total_samples = 0
        while True:
            # Read 2-byte frame length
            len_buf = b""
            while len(len_buf) < 2:
                chunk = client.recv(2 - len(len_buf))
                if not chunk:
                    break
                len_buf += chunk
            if len(len_buf) < 2:
                break
            frame_len = struct.unpack('>H', len_buf)[0]
            # Sanity bound: 0 or implausibly large length ends the stream.
            if frame_len == 0 or frame_len > 4000:
                break
            # Read opus frame
            frame_data = b""
            while len(frame_data) < frame_len:
                chunk = client.recv(frame_len - len(frame_data))
                if not chunk:
                    break
                frame_data += chunk
            if len(frame_data) < frame_len:
                break
            # Decode and play (320 samples = 20ms at 16kHz)
            pcm = self.opus_decoder.decode(frame_data, 320)
            self.out_stream.write(pcm)
            frame_count += 1
            total_samples += 320
        duration = total_samples / SAMPLE_RATE
        print(f"  Played {frame_count} Opus frames ({duration:.1f}s)")

    def _play_pcm(self, client: socket.socket, volume: int):
        """Read raw PCM and play through speakers."""
        total_bytes = 0
        while True:
            data = client.recv(4096)
            if not data:
                break
            self.out_stream.write(data)
            total_bytes += len(data)
        duration = total_bytes / (SAMPLE_RATE * 2)  # 2 bytes per 16-bit sample
        print(f"  Played {total_bytes} PCM bytes ({duration:.1f}s)")

    def mic_streamer(self):
        """Record from mic, encode as u-law, send over UDP to server (like ESP32 mic task)."""
        if not self.enable_mic:
            return
        in_stream = self.pa.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=SAMPLE_RATE,
            input=True,
            frames_per_buffer=CHUNK_SIZE,
        )
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        print(f"Mic streamer ready -> UDP {self.server_ip}:{UDP_PORT}")
        packets_sent = 0
        was_active = False
        while self.running:
            # Always drain the mic so the input buffer doesn't overflow,
            # even while not actively streaming.
            data = in_stream.read(CHUNK_SIZE, exception_on_overflow=False)
            if not self.mic_active or time.time() > self.mic_timeout:
                if was_active:
                    print(f"Mic stopped ({packets_sent} packets sent)")
                    was_active = False
                    packets_sent = 0
                self.mic_active = False
                continue
            if not was_active:
                print("Mic started streaming")
                was_active = True
            pcm = np.frombuffer(data, dtype=np.int16)
            # DC offset removal
            # NOTE(review): .astype truncates the float mean to int16 before
            # subtraction — confirm intended (result dtype also widens).
            pcm = pcm - np.mean(pcm).astype(np.int16)
            # u-law encode and send
            ulaw = encode_ulaw(pcm)
            sock.sendto(ulaw, (self.server_ip, UDP_PORT))
            packets_sent += 1
        in_stream.close()
        sock.close()

    def announce(self):
        """Send multicast announcement (like ESP32 on boot)."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        msg = f"{HOSTNAME} test-client".encode()
        sock.sendto(msg, (MULTICAST_GROUP, MULTICAST_PORT))
        sock.close()
        print(f"Multicast announcement sent as '{HOSTNAME}'")

    def run(self):
        """Start the TCP server (and mic streamer) threads and block until Ctrl-C."""
        print(f"Test client starting (server: {self.server_ip})")
        print(f"  TCP server: :{TCP_PORT}")
        print(f"  UDP target: {self.server_ip}:{UDP_PORT}")
        print(f"  Mic: {'enabled' if self.enable_mic else 'disabled'}")
        print()
        self.announce()
        threads = [
            threading.Thread(target=self.tcp_server, daemon=True),
        ]
        if self.enable_mic:
            threads.append(threading.Thread(target=self.mic_streamer, daemon=True))
        for t in threads:
            t.start()
        try:
            while True:
                time.sleep(0.1)
        except KeyboardInterrupt:
            print("\nShutting down...")
            self.running = False
            self.out_stream.close()
            self.pa.terminate()
if __name__ == "__main__":
    # CLI: optional positional server IP (default localhost), optional --no-mic.
    parser = argparse.ArgumentParser(description="ESP32 onju-voice test client")
    parser.add_argument("server", nargs="?", default="127.0.0.1", help="Pipeline server IP")
    parser.add_argument("--no-mic", action="store_true", help="Disable mic recording")
    args = parser.parse_args()
    client = TestClient(args.server, enable_mic=not args.no_mic)
    client.run()