Source code for audia.agents.tts

"""
Text-to-Speech wrapper supporting multiple backends:

  - edge-tts  (default, free, requires internet)
  - kokoro    (local, requires: pip install audia[kokoro])
  - openai    (requires API key)

All backends return the absolute path to the generated audio file.
"""

from __future__ import annotations

import asyncio
import re
import time
import uuid
from pathlib import Path

from rich.console import Console

from audia.config import Settings, get_settings

console = Console(stderr=True)

# Per-chunk timeout in seconds (edge-tts network call)
_EDGE_TTS_CHUNK_TIMEOUT = 90


# ──────────────────────────────────────────────────────────── public API


def synthesize(
    text: str,
    output_dir: str | Path | None = None,
    filename: str | None = None,
    settings: Settings | None = None,
    progress_cb=None,
) -> Path:
    """
    Convert *text* to an audio file and return its path.

    Parameters
    ----------
    text:
        The cleaned text to synthesise.
    output_dir:
        Directory for the output file. Defaults to settings.audio_dir.
        Created (including parents) when it does not exist.
    filename:
        Desired filename (without extension). Auto-generated from the
        current time and a random suffix when None or empty.
    settings:
        Audia settings; uses global settings when None.
    progress_cb:
        Optional callable receiving human-readable status strings.
        Only the edge-tts backend is handed this callback.

    Returns
    -------
    Path
        Location of the generated audio file, as returned by the backend.

    Raises
    ------
    ValueError
        If ``settings.tts_backend`` is not "edge-tts", "kokoro" or "openai".
    """
    cfg = settings or get_settings()

    # Validate the backend first so a configuration error does not leave an
    # empty output directory behind as a side effect.
    backend = cfg.tts_backend
    if backend not in ("edge-tts", "kokoro", "openai"):
        raise ValueError(f"Unknown TTS backend: {backend}")

    out_dir = Path(output_dir) if output_dir else Path(cfg.audio_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    stem = filename or f"audia_{int(time.time())}_{uuid.uuid4().hex[:6]}"

    if backend == "edge-tts":
        return _edge_tts(text, out_dir, stem, cfg, progress_cb)
    if backend == "kokoro":
        return _kokoro_tts(text, out_dir, stem, cfg)
    return _openai_tts(text, out_dir, stem, cfg)
# ──────────────────────────────────────────────────────────── edge-tts def _edge_tts(text: str, out_dir: Path, stem: str, cfg: Settings, progress_cb=None) -> Path: """Use Microsoft Edge TTS (free, no API key). Generates mp3 via network.""" try: import edge_tts # type: ignore except ImportError as e: raise ImportError("edge-tts is required: pip install edge-tts") from e chunks = _split(text, cfg.tts_chunk_chars) total = len(chunks) hdr = f"TTS: {total} chunk(s) to synthesise" console.print(f" [dim]{hdr}[/dim]") if progress_cb: progress_cb(hdr) chunk_paths: list[Path] = [] for i, chunk in enumerate(chunks, 1): chunk_path = out_dir / f"{stem}_part{i:03d}.mp3" msg_start = f"Synthesising chunk {i}/{total} ({len(chunk):,} chars)\u2026" console.print(f" [dim] {msg_start}[/dim]") if progress_cb: progress_cb(msg_start) _run_async(_edge_speak(chunk, str(chunk_path), cfg.tts_voice, cfg.tts_rate, edge_tts)) chunk_paths.append(chunk_path) msg_done = f"Chunk {i}/{total} done \u2192 {chunk_path.name}" console.print(f" [dim] {msg_done}[/dim]") if progress_cb: progress_cb(msg_done) if len(chunk_paths) == 1: final_path = out_dir / f"{stem}.mp3" chunk_paths[0].rename(final_path) return final_path final_path = _concat_mp3(chunk_paths, out_dir / f"{stem}.mp3") for p in chunk_paths: p.unlink(missing_ok=True) return final_path def _run_async(coro) -> None: """ Run *coro* in an event loop. Works in both sync contexts (CLI) and when called from a thread inside an async server (FastAPI uses run_in_threadpool). """ try: loop = asyncio.get_running_loop() except RuntimeError: loop = None if loop and loop.is_running(): # We're inside a running event loop (FastAPI worker thread). # asyncio.run() would fail here; use a new event loop in this thread. 
import concurrent.futures with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: future = pool.submit(asyncio.run, coro) future.result(timeout=_EDGE_TTS_CHUNK_TIMEOUT + 5) else: asyncio.run(coro) async def _edge_speak( text: str, output_path: str, voice: str, rate: str, edge_tts_module, ) -> None: communicate = edge_tts_module.Communicate(text, voice=voice, rate=rate) await asyncio.wait_for(communicate.save(output_path), timeout=_EDGE_TTS_CHUNK_TIMEOUT) def _concat_mp3(parts: list[Path], dest: Path) -> Path: """Concatenate MP3 files by raw byte joining (works for CBR streams).""" with dest.open("wb") as out: for part in parts: out.write(part.read_bytes()) return dest # ──────────────────────────────────────────────────────────── kokoro def _kokoro_tts(text: str, out_dir: Path, stem: str, cfg: Settings) -> Path: """Use Kokoro local TTS model (pip install audia[kokoro]).""" try: import numpy as np import soundfile as sf # type: ignore from kokoro import KPipeline # type: ignore except ImportError as e: raise ImportError( "Kokoro TTS requires extra dependencies: pip install audia[kokoro]" ) from e pipeline = KPipeline(lang_code="a") # 'a' = American English chunks = _split(text, cfg.tts_chunk_chars) all_audio: list = [] for chunk in chunks: for _, _, audio in pipeline(chunk, voice=cfg.tts_voice, speed=1.0): all_audio.append(audio) combined = np.concatenate(all_audio) out_path = out_dir / f"{stem}.wav" sf.write(str(out_path), combined, samplerate=24000) return out_path # ──────────────────────────────────────────────────────────── openai def _openai_tts(text: str, out_dir: Path, stem: str, cfg: Settings) -> Path: """Use OpenAI TTS API.""" try: from openai import OpenAI # type: ignore except ImportError as e: raise ImportError("OpenAI TTS requires: pip install audia[openai]") from e client_kwargs: dict = dict(api_key=cfg.openai_api_key) if cfg.openai_api_base: client_kwargs["base_url"] = cfg.openai_api_base client = OpenAI(**client_kwargs) chunks = 
_split(text, 4096) # OpenAI limit chunk_paths: list[Path] = [] for i, chunk in enumerate(chunks): response = client.audio.speech.create( model="tts-1", voice=cfg.tts_voice, # alloy, echo, nova, shimmer … input=chunk, ) p = out_dir / f"{stem}_part{i:03d}.mp3" response.stream_to_file(str(p)) chunk_paths.append(p) if len(chunk_paths) == 1: final_path = out_dir / f"{stem}.mp3" chunk_paths[0].rename(final_path) return final_path final_path = _concat_mp3(chunk_paths, out_dir / f"{stem}.mp3") for p in chunk_paths: p.unlink(missing_ok=True) return final_path # ──────────────────────────────────────────────────────────── helpers def _split(text: str, max_chars: int) -> list[str]: """ Split text into chunks ≤ max_chars. Prefers sentence boundaries; falls back to whitespace. """ if len(text) <= max_chars: return [text] chunks: list[str] = [] # Split at sentence ends first sentences = re.split(r"(?<=[.!?])\s+", text) current = "" for sent in sentences: if len(current) + len(sent) + 1 <= max_chars: current += (" " if current else "") + sent else: if current: chunks.append(current) # If a single sentence exceeds max_chars, split by words if len(sent) > max_chars: words = sent.split() sub = "" for w in words: if len(sub) + len(w) + 1 <= max_chars: sub += (" " if sub else "") + w else: if sub: chunks.append(sub) sub = w if sub: current = sub else: current = "" else: current = sent if current: chunks.append(current) return chunks or [text]