Files
bluejay-infra/apps/fc-ttsreader/speech-align/app.py
Andrew Stoltz df115e4d1e fc-ttsreader: ship cluster-native fc-speech-align (faster-whisper) + bump web
- New ttsreader-align Deployment + Service + 5Gi PVC under
  apps/fc-ttsreader/. Wraps SYSTRAN/faster-whisper in a small FastAPI app
  exposing POST /align (fc-align contract used by Shared.Speech) AND
  POST /transcribe (audio-in feature consumed by ttsreader-web Lane G).
  Source: apps/fc-ttsreader/speech-align/ (Dockerfile + app.py +
  requirements.txt). Built locally (apt-get RUN steps need BLUEJAY-WS,
  not noc1) and ctr-imported to all 3 RKE2 nodes.
- ttsreader-web env: flip Speech__Alignment__Enabled=true and point
  BaseUrl at http://ttsreader-align.fc-ttsreader.svc.cluster.local.:9200.
  Add new TtsReader__Transcription__* env triplet pointing at the same
  service (same /transcribe endpoint).
- Bump ttsreader-web image to v202604251046 (carries the
  TranscriptionController + MCP tool + Quick.razor InputFile UI).
2026-04-25 10:50:45 -05:00

175 lines
6.2 KiB
Python

"""FlowerCore speech-align service.
Wraps SYSTRAN/faster-whisper (https://github.com/SYSTRAN/faster-whisper) in a
small FastAPI app exposing two endpoints:
* POST /align — fc-align contract used by FlowerCore.Shared.Speech's
FasterWhisperAlignmentClient on master. Multipart form
(`audio`, `language`) returns
`{text, words: [{word, startSeconds, endSeconds, confidence}],
durationMs, language}`.
* POST /transcribe — audio-file-in transcription used by the new TtsReader
audio-import feature. Multipart form (`audio`, optional
`language`) returns `{text, language, durationMs,
segments: [{startSeconds, endSeconds, text}]}` so the
UI can preview the transcript before piping it into
Quick Read or saving as a project.
Both endpoints share the same WhisperModel instance (loaded once at startup).
Model is pinned by the WHISPER_MODEL env var (defaults to
Systran/faster-whisper-base.en) and cached under WHISPER_CACHE_DIR (defaults
to /models, backed by a PVC in K8s).
Health: GET /health → {status: ok | loading, model, device, computeType,
defaultLanguage, maxBytes}.
"""
from __future__ import annotations

import io
import logging
import os
import time
from contextlib import asynccontextmanager
from typing import Optional

from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import JSONResponse
from faster_whisper import WhisperModel
LOG = logging.getLogger("speech_align")

# logging only accepts exact upper-case level names ("DEBUG", "INFO", ...), so
# normalise the env value; an unset or blank LOG_LEVEL falls back to INFO
# instead of raising ValueError while the module is being imported.
logging.basicConfig(
    level=(os.environ.get("LOG_LEVEL") or "INFO").strip().upper() or "INFO",
    format="%(asctime)s %(levelname)s %(name)s %(message)s",
)

# Runtime configuration, read once from the environment at import time.
MODEL_NAME = os.environ.get("WHISPER_MODEL", "Systran/faster-whisper-base.en")
DEVICE = os.environ.get("WHISPER_DEVICE", "cpu")
COMPUTE_TYPE = os.environ.get("WHISPER_COMPUTE_TYPE", "int8")
CACHE_DIR = os.environ.get("WHISPER_CACHE_DIR", "/models")
# Upper bound on an uploaded audio payload, in bytes (default 50 MB).
MAX_BYTES = int(os.environ.get("MAX_AUDIO_BYTES", str(50 * 1024 * 1024)))
# Language used when a request supplies none (or only whitespace).
DEFAULT_LANGUAGE = os.environ.get("DEFAULT_LANGUAGE", "en")

# Process-wide state; the "model" key holds the loaded WhisperModel.
_state: dict[str, object] = {}
@asynccontextmanager
async def lifespan(_app: FastAPI):
    """Load the shared WhisperModel on startup and release it on shutdown.

    The model is stored under ``_state["model"]`` before control is yielded
    to the application, and ``_state`` is cleared when the context exits.

    Args:
        _app: The FastAPI application (unused).
    """
    LOG.info(
        "Loading faster-whisper model %s (device=%s compute=%s cache=%s)",
        MODEL_NAME,
        DEVICE,
        COMPUTE_TYPE,
        CACHE_DIR,
    )
    # perf_counter is monotonic, so the reported load time cannot be skewed
    # by wall-clock adjustments.
    started = time.perf_counter()
    _state["model"] = WhisperModel(
        MODEL_NAME,
        device=DEVICE,
        compute_type=COMPUTE_TYPE,
        download_root=CACHE_DIR,
    )
    LOG.info("Model loaded in %.2fs", time.perf_counter() - started)
    try:
        yield
    finally:
        # Runs even when an exception is thrown in at the yield point, so the
        # model reference is always dropped.
        _state.clear()
# ASGI application; `lifespan` loads the Whisper model before requests are served.
app = FastAPI(
    lifespan=lifespan,
    title="FlowerCore speech-align",
    version="1.0.0",
)
def _get_model() -> WhisperModel:
    """Return the shared WhisperModel held in ``_state``.

    Raises:
        HTTPException: 503 when ``_state`` holds no model.
    """
    loaded = _state.get("model")
    if loaded is not None:
        return loaded  # type: ignore[return-value]
    raise HTTPException(status_code=503, detail="Model not loaded yet")
async def _read_upload(upload: UploadFile) -> bytes:
    """Read an uploaded audio file into memory, enforcing the size limit.

    Args:
        upload: The multipart file part to read.

    Returns:
        The upload's bytes (between 1 and ``MAX_BYTES`` bytes long).

    Raises:
        HTTPException: 400 when the upload is empty, 413 when it is larger
            than ``MAX_BYTES``.
    """
    # Read at most one byte past the limit: that is enough to detect an
    # oversized upload without loading the rest of it into memory.
    payload = await upload.read(MAX_BYTES + 1)
    if not payload:
        raise HTTPException(status_code=400, detail="audio is empty")
    if len(payload) > MAX_BYTES:
        # The read was truncated, so len(payload) is not the real size. Use
        # the upload's own size when it is exposed (the attribute may be
        # absent or None depending on the Starlette version).
        size = getattr(upload, "size", None)
        if isinstance(size, int) and size > MAX_BYTES:
            received = f"{size} bytes"
        else:
            received = f"more than {MAX_BYTES} bytes"
        raise HTTPException(
            status_code=413,
            detail=f"audio exceeds {MAX_BYTES} byte limit ({received} received)",
        )
    return payload
def _normalize_language(value: Optional[str]) -> Optional[str]:
if not value or not value.strip():
return DEFAULT_LANGUAGE
return value.strip().lower()
def _transcribe_bytes(audio_bytes: bytes, language: Optional[str], word_timestamps: bool):
    """Run the shared model over an in-memory audio payload.

    Args:
        audio_bytes: Raw bytes of the uploaded audio file.
        language: Language code passed through to the model.
        word_timestamps: Whether per-word timings are requested.

    Returns:
        A ``(segments, info, elapsed_ms)`` tuple: the list of segments, the
        info object returned by ``model.transcribe``, and the time spent in
        this call in whole milliseconds.

    Raises:
        HTTPException: 503 (from ``_get_model``) when no model is loaded.
    """
    model = _get_model()
    # perf_counter is monotonic; time.time() can jump when the system clock
    # is adjusted, which would corrupt the reported duration.
    started = time.perf_counter()
    segments_iter, info = model.transcribe(
        io.BytesIO(audio_bytes),
        language=language,
        word_timestamps=word_timestamps,
        beam_size=1,
        vad_filter=True,
    )
    # Consume the iterator inside the timed region so the work it drives is
    # included in elapsed_ms.
    segments = list(segments_iter)
    elapsed_ms = int((time.perf_counter() - started) * 1000)
    return segments, info, elapsed_ms
@app.get("/health")
def health():
    """Report whether the model is loaded, plus the effective configuration."""
    model_ready = _state.get("model") is not None
    status = "ok" if model_ready else "loading"
    return {
        "status": status,
        "model": MODEL_NAME,
        "device": DEVICE,
        "computeType": COMPUTE_TYPE,
        "defaultLanguage": DEFAULT_LANGUAGE,
        "maxBytes": MAX_BYTES,
    }
@app.post("/align")
async def align(audio: UploadFile = File(...), language: str = Form(DEFAULT_LANGUAGE)):
    """fc-align contract — used by FlowerCore.Shared.Speech.FasterWhisperAlignmentClient.

    Transcribes the uploaded audio with word timestamps enabled.

    Args:
        audio: Multipart file part containing the audio.
        language: Language code form field; blank values fall back to
            ``DEFAULT_LANGUAGE``.

    Returns:
        JSON ``{text, words: [{word, startSeconds, endSeconds, confidence}],
        durationMs, language, elapsedMs}``.

    Raises:
        HTTPException: 400 for an empty upload, 413 for an oversized one,
            503 when the model is not loaded.
    """
    payload = await _read_upload(audio)
    lang = _normalize_language(language)
    # Transcription is synchronous and CPU-bound. Running it inline would
    # block the event loop (and therefore /health) until it finishes, so it
    # is handed to the worker threadpool instead.
    segments, info, elapsed_ms = await run_in_threadpool(
        _transcribe_bytes, payload, lang, word_timestamps=True
    )

    text_parts: list[str] = []
    words: list[dict] = []
    for segment in segments:
        text_parts.append(segment.text.strip())
        # segment.words may be None; treat that as "no words".
        for word in segment.words or []:
            words.append({
                "word": word.word.strip(),
                "startSeconds": float(word.start or 0.0),
                "endSeconds": float(word.end or 0.0),
                "confidence": float(getattr(word, "probability", 0.0) or 0.0),
            })

    duration_ms = int((info.duration or 0.0) * 1000)
    return JSONResponse({
        # Skip segments whose text is empty after stripping.
        "text": " ".join(p for p in text_parts if p).strip(),
        "words": words,
        "durationMs": duration_ms,
        "language": info.language or lang,
        "elapsedMs": elapsed_ms,
    })
@app.post("/transcribe")
async def transcribe(audio: UploadFile = File(...), language: Optional[str] = Form(None)):
    """Audio-in transcription contract — used by the new TtsReader audio-import feature.

    Returns full segments (no per-word timestamps) so the UI can preview the
    transcript before piping it into Quick Read or saving as a project.

    Args:
        audio: Multipart file part containing the audio.
        language: Optional language code form field; a missing or blank value
            falls back to ``DEFAULT_LANGUAGE``.

    Returns:
        JSON ``{text, segments: [{startSeconds, endSeconds, text}], language,
        durationMs, elapsedMs}``.

    Raises:
        HTTPException: 400 for an empty upload, 413 for an oversized one,
            503 when the model is not loaded.
    """
    payload = await _read_upload(audio)
    lang = _normalize_language(language)
    # Transcription is synchronous and CPU-bound. Running it inline would
    # block the event loop (and therefore /health) until it finishes, so it
    # is handed to the worker threadpool instead.
    segments, info, elapsed_ms = await run_in_threadpool(
        _transcribe_bytes, payload, lang, word_timestamps=False
    )
    out_segments = [
        {
            "startSeconds": float(segment.start or 0.0),
            "endSeconds": float(segment.end or 0.0),
            "text": segment.text.strip(),
        }
        for segment in segments
    ]
    return JSONResponse({
        # Skip segments whose text is empty after stripping.
        "text": " ".join(s["text"] for s in out_segments if s["text"]).strip(),
        "segments": out_segments,
        "language": info.language or lang,
        "durationMs": int((info.duration or 0.0) * 1000),
        "elapsedMs": elapsed_ms,
    })