#!/usr/bin/env python3
"""
check-pfsense-dns.py

Fails if any *.iamworkin.lan hostname referenced by a cert-manager Certificate
`spec.dnsNames` or a Traefik IngressRoute `Host(...)` match rule does NOT
resolve via the system DNS resolver (pfSense Unbound at 10.0.56.1 on this LAN).

Two sources are scanned:

  1. apps/**/*.yaml in this bluejay-infra checkout — the pre-merge gate.
  2. Live-cluster Certificates + IngressRoutes (opt-in with --live, or auto
     when kubectl is on PATH AND kubeconfig is usable). This catches hostnames
     that exist in the running cluster but aren't (yet) tracked in
     bluejay-infra — e.g. services deployed via their own repo's deploy
     script. Retail.Web on 2026-04-23 was stuck Issuing for 15h because of
     exactly this gap.

Run from anywhere that uses pfSense as a resolver (LAN hosts, noc1,
BLUEJAY-WS):

    python scripts/check-pfsense-dns.py            # auto live scan if kubectl works
    python scripts/check-pfsense-dns.py --live     # require live scan
    python scripts/check-pfsense-dns.py --no-live  # manifests only (CI default)

Exit code 0: all referenced hosts resolve. 1: at least one doesn't.
2: --live was requested but the live-cluster scan could not be performed.

This is intentionally narrow: it only flags hostnames that cert-manager will
actually try to validate via HTTP-01, or that Traefik will route. IRC
server-link names, Docker image tags, comments, etc. are ignored.
"""
from __future__ import annotations

import argparse
import json
import os
import re
import shutil
import socket
import subprocess
import sys
from pathlib import Path

try:
    import yaml  # PyYAML
except ImportError:
    # The "PyYAML required" exit is deferred to _require_yaml() so that the
    # pure helpers in this module stay importable without PyYAML. main()
    # calls _require_yaml() first, so the CLI behaviour is unchanged.
    yaml = None

REPO_ROOT = Path(__file__).resolve().parent.parent
APPS_DIR = REPO_ROOT / "apps"

# Only hostnames under this suffix are checked.
DOMAIN_SUFFIX = ".iamworkin.lan"

# Legacy single-host pattern, retained unchanged for anything importing it.
# Extraction itself uses the two patterns below, which also cope with
# multi-host rules.
HOST_RE = re.compile(r"Host\(`([^`]+)`\)")

# One whole `Host(...)` call; group 1 is the raw argument list. The literal
# "(" right after "Host" keeps HostSNI(...) / HostRegexp(...) from matching.
_HOST_CALL_RE = re.compile(r"\bHost\(([^)]*)\)")

# One quoted argument inside a Host(...) call: backticks or double quotes.
_HOST_ARG_RE = re.compile(r'`([^`]+)`|"([^"]+)"')

LIVE_SOURCE = "live-cluster"


def _require_yaml() -> None:
    """Exit with the install hint when PyYAML could not be imported."""
    if yaml is None:
        sys.exit("PyYAML required: pip install pyyaml")


def _in_domain(name: object) -> bool:
    """Return True when *name* is a string ending in DOMAIN_SUFFIX."""
    return isinstance(name, str) and name.endswith(DOMAIN_SUFFIX)


def _certificate_hosts(spec: object) -> set[str]:
    """Return the in-domain entries of a Certificate spec's `dnsNames`."""
    if not isinstance(spec, dict):
        return set()
    names = spec.get("dnsNames") or []
    if not isinstance(names, list):
        return set()
    return {name for name in names if _in_domain(name)}


def _hosts_in_match(match: object) -> set[str]:
    """Return in-domain hosts named by `Host(...)` calls in a match rule.

    Handles several Host(...) calls in one rule and several quoted
    arguments inside one call. Non-string input yields an empty set.
    """
    if not isinstance(match, str):
        return set()
    found: set[str] = set()
    for call in _HOST_CALL_RE.finditer(match):
        for arg in _HOST_ARG_RE.finditer(call.group(1)):
            host = arg.group(1) or arg.group(2)
            if _in_domain(host):
                found.add(host)
    return found


def _ingressroute_hosts(spec: object) -> set[str]:
    """Return in-domain hosts from every `routes[].match` of an IngressRoute spec."""
    if not isinstance(spec, dict):
        return set()
    routes = spec.get("routes") or []
    if not isinstance(routes, list):
        return set()
    found: set[str] = set()
    for route in routes:
        # Malformed (non-mapping) route entries are skipped, not fatal.
        if isinstance(route, dict):
            found |= _hosts_in_match(route.get("match", ""))
    return found


# Manifest `kind` -> function that extracts hostnames from that kind's spec.
_SPEC_EXTRACTORS = {
    "Certificate": _certificate_hosts,
    "IngressRoute": _ingressroute_hosts,
}


def extract_hosts_from_doc(doc: dict) -> set[str]:
    """Pull iamworkin.lan hostnames from a single K8s manifest doc.

    Only `Certificate` (spec.dnsNames) and `IngressRoute`
    (spec.routes[].match Host(...)) documents contribute hosts; any other
    kind, or a document that is not a mapping, yields an empty set.
    """
    if not isinstance(doc, dict):
        return set()
    kind = doc.get("kind", "")
    if not isinstance(kind, str):
        return set()
    extractor = _SPEC_EXTRACTORS.get(kind)
    if extractor is None:
        return set()
    return extractor(doc.get("spec"))


def collect_hosts_from_manifests() -> dict[str, list[str]]:
    """hostname -> [list of manifest files that referenced it].

    Walks every `*.yaml` under APPS_DIR recursively. Files that fail to
    parse are reported on stderr and skipped. File paths in the result are
    relative to REPO_ROOT.
    """
    _require_yaml()
    index: dict[str, list[str]] = {}
    for path in sorted(APPS_DIR.rglob("*.yaml")):
        try:
            with path.open("r", encoding="utf-8") as f:
                for doc in yaml.safe_load_all(f):
                    for host in sorted(extract_hosts_from_doc(doc)):
                        index.setdefault(host, []).append(
                            str(path.relative_to(REPO_ROOT))
                        )
        except yaml.YAMLError as e:
            print(f"warn: could not parse {path}: {e}", file=sys.stderr)
    return index


def _kubectl_json(args: list[str]) -> dict | None:
    """Run `kubectl ... -o json` and return the parsed result, or None on failure.

    Failure means: kubectl could not be launched, it timed out (20 s),
    it exited non-zero, or its stdout was not a JSON object.
    """
    try:
        r = subprocess.run(
            ["kubectl", *args, "-o", "json"],
            capture_output=True,
            text=True,
            timeout=20,
            check=False,
        )
    except (OSError, subprocess.TimeoutExpired):
        # OSError covers FileNotFoundError as well as e.g. PermissionError.
        return None
    if r.returncode != 0:
        return None
    try:
        parsed = json.loads(r.stdout)
    except json.JSONDecodeError:
        return None
    # Callers use .get(); anything other than a JSON object is unusable.
    return parsed if isinstance(parsed, dict) else None


def _index_live_items(index, listing, kind, extractor) -> None:
    """Add hosts from each item of a kubectl list result to *index* in place.

    Every host is recorded with a "<LIVE_SOURCE> <kind> <ns>/<name>" reference.
    """
    for item in listing.get("items") or []:
        if not isinstance(item, dict):
            continue
        meta = item.get("metadata")
        if not isinstance(meta, dict):
            meta = {}
        ns = meta.get("namespace", "?")
        name = meta.get("name", "?")
        ref = f"{LIVE_SOURCE} {kind} {ns}/{name}"
        for host in sorted(extractor(item.get("spec"))):
            index.setdefault(host, []).append(ref)


def collect_hosts_from_cluster() -> tuple[dict[str, list[str]], bool]:
    """
    Scan live cluster. Returns (host_index, ok).

    ok=False means kubectl wasn't usable (not on PATH, or the Certificate
    query failed); the caller decides whether that's fatal (--live) or just
    a warning (auto mode).
    """
    if not shutil.which("kubectl"):
        return {}, False

    index: dict[str, list[str]] = {}

    # Certificates (cert-manager.io/v1) — spec.dnsNames
    certs = _kubectl_json(["get", "certificate", "-A"])
    if certs is None:
        return {}, False
    _index_live_items(index, certs, "Certificate", _certificate_hosts)

    # IngressRoutes (traefik.io/v1alpha1) — spec.routes[].match Host(...)
    # The CRD may or may not be installed. Silent skip when it isn't.
    irs = _kubectl_json(["get", "ingressroute", "-A"])
    if irs is not None:
        _index_live_items(index, irs, "IngressRoute", _ingressroute_hosts)

    return index, True


def resolves(host: str) -> str | None:
    """Return the IPv4 address *host* resolves to, or None if it doesn't.

    Resolution goes through the system resolver (socket.gethostbyname).
    Malformed names (empty label, label longer than 63 characters) make
    gethostbyname raise UnicodeError rather than OSError; they are treated
    as "does not resolve" instead of crashing the whole run.
    """
    try:
        return socket.gethostbyname(host)
    except (OSError, UnicodeError):
        return None


def main() -> int:
    """CLI entry point. Returns the process exit code (0, 1 or 2)."""
    _require_yaml()

    # First non-empty docstring line (the script name) is the --help summary.
    summary = next(
        (line.strip() for line in (__doc__ or "").splitlines() if line.strip()),
        None,
    )
    parser = argparse.ArgumentParser(description=summary)
    parser.add_argument(
        "--live",
        dest="live",
        action="store_true",
        default=None,
        help="Require a live-cluster scan (fail if kubectl unreachable).",
    )
    parser.add_argument(
        "--no-live",
        dest="live",
        action="store_false",
        help="Skip live-cluster scan (manifests only).",
    )
    args = parser.parse_args()

    hosts = collect_hosts_from_manifests()

    live_requested = args.live is True
    live_auto = args.live is None  # neither --live nor --no-live
    if live_requested or live_auto:
        live_hosts, live_ok = collect_hosts_from_cluster()
        if live_requested and not live_ok:
            print(
                "ERROR: --live requested but kubectl is not available or auth failed.",
                file=sys.stderr,
            )
            return 2
        if live_ok:
            before = len(hosts)
            for host, refs in live_hosts.items():
                hosts.setdefault(host, []).extend(refs)
            new_hosts = len(hosts) - before
            print(
                f"(live scan: {len(live_hosts)} cluster host(s); "
                f"{new_hosts} not covered by manifests)"
            )
        elif live_auto:
            print(
                "(kubectl not reachable — skipping live scan; run from a "
                "workstation with cluster access to catch retail-style drift)"
            )

    if not hosts:
        print("No iamworkin.lan hostnames found in manifests or cluster — nothing to check.")
        return 0

    failed: list[tuple[str, list[str]]] = []
    for host in sorted(hosts):
        ip = resolves(host)
        if ip:
            print(f"OK {host:<45} -> {ip}")
        else:
            print(f"FAIL {host:<45} (no pfSense Unbound override)")
            failed.append((host, hosts[host]))

    if failed:
        print()
        print(f"ERROR: {len(failed)} host(s) referenced but not in pfSense Unbound.")
        for host, refs in failed:
            print(f" {host}")
            # A host can be referenced repeatedly from one source; show each once.
            for ref in sorted(set(refs)):
                print(f" via: {ref}")
        print()
        print("Add them before merging — see README.md step 1.")
        print()
        print("From FlowerCore.Notes:")
        print(" # edit HOSTS list in scripts/pfsense-add-dns-overrides.py")
        print(" export PFSENSE_PASS=$(get_cred 'pfSense Admin')")
        print(" python scripts/pfsense-add-dns-overrides.py")
        return 1

    print()
    print(f"All {len(hosts)} iamworkin.lan host(s) resolve via pfSense. Safe to deploy.")
    return 0


if __name__ == "__main__":
    sys.exit(main())