Files
bluejay-infra/scripts/check-pfsense-dns.py
Andrew Stoltz 5ccf055465 check-pfsense-dns: add live-cluster scan
Extends the pre-merge DNS gate to (optionally) scan live-cluster
Certificates + IngressRoutes via kubectl. Closes the coverage hole
where a service's IngressRoute gets deployed from its own repo (not
from bluejay-infra/apps/) and the manifests-only scan misses it —
fc-retail/retail-web-tls stuck Issuing for 15h on a missing pfSense
Unbound override was exactly this class of bug.

Auto mode: if kubectl is on PATH and usable, live-scan runs silently.
--live  forces it (and errors out if kubectl can't reach the cluster).
--no-live skips live entirely (CI path with no cluster access).

Immediate live-scan finding on 2026-04-23: 10 orphan *.iamworkin.lan
IngressRoutes from failed e2e / codex / smoke / deleteproof test runs
in fc-php + fc-tenant-default (2026-04-16/17). None have DNS overrides
so their Certificates have been failing to issue for 7 days — the new
CertManagerCertificateNotReady alert will catch them too. Cleanup
(delete abandoned IngressRoutes + Certificates + CertificateRequests)
is a separate task; this check now surfaces them.
2026-04-23 15:51:19 -05:00

235 lines
8.0 KiB
Python

#!/usr/bin/env python3
"""
check-pfsense-dns.py
Fails if any *.iamworkin.lan hostname referenced by a cert-manager Certificate
`spec.dnsNames` or a Traefik IngressRoute `Host(...)` match rule does NOT
resolve via the system DNS resolver (pfSense Unbound at 10.0.56.1 on this LAN).
Two sources are scanned:
  1. apps/**/*.yaml in this bluejay-infra checkout (searched recursively, at
     any depth below apps/) — the pre-merge gate.
  2. Live-cluster Certificates + IngressRoutes (opt-in with --live, or auto when
     kubectl is on PATH AND kubeconfig is usable). This catches hostnames that
     exist in the running cluster but aren't (yet) tracked in bluejay-infra —
     e.g. services deployed via their own repo's deploy script. Retail.Web on
     2026-04-23 was stuck Issuing for 15h because of exactly this gap.
Run from anywhere that uses pfSense as a resolver (LAN hosts, noc1,
BLUEJAY-WS):
python scripts/check-pfsense-dns.py # auto live scan if kubectl works
python scripts/check-pfsense-dns.py --live # require live scan
python scripts/check-pfsense-dns.py --no-live # manifests only (CI default)
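Exit code 0: all referenced hosts resolve. 1: at least one doesn't.
2: --live was requested but kubectl was unavailable or could not reach the
cluster (no hostnames are checked in that case).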
This is intentionally narrow: it only flags hostnames that cert-manager will
actually try to validate via HTTP-01, or that Traefik will route. IRC
server-link names, Docker image tags, comments, etc. are ignored.
"""
from __future__ import annotations
import argparse
import json
import os
import re
import shutil
import socket
import subprocess
import sys
from pathlib import Path
# PyYAML is the only third-party dependency; stop with an install hint on
# stderr (exit status 1) instead of an ImportError traceback when it is absent.
try:
    import yaml  # PyYAML
except ImportError:
    raise SystemExit("PyYAML required: pip install pyyaml") from None
# Root of the checkout: two directory levels above this file's resolved path
# (the usage examples run the script as scripts/check-pfsense-dns.py).
REPO_ROOT = Path(__file__).resolve().parent.parent
# Directory tree whose *.yaml manifests are scanned by the pre-merge gate.
APPS_DIR = REPO_ROOT / "apps"
# Captures the hostname of a Traefik Host(`...`) matcher written with
# backticks and exactly one argument.
HOST_RE = re.compile(r"Host\(`([^`]+)`\)")
# Prefix of the reference strings recorded for hosts found in the live cluster.
LIVE_SOURCE = "live-cluster"
# One complete ``Host(...)`` matcher call; group 1 is its raw argument list.
# ``HostRegexp(`` / ``HostSNI(`` do not match because "Host" must be followed
# directly by the opening parenthesis.
_HOST_CALL_RE = re.compile(r"Host\(([^)]*)\)")
# One quoted argument inside the call. Traefik rules accept backticks or
# double quotes, and Traefik v2 allows several comma-separated hosts.
_HOST_ARG_RE = re.compile(r'`([^`]+)`|"([^"]+)"')
# Only hostnames under this suffix are expected to have pfSense overrides.
_LAN_SUFFIX = ".iamworkin.lan"


def extract_hosts_from_doc(doc: dict) -> set[str]:
    """Pull iamworkin.lan hostnames from a single K8s manifest doc.

    Two kinds are inspected; every other kind yields an empty set:

    * ``Certificate``  - each string in ``spec.dnsNames``.
    * ``IngressRoute`` - each quoted argument of every ``Host(...)`` matcher in
      ``spec.routes[].match``. Both ``Host(`a`) || Host(`b`)`` and the
      multi-argument form ``Host(`a`, `b`)`` are understood, with backtick or
      double-quote delimiters.

    Malformed input (non-mapping doc or spec, ``match: null``, scalar
    ``dnsNames``, non-string entries) is skipped rather than raising, so one
    odd manifest cannot abort the whole scan.

    Args:
        doc: One parsed YAML/JSON document. Anything that is not a mapping is
            tolerated and produces an empty result.

    Returns:
        The set of hostnames ending in ``.iamworkin.lan`` referenced by the doc.
    """
    out: set[str] = set()
    if not isinstance(doc, dict):
        return out
    spec = doc.get("spec")
    if not isinstance(spec, dict):
        return out

    kind = doc.get("kind", "")
    candidates: list = []
    if kind == "Certificate":
        names = spec.get("dnsNames")
        if isinstance(names, (list, tuple)):
            candidates.extend(names)
    elif kind == "IngressRoute":
        routes = spec.get("routes")
        for route in routes if isinstance(routes, (list, tuple)) else ():
            match = route.get("match") if isinstance(route, dict) else None
            if not isinstance(match, str):
                continue  # e.g. `match:` left empty in a templated manifest
            for arg_list in _HOST_CALL_RE.findall(match):
                for backticked, quoted in _HOST_ARG_RE.findall(arg_list):
                    candidates.append(backticked or quoted)

    for name in candidates:
        if isinstance(name, str) and name.endswith(_LAN_SUFFIX):
            out.add(name)
    return out
def collect_hosts_from_manifests() -> dict[str, list[str]]:
    """Index the hostnames referenced by manifests under ``APPS_DIR``.

    Every ``*.yaml`` file below ``APPS_DIR`` (recursively, in sorted path
    order) is read as a multi-document YAML stream and each document is passed
    to ``extract_hosts_from_doc``.

    A file that cannot be read or parsed (YAML syntax error, non-UTF-8 bytes,
    permission problem, a directory that happens to be named ``*.yaml``)
    produces a warning on stderr and is skipped. Hosts found in documents that
    preceded the failure in the same file are kept.

    Returns:
        Mapping of hostname -> list of manifest paths (relative to
        ``REPO_ROOT``) that reference it, each path listed once.
    """
    index: dict[str, list[str]] = {}
    for path in sorted(APPS_DIR.rglob("*.yaml")):
        rel = str(path.relative_to(REPO_ROOT))
        try:
            with path.open("r", encoding="utf-8") as f:
                # safe_load_all is lazy, so parse errors surface inside this
                # loop and must be caught around it, not around open().
                for doc in yaml.safe_load_all(f):
                    for host in extract_hosts_from_doc(doc):
                        refs = index.setdefault(host, [])
                        if rel not in refs:
                            refs.append(rel)
        except (yaml.YAMLError, UnicodeDecodeError, OSError) as e:
            print(f"warn: could not parse {path}: {e}", file=sys.stderr)
    return index
def _kubectl_json(args: list[str]) -> dict | None:
    """Run ``kubectl <args> -o json`` and return the parsed JSON object.

    Args:
        args: kubectl arguments without the output flag, e.g.
            ``["get", "certificate", "-A"]``.

    Returns:
        The decoded top-level JSON object, or ``None`` when kubectl cannot be
        started, does not finish within 20 seconds, exits non-zero, or prints
        something that is not a JSON object. Failures are deliberately not
        raised: the caller decides whether an unusable kubectl is fatal.
    """
    try:
        proc = subprocess.run(
            ["kubectl", *args, "-o", "json"],
            capture_output=True,
            text=True,
            # Decode as UTF-8 regardless of the host locale (matters on
            # Windows workstations); undecodable bytes must not raise.
            encoding="utf-8",
            errors="replace",
            timeout=20,
        )
    except (OSError, subprocess.TimeoutExpired):
        # OSError covers a missing binary (FileNotFoundError) as well as one
        # that exists but cannot be executed (PermissionError).
        return None
    if proc.returncode != 0:
        return None
    try:
        payload = json.loads(proc.stdout)
    except json.JSONDecodeError:
        return None
    # Callers use .get() on the result, so anything but an object is a failure.
    return payload if isinstance(payload, dict) else None
def collect_hosts_from_cluster() -> tuple[dict[str, list[str]], bool]:
    """Scan the live cluster for iamworkin.lan hostnames.

    Certificates and IngressRoutes are listed across all namespaces with
    kubectl, and each item is handed to ``extract_hosts_from_doc`` so the
    live scan applies exactly the same extraction rules as the manifest scan.

    Returns:
        ``(host_index, ok)``. ``host_index`` maps hostname -> list of
        ``"live-cluster <Kind> <namespace>/<name>"`` references. ``ok`` is
        ``False`` when kubectl is not on PATH or the Certificate listing
        fails; the caller decides whether that is fatal (--live) or just a
        notice (auto mode). A failed IngressRoute listing is skipped silently
        because the Traefik CRD may simply not be installed.
    """
    if not shutil.which("kubectl"):
        return {}, False

    # The Certificate listing doubles as the "is the cluster usable" probe.
    certs = _kubectl_json(["get", "certificate", "-A"])
    if certs is None:
        return {}, False
    # IngressRoutes are best-effort: None here just means "nothing to add".
    irs = _kubectl_json(["get", "ingressroute", "-A"])

    index: dict[str, list[str]] = {}
    for kind, listing in (("Certificate", certs), ("IngressRoute", irs)):
        if listing is None:
            continue
        for item in listing.get("items") or []:
            if not isinstance(item, dict):
                continue
            meta = item.get("metadata")
            if not isinstance(meta, dict):
                meta = {}
            ns = meta.get("namespace", "?")
            name = meta.get("name", "?")
            ref = f"{LIVE_SOURCE} {kind} {ns}/{name}"
            # Pass the kind explicitly instead of trusting the item's own
            # "kind" field; only the spec is needed for extraction.
            found = extract_hosts_from_doc({"kind": kind, "spec": item.get("spec")})
            for host in sorted(found):
                index.setdefault(host, []).append(ref)
    return index, True
def resolves(host: str) -> str | None:
    """Resolve *host* through the system resolver.

    Args:
        host: Hostname (or IPv4 literal) to look up.

    Returns:
        The IPv4 address as a string, or ``None`` when the lookup fails or the
        name is not a valid hostname at all.
    """
    try:
        return socket.gethostbyname(host)
    except (OSError, UnicodeError):
        # gethostbyname IDNA-encodes the name before any lookup; an empty
        # label ("a..lan") or a label over 63 characters raises UnicodeError,
        # which is not an OSError. Treat it as "does not resolve" so a typo in
        # a manifest is reported as FAIL instead of crashing the gate.
        return None
def main() -> int:
    """Command-line entry point: gather hostnames, resolve each, report.

    Hostnames come from the manifest scan and, unless ``--no-live`` is given,
    from the live cluster as well. With ``--live`` an unusable kubectl is an
    error; with neither flag it only produces a notice.

    Returns:
        0 when every hostname resolves (or none were found), 1 when at least
        one does not resolve, 2 when ``--live`` was given but the cluster
        scan could not run.
    """
    summary = __doc__.splitlines()[1] if __doc__ else None
    cli = argparse.ArgumentParser(description=summary)
    cli.add_argument(
        "--live",
        dest="live",
        action="store_true",
        default=None,
        help="Require a live-cluster scan (fail if kubectl unreachable).",
    )
    cli.add_argument(
        "--no-live",
        dest="live",
        action="store_false",
        help="Skip live-cluster scan (manifests only).",
    )
    # Tri-state: True = --live, False = --no-live, None = neither (auto mode).
    mode = cli.parse_args().live

    hosts = collect_hosts_from_manifests()

    if mode is not False:
        cluster_hosts, cluster_ok = collect_hosts_from_cluster()
        if not cluster_ok:
            if mode is True:
                print("ERROR: --live requested but kubectl is not available or auth failed.", file=sys.stderr)
                return 2
            print("(kubectl not reachable — skipping live scan; run from a workstation with cluster access to catch retail-style drift)")
        else:
            known_before = len(hosts)
            for name, sources in cluster_hosts.items():
                hosts.setdefault(name, []).extend(sources)
            added = len(hosts) - known_before
            print(f"(live scan: {len(cluster_hosts)} cluster host(s); {added} not covered by manifests)")

    if not hosts:
        print("No iamworkin.lan hostnames found in manifests or cluster — nothing to check.")
        return 0

    # One status line per hostname, in sorted order; remember the failures.
    unresolved: list[tuple[str, list[str]]] = []
    for name in sorted(hosts):
        address = resolves(name)
        if not address:
            print(f"FAIL {name:<45} (no pfSense Unbound override)")
            unresolved.append((name, hosts[name]))
            continue
        print(f"OK {name:<45} -> {address}")

    if not unresolved:
        print()
        print(f"All {len(hosts)} iamworkin.lan host(s) resolve via pfSense. Safe to deploy.")
        return 0

    print()
    print(f"ERROR: {len(unresolved)} host(s) referenced but not in pfSense Unbound.")
    for name, sources in unresolved:
        print(f" {name}")
        # A host can be referenced from the same place more than once.
        for source in sorted(set(sources)):
            print(f" via: {source}")
    print()
    print("Add them before merging — see README.md step 1.")
    print()
    print("From FlowerCore.Notes:")
    print(" # edit HOSTS list in scripts/pfsense-add-dns-overrides.py")
    print(" export PFSENSE_PASS=$(get_cred 'pfSense Admin')")
    print(" python scripts/pfsense-add-dns-overrides.py")
    return 1
# Script entry point: the integer returned by main() becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())