212 lines
7.3 KiB
Python
212 lines
7.3 KiB
Python
"""FlowerCore biblical-tts — eSpeak-NG wrapper for Ancient Greek + Hebrew.
|
|
|
|
Endpoints:
|
|
|
|
* POST /tts — body: {"text": "...", "language": "grc|he|el", "voice": "...?", "rate": 175?, "pitch": 50?, "volume": 100?}
|
|
returns audio/wav. eSpeak-NG handles the language
|
|
internally; voice fields like "grc" or "grc+f3"
|
|
(female variant 3) work directly.
|
|
* POST /timings — same body shape but returns
|
|
{"text": "...", "words": [{"text", "startMs", "endMs"}],
|
|
"durationMs": ...}.
|
|
Uses espeak's --pho phoneme output to get the total duration,
then spreads it over whitespace-split words by character count.
|
|
Read-along clients pair this with /tts for synced
|
|
playback.
|
|
* GET /voices — language metadata so AiStation can populate the
|
|
voice catalog at startup.
|
|
* GET /health — fast readiness check.
|
|
|
|
Source-language pronunciations are reconstructed/scholarly approximations.
|
|
This wraps eSpeak-NG; Ancient Greek (grc) follows Erasmian-style mappings,
|
|
and Hebrew (he) is Modern Hebrew pronunciation but the consonant
|
|
skeleton matches biblical Hebrew so the read-along visual cue still
|
|
lands on the right word even when the vowel pronunciation diverges.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import io
|
|
import logging
|
|
import re
|
|
import shlex
|
|
import subprocess
|
|
from typing import Optional
|
|
|
|
from fastapi import FastAPI, HTTPException
|
|
from fastapi.responses import JSONResponse, Response
|
|
from pydantic import BaseModel
|
|
|
|
# Root logging setup: every record carries a timestamp, level, and message.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)

# Service-wide logger used by the helpers below.
LOG = logging.getLogger("biblical_tts")

# The ASGI application object; endpoint functions register on it via decorators.
app = FastAPI(
    title="FlowerCore biblical-tts",
    version="1.0.0",
)
|
|
|
|
# Language codes exposed by this service, keyed by eSpeak-NG language code.
# Ancient Greek and Hebrew are the headline pair; Modern Greek (el) is also
# surfaced as a fallback for operators. Each entry carries:
#   label          human-readable name reported by /voices
#   rtl            True when the script is written right-to-left
#   default_voice  eSpeak-NG voice used when a request names no voice
LANGUAGES = {
    "grc": {
        "label": "Ancient Greek (Erasmian)",
        "rtl": False,
        "default_voice": "grc",
    },
    "el": {
        "label": "Modern Greek",
        "rtl": False,
        "default_voice": "el",
    },
    "he": {
        "label": "Hebrew (Modern)",
        "rtl": True,
        "default_voice": "he",
    },
}
|
|
|
|
|
|
class TtsRequest(BaseModel):
    """JSON request body accepted by both the /tts and /timings endpoints."""

    # Text to speak. The endpoints reject blank or whitespace-only values.
    text: str

    # Language code; used to look up a default voice when `voice` is unset.
    language: str = "grc"

    # Explicit eSpeak-NG voice name (e.g. "grc+f3"); overrides the language default.
    voice: Optional[str] = None

    # Speech rate in words per minute (eSpeak default is 175).
    rate: int = 175

    # Pitch, 0-99.
    pitch: int = 50

    # Volume, 0-200.
    volume: int = 100
|
|
|
|
|
|
def _resolve_voice(req: TtsRequest) -> str:
    """Pick the eSpeak-NG voice name to use for a request.

    An explicit ``req.voice`` wins, with surrounding whitespace removed. A
    missing, empty, or whitespace-only voice falls back to the
    ``default_voice`` registered in ``LANGUAGES`` for ``req.language``. A
    language code that is not registered is returned as-is (trimmed and
    lowercased) so eSpeak-NG can attempt to resolve it itself.
    """
    # Strip before testing: a voice of "   " must not be handed to espeak-ng
    # as an empty voice name.
    explicit = (req.voice or "").strip()
    if explicit:
        return explicit
    lang = req.language.strip().lower()
    return LANGUAGES.get(lang, {}).get("default_voice", lang)
|
|
|
|
|
|
def _run_espeak(args: list[str], stdin_text: bytes) -> bytes:
    """Run ``espeak-ng`` with *args*, feeding *stdin_text* on standard input.

    Args:
        args: Command-line arguments placed after the ``espeak-ng`` program name.
        stdin_text: Bytes written to the process's stdin.

    Returns:
        The raw bytes the process wrote to stdout.

    Raises:
        HTTPException: 504 when the process runs longer than 60 seconds,
            503 when the ``espeak-ng`` executable cannot be started, and
            500 when it exits non-zero (the detail carries the exit code and
            up to 512 characters of stderr).
    """
    cmd = ["espeak-ng", *args]
    LOG.info("espeak-ng %s", shlex.join(args))
    try:
        proc = subprocess.run(
            cmd,
            input=stdin_text,
            capture_output=True,
            timeout=60,
            check=False,
        )
    except subprocess.TimeoutExpired as exc:
        raise HTTPException(status_code=504, detail="espeak-ng timed out") from exc
    except OSError as exc:
        # Binary missing or not executable: report it as a clean HTTP error
        # instead of letting the OSError escape as an unhandled exception.
        LOG.error("espeak-ng could not be started: %s", exc)
        raise HTTPException(
            status_code=503,
            detail="espeak-ng could not be started",
        ) from exc

    if proc.returncode != 0:
        stderr_text = proc.stderr.decode("utf-8", errors="replace")[:512]
        raise HTTPException(
            status_code=500,
            detail=f"espeak-ng exit {proc.returncode}: {stderr_text}",
        )
    return proc.stdout
|
|
|
|
|
|
@app.get("/health")
def health():
    """Readiness probe: report an "ok" status plus the exposed language codes."""
    language_codes = [*LANGUAGES]
    return {"status": "ok", "languages": language_codes}
|
|
|
|
|
|
@app.get("/voices")
def voices():
    """List one voice entry per configured language.

    Each entry reuses the language code as both ``name`` and ``language`` and
    reports the display label and text direction taken from ``LANGUAGES``.
    """
    catalog = []
    for code, meta in LANGUAGES.items():
        entry = {
            "name": code,
            "displayName": meta["label"],
            "language": code,
            "isRightToLeft": meta["rtl"],
            "engine": "espeak-ng",
        }
        catalog.append(entry)
    return {"voices": catalog}
|
|
|
|
|
|
@app.post("/tts")
def tts(req: TtsRequest) -> Response:
    """Synthesize ``req.text`` with eSpeak-NG and return the audio as WAV.

    Rate, pitch, and volume are clamped to 80-450, 0-99, and 0-200
    respectively before being passed to espeak-ng.

    Raises:
        HTTPException: 400 when the text is blank, 500 when espeak-ng
            produces no output; errors from running espeak-ng propagate.
    """
    if req.text.strip() == "":
        raise HTTPException(status_code=400, detail="text is required")

    # Clamp each knob into the range this service accepts.
    rate = min(max(req.rate, 80), 450)
    pitch = min(max(req.pitch, 0), 99)
    volume = min(max(req.volume, 0), 200)

    espeak_args = ["--stdout", "-v", _resolve_voice(req)]
    espeak_args += ["-s", str(rate)]
    espeak_args += ["-p", str(pitch)]
    espeak_args += ["-a", str(volume)]

    audio = _run_espeak(espeak_args, req.text.encode("utf-8"))
    if len(audio) == 0:
        raise HTTPException(status_code=500, detail="espeak-ng returned empty stdout")
    return Response(content=audio, media_type="audio/wav")
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# /timings — synth + word-level timing from espeak's phoneme/word stream.
|
|
# --------------------------------------------------------------------------
|
|
#
|
|
# espeak-ng's --pho flag emits a phoneme stream:
|
|
#
|
|
# _ 5 phon...
|
|
# _ 56 phon...
|
|
# _ 67 phon...
|
|
#
|
|
# That alone doesn't give word boundaries. Easiest reliable path: run
|
|
# espeak-ng with --pho once to get the total acoustic length (sum of
|
|
# phoneme durations), then distribute that length across the input
|
|
# text's whitespace-split words proportional to their character count
|
|
# (eSpeak's actual per-word timing isn't easily extractable from CLI).
|
|
# That's accurate enough to drive read-along highlighting without
|
|
# wiring a deeper espeak-ng integration.
|
|
#
|
|
# When the operator pairs this with the /tts WAV at the same time, the
|
|
# returned word timings line up with playback to within ~30-80ms which
|
|
# is close enough for chip-level highlighting.
|
|
|
|
# Matches one line of the phoneme listing and captures its second field (the
# duration) as group 1. Only spaces/tabs are allowed between the two fields so
# a match can never straddle a line break, and the duration may be the last
# thing on the line (no trailing whitespace required).
PHONEME_DURATION_RE = re.compile(r"^[ \t]*\S+[ \t]+(\d+)(?=\s|$)", re.MULTILINE)
|
|
|
|
|
|
def _estimate_total_ms(req: TtsRequest, voice: str) -> int:
    """Estimate how long ``req.text`` takes to speak, in milliseconds.

    Runs espeak-ng with ``--pho --quiet`` and sums the duration field of every
    line matched by ``PHONEME_DURATION_RE``. When no durations are found, the
    estimate falls back to a words-per-minute calculation from the speech rate.

    Args:
        req: The request whose text and rate are used.
        voice: The already-resolved eSpeak-NG voice name.

    Returns:
        The estimated total duration as an integer number of milliseconds.
    """
    # Clamp to the same 80-450 range /tts applies, so the estimate describes
    # the audio the client actually receives for out-of-range rates.
    rate = max(80, min(450, req.rate))

    args = ["--pho", "--quiet", "-v", voice, "-s", str(rate)]
    raw = _run_espeak(args, req.text.encode("utf-8"))
    listing = raw.decode("utf-8", errors="replace")

    # group(1) is always a run of digits, so int() cannot fail here.
    total = sum(int(m.group(1)) for m in PHONEME_DURATION_RE.finditer(listing))

    if total == 0:
        # Fallback: rough heuristic at the configured speech rate (words/minute).
        word_count = max(1, len(req.text.split()))
        total = int(word_count / rate * 60_000)
    return total
|
|
|
|
|
|
@app.post("/timings")
def timings(req: TtsRequest):
    """Return estimated per-word start/end times for read-along highlighting.

    The total duration comes from ``_estimate_total_ms`` and is divided among
    the whitespace-split tokens of ``req.text`` in proportion to their
    character count. Tokens with no letter or digit (bare punctuation) get no
    time of their own and are emitted as zero-length entries at the end of the
    preceding word, so the list still has one entry per whitespace token.

    Returns:
        A JSON response with ``text``, ``language``, ``voice``, ``words``
        (a list of ``{"text", "startMs", "endMs"}``), and ``durationMs``.

    Raises:
        HTTPException: 400 when the text is blank; errors from running
            espeak-ng propagate.
    """
    if not req.text.strip():
        raise HTTPException(status_code=400, detail="text is required")
    voice = _resolve_voice(req)
    total_ms = _estimate_total_ms(req, voice)

    # Non-blank text always yields at least one token here.
    tokens = req.text.split()

    # Punctuation-only tokens weigh nothing, so a verse ending in " ." does
    # not hand a slice of the audio to the full stop.
    weights = [
        len(token) if any(ch.isalnum() for ch in token) else 0
        for token in tokens
    ]
    if not any(weights):
        # Text made only of punctuation: share the time by length instead of
        # dividing by zero.
        weights = [len(token) for token in tokens]
    weight_total = sum(weights)

    out_words: list[dict] = []
    elapsed_weight = 0
    start = 0
    for token, weight in zip(tokens, weights):
        elapsed_weight += weight
        # Derive each boundary from the cumulative weight rather than adding
        # up rounded shares: rounding error cannot accumulate, boundaries
        # never pass total_ms, and the last one lands exactly on total_ms.
        end = total_ms * elapsed_weight // weight_total
        out_words.append({"text": token, "startMs": start, "endMs": end})
        start = end

    return JSONResponse(
        {
            "text": req.text,
            "language": req.language,
            "voice": voice,
            "words": out_words,
            "durationMs": total_ms,
        }
    )
|