"""FlowerCore modern-tts — Microsoft Edge Read Aloud bridge for Modern Hebrew
and Modern Greek (and other Edge-supported languages).

Endpoints:
  * POST /tts      — body: {"text", "voice"?, "language"?, "rate"?,
                     "volume"?, "pitch"?}. When "voice" is omitted, a default
                     voice is picked from "language" (Hebrew if neither is
                     given). Returns audio/mpeg (Edge returns MP3) which the
                     upstream FasterWhisperAlignmentClient + the WPF
                     MediaPlayer both handle natively.
  * POST /timings  — same body shape but returns
                     {"text", "voice", "words": [{"text","startMs","endMs"}],
                      "durationMs": ..., "audioBytes": ...}
                     sourced from Edge's WordBoundary events — much more
                     accurate than eSpeak's proportional-distribution approach
                     because Edge emits real per-word offsets during synthesis.
                     When Edge emits no word events, timings fall back to a
                     proportional distribution over the input text.
  * GET  /voices   — voice catalog Edge knows about. Filtered to Hebrew +
                     Greek by default; ?language=all returns everything Edge
                     supports.
  * GET  /health   — fast readiness check.

Pairs with fc-biblical-tts (eSpeak Ancient Greek + Hebrew). The biblical
engine handles unpointed Hebrew + Erasmian Greek; this engine handles
narrative Modern Hebrew + Modern Greek for translations the operator might be
reading alongside the original.
"""

from __future__ import annotations

import io
import logging
from typing import Optional

import edge_tts
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse, Response
from pydantic import BaseModel

# Module-wide logger; basicConfig installs a root handler at import time.
LOG = logging.getLogger("modern_tts")
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

# The ASGI application object that the route decorators below attach to.
app = FastAPI(title="FlowerCore modern-tts", version="1.0.0")


# Default voices by short code so AiStation can pick a sensible default
# when the operator hasn't explicitly asked for one. Edge has multiple
# voices per locale — these are the calmest male+female narrators.
DEFAULT_VOICES = {
    "he": "he-IL-AvriNeural",
    "he-IL": "he-IL-AvriNeural",
    "el": "el-GR-NestorasNeural",
    "el-GR": "el-GR-NestorasNeural",
    "en": "en-US-AriaNeural",
}

# MPEG audio Layer III header lookup tables, indexed by the header's 2-bit
# sample-rate index / 4-bit bitrate index. A 0 entry marks a reserved or
# "free format" slot that the duration estimator treats as "not a frame".
_MP3_SAMPLE_RATES = {
    0x03: (44100, 48000, 32000, 0),  # MPEG-1
    0x02: (22050, 24000, 16000, 0),  # MPEG-2
    0x00: (11025, 12000, 8000, 0),   # MPEG-2.5
}
_MP3_BITRATES_V1_L3 = (
    0, 32000, 40000, 48000, 56000, 64000, 80000, 96000,
    112000, 128000, 160000, 192000, 224000, 256000, 320000, 0,
)
_MP3_BITRATES_V2_L3 = (
    0, 8000, 16000, 24000, 32000, 40000, 48000, 56000,
    64000, 80000, 96000, 112000, 128000, 144000, 160000, 0,
)

# Last-resort speaking-rate guess used when the MP3 duration cannot be parsed.
_FALLBACK_MS_PER_WORD = 600


class TtsRequest(BaseModel):
    """Request body shared by POST /tts and POST /timings."""

    # Text to synthesize; the endpoints reject blank text with HTTP 400.
    text: str
    # Explicit Edge voice short name; takes precedence over `language`.
    voice: Optional[str] = None
    # Language code used to pick a DEFAULT_VOICES entry when `voice` is unset.
    language: Optional[str] = None
    # Prosody adjustments, forwarded verbatim to edge_tts.Communicate.
    rate: str = "+0%"  # Edge accepts +20%, -10%, etc.
    volume: str = "+0%"
    pitch: str = "+0Hz"


def _default_voice_for(language: Optional[str]) -> Optional[str]:
    """Return the DEFAULT_VOICES entry for *language*, or None if unknown.

    Matching ignores case, surrounding whitespace and `_` vs `-`, and falls
    back from a full locale to its primary subtag (so "el-CY" resolves via
    "el").
    """
    code = (language or "").strip().replace("_", "-").lower()
    if not code:
        return None
    by_code = {key.lower(): voice for key, voice in DEFAULT_VOICES.items()}
    return by_code.get(code) or by_code.get(code.split("-", 1)[0])


def _resolve_voice(req: TtsRequest) -> str:
    """Pick the voice for *req*: explicit voice, then language default, then Hebrew."""
    # Strip before testing so a whitespace-only voice counts as "not supplied"
    # instead of resolving to an empty voice name.
    explicit = (req.voice or "").strip()
    if explicit:
        return explicit
    return _default_voice_for(req.language) or DEFAULT_VOICES["he"]


@app.get("/health")
def health():
    """Fast readiness check; does not contact Edge."""
    return {"status": "ok"}


@app.get("/voices")
async def voices(language: str = "default"):
    """Return Edge's voice catalog.

    `language=all` returns every voice; any other value returns only the
    Hebrew and Greek voices. A catalog lookup failure is reported as HTTP 502,
    matching the synthesis endpoints.
    """
    try:
        catalog = await edge_tts.list_voices()
    except Exception as ex:
        LOG.exception("edge-tts voice catalog lookup failed")
        raise HTTPException(status_code=502, detail=f"edge-tts failure: {ex}") from ex
    if language == "all":
        return {"voices": catalog}
    # Default response: filter to languages relevant to the FlowerCore
    # biblical workflow (Hebrew + Greek) so the AiStation voice picker
    # isn't overwhelmed by 400+ Edge voices.
    keep = ("he-", "el-")
    filtered = [v for v in catalog if v.get("ShortName", "").startswith(keep)]
    return {"voices": filtered}


async def _synth_with_subtitles(req: TtsRequest):
    """Synthesize *req* through Edge and collect audio plus word events.

    Returns:
        (voice, audio_bytes, word_events) where each word event is
        {"text", "offset", "duration"} with offset/duration in 100-ns ticks.

    Raises:
        HTTPException(400): edge_tts.Communicate rejected the voice or prosody
            arguments with ValueError.
        Exception: whatever edge_tts raises while streaming; callers map
            these to HTTP 502.
    """
    voice = _resolve_voice(req)
    LOG.info("edge-tts synth voice=%s len=%d", voice, len(req.text))
    try:
        communicate = edge_tts.Communicate(
            req.text,
            voice=voice,
            rate=req.rate,
            volume=req.volume,
            pitch=req.pitch,
        )
    except ValueError as ex:
        # Only the constructor is guarded: a ValueError here is argument
        # validation (client error), whereas one raised mid-stream would be
        # an upstream problem and must stay a 502.
        raise HTTPException(status_code=400, detail=f"invalid synthesis parameters: {ex}") from ex

    # NOTE(review): newer edge-tts releases reportedly default to
    # sentence-level boundary events; confirm whether the installed version
    # needs boundary="WordBoundary" to emit the events collected below.
    audio_buf = io.BytesIO()
    word_events: list[dict] = []
    async for chunk in communicate.stream():
        kind = chunk["type"]
        if kind == "audio":
            audio_buf.write(chunk["data"])
        elif kind == "WordBoundary":
            word_events.append({
                "text": chunk.get("text") or "",
                "offset": chunk.get("offset", 0),      # 100-ns ticks
                "duration": chunk.get("duration", 0),  # 100-ns ticks
            })
    return voice, audio_buf.getvalue(), word_events


async def _synthesize_or_raise(req: TtsRequest):
    """Validate *req*, run synthesis, and translate failures to HTTP errors.

    Shared by /tts and /timings so both report identical status codes and
    messages. Returns the same tuple as `_synth_with_subtitles`.
    """
    if not req.text.strip():
        raise HTTPException(status_code=400, detail="text is required")
    try:
        return await _synth_with_subtitles(req)
    except HTTPException:
        # Already a deliberate HTTP response (e.g. 400 for bad parameters).
        raise
    except edge_tts.exceptions.NoAudioReceived as ex:
        LOG.warning("edge-tts returned no audio")
        raise HTTPException(
            status_code=502,
            detail="edge-tts returned no audio for the supplied voice/text.",
        ) from ex
    except Exception as ex:
        LOG.exception("edge-tts synthesis failed")
        raise HTTPException(status_code=502, detail=f"edge-tts failure: {ex}") from ex


def _to_ms(ticks_100ns: int) -> int:
    """Convert 100-nanosecond ticks to whole milliseconds (rounded)."""
    # Edge emits offsets in 100-nanosecond ticks (.NET TimeSpan style).
    return int(round(ticks_100ns / 10_000))


@app.post("/tts")
async def tts(req: TtsRequest):
    """Synthesize *req* and return the MP3 bytes as audio/mpeg.

    The resolved voice is echoed in the X-FlowerCore-Voice response header.
    Responds 400 for blank text or invalid parameters and 502 when Edge fails
    or yields no audio.
    """
    voice, audio_bytes, _ = await _synthesize_or_raise(req)
    if not audio_bytes:
        raise HTTPException(status_code=502, detail="edge-tts returned an empty audio stream.")
    return Response(
        content=audio_bytes,
        media_type="audio/mpeg",
        headers={"X-FlowerCore-Voice": voice},
    )


def _estimate_duration_ms_from_mp3(audio_bytes: bytes) -> int:
    """Best-effort duration estimate from raw MP3 bytes by walking frame headers.

    Only MPEG-1/2/2.5 Layer III frames are counted. Each frame contributes
    1152 samples (MPEG-1) or 576 samples (MPEG-2 / MPEG-2.5) at that frame's
    own sample rate. Returns 0 when no valid frame is found so the caller can
    fall through to a per-word heuristic.
    """
    if not audio_bytes:
        return 0

    size = len(audio_bytes)
    # Samples are tallied per sample rate of *validated* frames only. Using
    # the rate of whichever header was inspected last would let a false sync
    # with a reserved rate index zero out an otherwise good estimate.
    samples_by_rate: dict[int, int] = {}
    pos = 0
    while pos + 4 <= size:
        b1 = audio_bytes[pos + 1]
        # Frame sync: 11 set bits (0xFF followed by the top 3 bits of byte 1).
        if audio_bytes[pos] != 0xFF or (b1 & 0xE0) != 0xE0:
            pos += 1
            continue

        version_bits = (b1 >> 3) & 0x03
        layer_bits = (b1 >> 1) & 0x03
        rate_table = _MP3_SAMPLE_RATES.get(version_bits)  # None for reserved 0b01
        if layer_bits != 0x01 or rate_table is None:  # layer 3 only
            pos += 1
            continue

        b2 = audio_bytes[pos + 2]
        padding = (b2 >> 1) & 0x01
        frame_rate = rate_table[(b2 >> 2) & 0x03]
        if version_bits == 0x03:  # MPEG-1
            bitrate = _MP3_BITRATES_V1_L3[(b2 >> 4) & 0x0F]
            samples_per_frame = 1152
        else:  # MPEG-2 / MPEG-2.5
            bitrate = _MP3_BITRATES_V2_L3[(b2 >> 4) & 0x0F]
            samples_per_frame = 576
        if not (frame_rate and bitrate):
            pos += 1
            continue

        frame_length = samples_per_frame * bitrate // (8 * frame_rate) + padding
        if frame_length <= 0:
            pos += 1
            continue

        samples_by_rate[frame_rate] = samples_by_rate.get(frame_rate, 0) + samples_per_frame
        # Jump straight to the next header so payload bytes are never
        # mistaken for a sync word.
        pos += frame_length

    if not samples_by_rate:
        return 0
    total_ms = sum(samples * 1000 / rate for rate, samples in samples_by_rate.items())
    return int(round(total_ms))


def _distribute_proportionally(tokens: list[str], total_ms: int) -> list[dict]:
    """Spread *total_ms* over *tokens* in proportion to their character length.

    Boundaries come from the cumulative character count, so rounding error
    never accumulates: spans are contiguous, non-decreasing, and the last one
    ends exactly at *total_ms*.
    """
    weights = [max(1, len(token)) for token in tokens]
    weight_total = sum(weights)
    words: list[dict] = []
    consumed = 0
    start = 0
    for token, weight in zip(tokens, weights):
        consumed += weight
        end = int(round(total_ms * consumed / weight_total))
        words.append({"text": token, "startMs": start, "endMs": end})
        start = end
    return words


@app.post("/timings")
async def timings(req: TtsRequest):
    """Synthesize *req* and return per-word timings as JSON.

    The response carries "text", "voice", "words" (each with "text",
    "startMs", "endMs"), "durationMs" (end of the last word) and
    "audioBytes" (size of the synthesized MP3). Responds 400 for blank text
    or invalid parameters and 502 when Edge fails.
    """
    voice, audio_bytes, events = await _synthesize_or_raise(req)

    words: list[dict] = []
    for event in events:
        start = _to_ms(event["offset"])
        words.append({
            "text": event.get("text", ""),
            "startMs": start,
            "endMs": start + _to_ms(event["duration"]),
        })

    # Edge sometimes omits WordBoundary events for non-English voices
    # (notably he-IL-* and el-GR-*). Fall back to proportional distribution
    # over the input text — same approach the eSpeak biblical-tts uses.
    if not words:
        tokens = req.text.split()
        total_ms = _estimate_duration_ms_from_mp3(audio_bytes)
        if total_ms <= 0:
            # Last-resort fallback: ~600ms per word at average speaking rate.
            total_ms = max(1, len(tokens)) * _FALLBACK_MS_PER_WORD
        words = _distribute_proportionally(tokens, total_ms)

    duration_ms = words[-1]["endMs"] if words else 0
    return JSONResponse({
        "text": req.text,
        "voice": voice,
        "words": words,
        "durationMs": duration_ms,
        "audioBytes": len(audio_bytes),
    })