Files
bluejay-infra/apps/fc-ttsreader/biblical-tts/app.py

398 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""FlowerCore biblical-tts — eSpeak-NG wrapper for Ancient Greek + Hebrew.
Endpoints:
* POST /tts — body: {"text": "...", "language": "grc|he|el", "voice": "...?", "rate": 175?, "pitch": 50?, "volume": 100?}
returns audio/wav. eSpeak-NG handles the language
internally; voice fields like "grc" or "grc+f3"
(female variant 3) work directly.
* POST /timings — same body shape but returns
{"text": "...", "words": [{"text", "startMs", "endMs"}],
"durationMs": ...}.
Uses espeak's --pho phoneme output to estimate the total
duration, then spreads that duration across the
whitespace-split words in proportion to their character count.
Read-along clients pair this with /tts for synced
playback.
* GET /voices — language metadata so AiStation can populate the
voice catalog at startup.
* GET /health — fast readiness check.
Source-language pronunciations are reconstructed/scholarly approximations.
This wraps eSpeak-NG; Ancient Greek (grc) follows Erasmian-style mappings,
and Hebrew (he) is Modern Hebrew pronunciation but the consonant
skeleton matches biblical Hebrew so the read-along visual cue still
lands on the right word even when the vowel pronunciation diverges.
"""
from __future__ import annotations
import io
import logging
import re
import shlex
import subprocess
import unicodedata
from typing import Optional
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse, Response
from pydantic import BaseModel
# Module-level logger used by the eSpeak-NG runner below.
LOG = logging.getLogger("biblical_tts")
# Configure root logging at import time: INFO level, timestamped lines.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
# The FastAPI application; the route decorators in this module register on it.
app = FastAPI(title="FlowerCore biblical-tts", version="1.0.0")
# eSpeak-NG language codes we expose. Ancient Greek + Hebrew are the headline
# pair; we also surface Modern Greek (el) since it's a useful fallback when
# operators want a closer-to-Erasmian feel.
#
# Per-language fields:
#   label         - human-readable name, returned by /voices as "displayName"
#   rtl           - True for right-to-left scripts, returned as "isRightToLeft"
#   default_voice - voice name used when a request does not supply its own
LANGUAGES = {
    "grc": {"label": "Ancient Greek (Erasmian)", "rtl": False, "default_voice": "grc"},
    "el": {"label": "Modern Greek", "rtl": False, "default_voice": "el"},
    "he": {"label": "Hebrew (Modern)", "rtl": True, "default_voice": "he"},
}
# Request body shared by POST /tts and POST /timings.
class TtsRequest(BaseModel):
    # Text to synthesise; both endpoints answer HTTP 400 when it is blank.
    text: str
    # Language code; looked up in LANGUAGES to pick a default voice, and a
    # value starting with "he" enables Hebrew transliteration.
    language: str = "grc"
    # Optional explicit voice name (e.g. "grc+f3"); takes precedence over the
    # language's default voice when set.
    voice: Optional[str] = None
    rate: int = 175  # words per minute, eSpeak default 175; /tts clamps to 80-450
    pitch: int = 50  # 0-99; /tts clamps values outside that range
    volume: int = 100  # 0-200; /tts clamps values outside that range
# Matches one code point in the Unicode Hebrew block (U+0590-U+05FF); used to
# detect whether a text contains any Hebrew at all.
HEBREW_CHAR_RE = re.compile(r"[\u0590-\u05FF]")
# Matches a maximal run of Hebrew-block code points. The range covers the
# whole block, not just letters, so a matched "word" can also carry points,
# accents and Hebrew punctuation that sit inside the block.
HEBREW_WORD_RE = re.compile(r"[\u0590-\u05FF]+")
# eSpeak-NG's Hebrew voice can spell unpointed Hebrew as Unicode character
# names on some builds. For source-text study reads, prefer a stable
# scholarly transliteration so words sound like words even without niqqud.
#
# Keys are unpointed spellings: lookups are made with the word after
# _strip_hebrew_marks() has removed combining marks. Values are Latin-script
# readings; Hebrew input is synthesised with the "en-us" voice once it has
# been transliterated. Both defective and plene spellings are listed where
# they differ (e.g. the two spellings that map to "Elohim").
HEBREW_WORD_TRANSLITERATIONS = {
    "אב": "av",
    "אבא": "abba",
    "אברהם": "Avraham",
    "אדמה": "adamah",
    "אדני": "Adonai",
    "אדם": "adam",
    "אור": "or",
    "אלהים": "Elohim",
    "אלוהים": "Elohim",
    "אמן": "amen",
    "אם": "em",
    "אמת": "emet",
    "ארץ": "eretz",
    "אש": "esh",
    "את": "et",
    "בית": "beit",
    "בן": "ben",
    "ברא": "bara",
    "בראשית": "bereshit",
    "ברית": "berit",
    "ברוך": "barukh",
    "בת": "bat",
    "גוי": "goy",
    "גוים": "goyim",
    "גויים": "goyim",
    "דבר": "davar",
    "דברים": "devarim",
    "דוד": "David",
    "הלל": "hallel",
    "הארץ": "ha-aretz",
    "הברית": "ha-berit",
    "החדשה": "ha-chadashah",
    "השמים": "ha-shamayim",
    "השמיים": "ha-shamayim",
    "ויאמר": "vayomer",
    # The four-letter divine name is read aloud as "Adonai".
    "יהוה": "Adonai",
    "יוסף": "Yosef",
    "יוחנן": "Yochanan",
    "ישראל": "Yisrael",
    "ישוע": "Yeshua",
    "יצחק": "Yitzchak",
    "יעקב": "Yaakov",
    "ירושלים": "Yerushalayim",
    "כהן": "kohen",
    "כהנים": "kohanim",
    "מים": "mayim",
    "מות": "mavet",
    "מושיע": "moshia",
    "מלך": "melekh",
    "מלכות": "malkhut",
    "מרים": "Miriam",
    "משה": "Moshe",
    "משיח": "Mashiach",
    "נביא": "navi",
    "נביאים": "neviim",
    "עם": "am",
    "עולם": "olam",
    "צדק": "tzedek",
    "קדוש": "qadosh",
    "קדושים": "qedoshim",
    "קול": "qol",
    "רוח": "ruach",
    "שאול": "Shaul",
    "שמים": "shamayim",
    "שמיים": "shamayim",
    "שמעון": "Shimon",
    "שלום": "Shalom",
    "תורה": "torah",
    "חכמה": "chokhmah",
    "חסד": "chesed",
    "חיים": "chayim",
    "חושך": "choshekh",
}
# Per-letter Latin values used by the heuristic fallback transliteration when
# a word is not in HEBREW_WORD_TRANSLITERATIONS. Final letter forms are listed
# as separate keys alongside their regular forms. The fallback overrides some
# of these values by position (see _fallback_hebrew_transliteration).
HEBREW_LETTERS = {
    "א": "a",
    "ב": "b",
    "ג": "g",
    "ד": "d",
    "ה": "h",
    "ו": "v",
    "ז": "z",
    "ח": "kh",
    "ט": "t",
    "י": "y",
    "כ": "kh",
    "ך": "kh",
    "ל": "l",
    "מ": "m",
    "ם": "m",
    "נ": "n",
    "ן": "n",
    "ס": "s",
    "ע": "a",
    "פ": "p",
    "ף": "f",
    "צ": "ts",
    "ץ": "ts",
    "ק": "q",
    "ר": "r",
    "ש": "sh",
    "ת": "t",
}
# Latin characters treated as vowels when the fallback decides whether to
# insert a helper "a" between two adjacent transliterated letters.
HEBREW_VOWELISH = {"a", "e", "i", "o", "u"}
def _strip_hebrew_marks(value: str) -> str:
decomposed = unicodedata.normalize("NFD", value)
return "".join(
ch for ch in decomposed
if unicodedata.category(ch) != "Mn" and ch not in {"׳", "״", "־"}
)
def _fallback_hebrew_transliteration(word: str) -> str:
    """Letter-by-letter transliteration for words missing from the dictionary.

    Each character found in HEBREW_LETTERS contributes its Latin value, with
    three positional overrides: a word-final he becomes "ah", and a yod or vav
    that is not the first character becomes "i" or "o". Characters without a
    mapping are skipped. A helper "a" is then inserted between any two
    neighbouring values where the first does not end in a vowel and the second
    does not start with one.

    Args:
        word: The word to transliterate.

    Returns:
        The Latin-script rendering, or ``word`` unchanged when none of its
        characters has a mapping.
    """
    final_position = len(word) - 1
    sounds: list[str] = []
    for position, letter in enumerate(word):
        sound = HEBREW_LETTERS.get(letter)
        if sound is None:
            continue
        if letter == "ה":
            if position == final_position:
                sound = "ah"
        elif position > 0:
            if letter == "י":
                sound = "i"
            elif letter == "ו":
                sound = "o"
        sounds.append(sound)

    if not sounds:
        return word

    pieces: list[str] = []
    # Pair every value with its successor; the last one is paired with "".
    for current, following in zip(sounds, sounds[1:] + [""]):
        pieces.append(current)
        if not following:
            continue
        if current[-1:] in HEBREW_VOWELISH or following[:1] in HEBREW_VOWELISH:
            continue
        pieces.append("a")
    return "".join(pieces)
def _transliterate_hebrew_token(word: str) -> str:
    """Transliterate one Hebrew word that has already had its marks stripped.

    Tries the dictionary first, then the dictionary with a leading vav
    ("ve-") or he ("ha-") peeled off, and finally the per-letter fallback.
    """
    direct = HEBREW_WORD_TRANSLITERATIONS.get(word)
    if direct:
        return direct
    if len(word) > 1:
        for prefix_letter, prefix_reading in (("ו", "ve"), ("ה", "ha")):
            if word.startswith(prefix_letter):
                rest = HEBREW_WORD_TRANSLITERATIONS.get(word[1:])
                if rest:
                    return f"{prefix_reading}-{rest}"
    return _fallback_hebrew_transliteration(word)


def _transliterate_hebrew_word(match: re.Match[str]) -> str:
    """Regex substitution callback: turn one Hebrew run into Latin script.

    A maqaf (U+05BE) joins separate words, and it lies inside the Hebrew
    block, so the word regex matches a maqaf-joined compound as one run. The
    run is therefore split on maqaf before anything else; each part is
    stripped of marks and transliterated on its own, and the parts are joined
    with "-". Without the split the parts would be glued into one string that
    can never match the dictionary.

    Args:
        match: A match whose group 0 is the Hebrew run to convert.

    Returns:
        The transliteration, or the original matched text when nothing is
        left after stripping marks and punctuation.
    """
    original = match.group(0)
    parts = [_strip_hebrew_marks(part) for part in original.split("\u05be")]
    words = [part for part in parts if part]
    if not words:
        return original
    return "-".join(_transliterate_hebrew_token(word) for word in words)
def _prepare_synthesis_input(text: str, language: str, voice: str) -> tuple[str, str]:
if language.lower().startswith("he") and HEBREW_CHAR_RE.search(text):
spoken = HEBREW_WORD_RE.sub(_transliterate_hebrew_word, text)
return spoken, "en-us"
return text, voice
def _resolve_voice(req: TtsRequest) -> str:
    """Pick the voice name for a request.

    An explicit ``req.voice`` wins, trimmed of surrounding whitespace. A
    missing, empty or whitespace-only voice falls back to the default voice
    of ``req.language``; a language that is not in LANGUAGES is used as the
    voice name itself. The language code is trimmed and lower-cased before the
    lookup so stray whitespace does not defeat it.

    Args:
        req: The incoming request.

    Returns:
        The voice name to use.
    """
    # Strip before testing: "   " is truthy but would otherwise become an
    # empty voice name.
    explicit = (req.voice or "").strip()
    if explicit:
        return explicit
    lang = req.language.strip().lower()
    return LANGUAGES.get(lang, {}).get("default_voice", lang)
def _run_espeak(args: list[str], stdin_text: bytes) -> bytes:
    """Run ``espeak-ng`` with ``args``, feeding it ``stdin_text`` on stdin.

    The command is passed as an argument list (no shell), with a 60 second
    timeout.

    Args:
        args: Command-line arguments placed after the ``espeak-ng`` program name.
        stdin_text: Encoded text written to the process's standard input.

    Returns:
        The raw bytes the process wrote to standard output.

    Raises:
        HTTPException: 504 when the process exceeds the timeout; 500 when the
            process cannot be started or exits with a non-zero status (the
            detail then carries up to 512 characters of its stderr).
    """
    cmd = ["espeak-ng", *args]
    LOG.info("espeak-ng %s", shlex.join(args))
    try:
        proc = subprocess.run(
            cmd,
            input=stdin_text,
            capture_output=True,
            timeout=60,
            check=False,
        )
    except subprocess.TimeoutExpired as exc:
        raise HTTPException(status_code=504, detail="espeak-ng timed out") from exc
    except OSError as exc:
        # Binary missing or not executable: report it instead of letting a
        # bare OSError escape with no explanation for the client.
        LOG.error("could not start espeak-ng: %s", exc)
        raise HTTPException(
            status_code=500,
            detail=f"espeak-ng could not be started: {exc}",
        ) from exc
    if proc.returncode != 0:
        stderr_text = proc.stderr.decode("utf-8", errors="replace")[:512]
        raise HTTPException(
            status_code=500,
            detail=f"espeak-ng exit {proc.returncode}: {stderr_text}",
        )
    return proc.stdout
@app.get("/health")
def health():
    """Readiness probe: report an "ok" status and the supported language codes."""
    language_codes = [*LANGUAGES]
    return {"status": "ok", "languages": language_codes}
@app.get("/voices")
def voices():
    """List one voice entry per supported language.

    Each entry uses the language code as both its name and its language, and
    carries the display label, the right-to-left flag and the engine name.
    """
    catalog = []
    for code, meta in LANGUAGES.items():
        entry = {
            "name": code,
            "displayName": meta["label"],
            "language": code,
            "isRightToLeft": meta["rtl"],
            "engine": "espeak-ng",
        }
        catalog.append(entry)
    return {"voices": catalog}
@app.post("/tts")
def tts(req: TtsRequest) -> Response:
    """Synthesise the request text and return it as WAV audio.

    Rate, pitch and volume are clamped to 80-450, 0-99 and 0-200 respectively
    before being passed to eSpeak-NG.

    Raises:
        HTTPException: 400 when the text is blank; 500 when eSpeak-NG produced
            no output.
    """
    if req.text.strip() == "":
        raise HTTPException(status_code=400, detail="text is required")

    spoken_text, synth_voice = _prepare_synthesis_input(
        req.text, req.language, _resolve_voice(req)
    )

    # Keep each numeric setting inside the range the endpoint accepts.
    speed = min(max(req.rate, 80), 450)
    pitch = min(max(req.pitch, 0), 99)
    amplitude = min(max(req.volume, 0), 200)

    espeak_args = ["--stdout", "-v", synth_voice]
    espeak_args += ["-s", str(speed)]
    espeak_args += ["-p", str(pitch)]
    espeak_args += ["-a", str(amplitude)]

    audio = _run_espeak(espeak_args, spoken_text.encode("utf-8"))
    if len(audio) == 0:
        raise HTTPException(status_code=500, detail="espeak-ng returned empty stdout")
    return Response(content=audio, media_type="audio/wav")
# --------------------------------------------------------------------------
# /timings — synth + word-level timing from espeak's phoneme/word stream.
# --------------------------------------------------------------------------
#
# espeak-ng's --pho flag emits a phoneme stream:
#
# _ 5 phon...
# _ 56 phon...
# _ 67 phon...
#
# That alone doesn't give word boundaries. Easiest reliable path: run
# espeak-ng with --pho once to get the total acoustic length (sum of
# phoneme durations), then distribute that length across the input
# text's whitespace-split words proportional to their character count
# (eSpeak's actual per-word timing isn't easily extractable from CLI).
# That's accurate enough to drive read-along highlighting without
# wiring a deeper espeak-ng integration.
#
# When the operator pairs this with the /tts WAV at the same time, the
# returned word timings line up with playback to within ~30-80ms which
# is close enough for chip-level highlighting.
# One phoneme line: a symbol, whitespace, then an integer duration that is
# captured as group 1.
PHONEME_DURATION_RE = re.compile(r"^\s*\S+\s+(\d+)\s+", re.MULTILINE)


def _estimate_total_ms(req: TtsRequest, voice: str, spoken_text: str) -> int:
    """Estimate how long the synthesised audio lasts, in milliseconds.

    Runs eSpeak-NG with ``--pho`` and sums the duration column of the phoneme
    lines it prints. When that yields nothing, falls back to a words-per-minute
    estimate from the word count of the original request text.

    The speech rate is clamped to 80-450, the same range the /tts endpoint
    applies, so the estimate describes the audio that endpoint produces.

    Args:
        req: The incoming request (its rate and original text are used).
        voice: Voice name to pass to eSpeak-NG.
        spoken_text: The text actually sent to eSpeak-NG.

    Returns:
        The estimated duration in milliseconds.
    """
    rate = max(80, min(450, req.rate))
    args = ["--pho", "--quiet", "-v", voice, "-s", str(rate)]
    out = _run_espeak(args, spoken_text.encode("utf-8"))
    pho_text = out.decode("utf-8", errors="replace")
    # group(1) is \d+, so int() cannot fail here.
    total = sum(int(match.group(1)) for match in PHONEME_DURATION_RE.finditer(pho_text))
    if total > 0:
        return total
    # Fallback: rough heuristic at the configured speech rate (words/minute).
    words = max(1, len(req.text.split()))
    return int(words / rate * 60_000)
def _word_timing_weights(words: list[str]) -> list[int]:
    """Weight each token by its length; punctuation-only tokens weigh nothing."""
    weights = [
        len(word) if any(ch.isalnum() for ch in word) else 0
        for word in words
    ]
    if any(weights):
        return weights
    # Every token is punctuation-only: weight by length so the time is still
    # shared out instead of dividing by zero.
    return [max(1, len(word)) for word in words]


def _distribute_word_timings(words: list[str], total_ms: int) -> list[dict]:
    """Split ``total_ms`` into consecutive per-word [startMs, endMs] spans."""
    weights = _word_timing_weights(words)
    weight_total = sum(weights)
    spans: list[dict] = []
    start = 0
    consumed = 0
    for word, weight in zip(words, weights):
        consumed += weight
        # Round the cumulative boundary, not each share, so rounding error
        # cannot pile up; the final boundary is exactly total_ms.
        end = round(total_ms * consumed / weight_total)
        spans.append({"text": word, "startMs": start, "endMs": end})
        start = end
    return spans


@app.post("/timings")
def timings(req: TtsRequest):
    """Return estimated word-level timings for the request text.

    The total duration is estimated once and then spread across the
    whitespace-split words of the original text in proportion to their
    character count. Tokens without any letter or digit get a zero-length span
    at the end of the previous word, so a verse ending with " ." does not
    claim a chunk of time. Spans are contiguous, never run backwards, and the
    last one ends exactly at ``durationMs``.

    Raises:
        HTTPException: 400 when the text is blank.
    """
    if not req.text.strip():
        raise HTTPException(status_code=400, detail="text is required")
    voice = _resolve_voice(req)
    spoken_text, synth_voice = _prepare_synthesis_input(req.text, req.language, voice)
    total_ms = _estimate_total_ms(req, synth_voice, spoken_text)
    # Non-blank text always yields at least one whitespace-split token.
    words = req.text.split()
    return JSONResponse(
        {
            "text": req.text,
            "language": req.language,
            "voice": synth_voice,
            "words": _distribute_word_timings(words, total_ms),
            "durationMs": total_ms,
        }
    )