"""FlowerCore speech-align service. Wraps SYSTRAN/faster-whisper (https://github.com/SYSTRAN/faster-whisper) in a small FastAPI app exposing two endpoints: * POST /align — fc-align contract used by FlowerCore.Shared.Speech's FasterWhisperAlignmentClient on master. Multipart form (`audio`, `language`) returns `{text, words: [{word, startSeconds, endSeconds, confidence}], durationMs, language}`. * POST /transcribe — audio-file-in transcription used by the new TtsReader audio-import feature. Multipart form (`audio`, optional `language`) returns `{text, language, durationMs, segments: [{startSeconds, endSeconds, text}]}` so the UI can preview the transcript before piping it into Quick Read or saving as a project. Both endpoints share the same WhisperModel instance (loaded once at startup). Model is pinned by the WHISPER_MODEL env var (defaults to base.en) and cached under WHISPER_CACHE_DIR (defaults to /models, backed by a PVC in K8s). Health: GET /health → {status: ok, model, device, computeType}. """ from __future__ import annotations import io import logging import os import time from contextlib import asynccontextmanager from typing import Optional from fastapi import FastAPI, File, Form, HTTPException, UploadFile from fastapi.responses import JSONResponse from faster_whisper import WhisperModel LOG = logging.getLogger("speech_align") logging.basicConfig( level=os.environ.get("LOG_LEVEL", "INFO"), format="%(asctime)s %(levelname)s %(name)s %(message)s", ) MODEL_NAME = os.environ.get("WHISPER_MODEL", "Systran/faster-whisper-base.en") DEVICE = os.environ.get("WHISPER_DEVICE", "cpu") COMPUTE_TYPE = os.environ.get("WHISPER_COMPUTE_TYPE", "int8") CACHE_DIR = os.environ.get("WHISPER_CACHE_DIR", "/models") MAX_BYTES = int(os.environ.get("MAX_AUDIO_BYTES", str(50 * 1024 * 1024))) # 50 MB DEFAULT_LANGUAGE = os.environ.get("DEFAULT_LANGUAGE", "en") _state: dict[str, object] = {} @asynccontextmanager async def lifespan(_app: FastAPI): LOG.info("Loading faster-whisper model %s (device=%s compute=%s cache=%s)", MODEL_NAME, DEVICE, COMPUTE_TYPE, CACHE_DIR) started = time.time() model = WhisperModel(MODEL_NAME, device=DEVICE, compute_type=COMPUTE_TYPE, download_root=CACHE_DIR) _state["model"] = model LOG.info("Model loaded in %.2fs", time.time() - started) yield _state.clear() app = FastAPI(title="FlowerCore speech-align", version="1.0.0", lifespan=lifespan) def _get_model() -> WhisperModel: model = _state.get("model") if model is None: raise HTTPException(status_code=503, detail="Model not loaded yet") return model # type: ignore[return-value] async def _read_upload(upload: UploadFile) -> bytes: payload = await upload.read() if not payload: raise HTTPException(status_code=400, detail="audio is empty") if len(payload) > MAX_BYTES: raise HTTPException( status_code=413, detail=f"audio exceeds {MAX_BYTES} byte limit ({len(payload)} bytes received)", ) return payload def _normalize_language(value: Optional[str]) -> Optional[str]: if not value or not value.strip(): return DEFAULT_LANGUAGE return value.strip().lower() def _transcribe_bytes(audio_bytes: bytes, language: Optional[str], word_timestamps: bool): model = _get_model() started = time.time() segments_iter, info = model.transcribe( io.BytesIO(audio_bytes), language=language, word_timestamps=word_timestamps, beam_size=1, vad_filter=True, ) segments = list(segments_iter) elapsed_ms = int((time.time() - started) * 1000) return segments, info, elapsed_ms @app.get("/health") def health(): return { "status": "ok" if _state.get("model") is not None else "loading", "model": MODEL_NAME, "device": DEVICE, "computeType": COMPUTE_TYPE, "defaultLanguage": DEFAULT_LANGUAGE, "maxBytes": MAX_BYTES, } @app.post("/align") async def align(audio: UploadFile = File(...), language: str = Form(DEFAULT_LANGUAGE)): """fc-align contract — used by FlowerCore.Shared.Speech.FasterWhisperAlignmentClient.""" payload = await _read_upload(audio) lang = _normalize_language(language) segments, info, elapsed_ms = _transcribe_bytes(payload, lang, word_timestamps=True) text_parts: list[str] = [] words: list[dict] = [] for segment in segments: text_parts.append(segment.text.strip()) for word in (segment.words or []): # Field names MUST match the FlowerCore.Shared.Speech contract: # `text` / `startMs` / `endMs`. The deployed FasterWhisperAlignmentClient # ignores any other names — see Common's # FasterWhisperAlignmentResponse / FasterWhisperWord. words.append({ "text": word.word.strip(), "startMs": int((word.start or 0.0) * 1000), "endMs": int((word.end or 0.0) * 1000), # Confidence is informational and ignored by the C# client today, # but kept on the wire for future scoring + fc-align operators # that want to surface low-confidence words. "confidence": float(getattr(word, "probability", 0.0) or 0.0), }) duration_ms = int((info.duration or 0.0) * 1000) return JSONResponse({ "text": " ".join(p for p in text_parts if p).strip(), "words": words, "durationMs": duration_ms, "language": info.language or lang, "elapsedMs": elapsed_ms, }) @app.post("/transcribe") async def transcribe(audio: UploadFile = File(...), language: Optional[str] = Form(None)): """Audio-in transcription contract — used by the new TtsReader audio-import feature. Returns full segments (no per-word timestamps) so the UI can preview the transcript before piping it into Quick Read or saving as a project. """ payload = await _read_upload(audio) lang = _normalize_language(language) segments, info, elapsed_ms = _transcribe_bytes(payload, lang, word_timestamps=False) out_segments = [ { "startSeconds": float(segment.start or 0.0), "endSeconds": float(segment.end or 0.0), "text": segment.text.strip(), } for segment in segments ] return JSONResponse({ "text": " ".join(s["text"] for s in out_segments if s["text"]).strip(), "segments": out_segments, "language": info.language or lang, "durationMs": int((info.duration or 0.0) * 1000), "elapsedMs": elapsed_ms, })