From bacac067cff81808f9c6ae676c8fb0f7f355c8d6 Mon Sep 17 00:00:00 2001 From: Andrew Stoltz Date: Tue, 19 May 2026 09:55:57 -0500 Subject: [PATCH] monitoring(irc-notify): hourly digest batching for thermal printer The thermal printer drained overnight (2026-05-18/19) because the old notify.py POSTed one print job per Grafana webhook fire. With 9 concurrently-firing alerts (zabbix-postgres + fc-devicemgmt + brochure + PrintPaperRollLow), every evaluation cycle stamped fresh CUPS jobs onto the queue until the operator physically powered the printer off. This refactor: - Adds env-var config: THERMAL_PRINT_ENABLED (master kill switch), BATCH_INTERVAL_MIN (default 60), BATCH_MAX_PENDING (default 50). - IRC delivery stays per-event (operator wants the live stream). - Thermal routing now: * critical/disaster/page severity OR alert_channel=thermal_print_immediate -> print immediately * alert_channel=thermal_print -> enqueue into hourly digest * RESOLVED -> remove from digest buffer (no resolution-spam prints) * else -> IRC only, no thermal - Background digest_loop thread flushes the buffer every BATCH_INTERVAL_MIN minutes (hourly by default), or sooner if the buffer hits BATCH_MAX_PENDING. Digest payload is a single Print.Web /api/print/alert POST listing distinct alertnames + per-rule target counts. - New POST /flush endpoint (manual operator force-flush; useful for testing without waiting an hour). - GET / returns config + buffer depth + per-stat counters for observability. Net effect: at most 1 scheduled digest print per BATCH_INTERVAL_MIN minutes for batched warnings (a BATCH_MAX_PENDING overflow or a manual POST /flush can print sooner), plus immediate prints for criticals. Closes the 2026-05-18/19 alert-storm incident.
Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/monitoring/noc-monitoring.yaml | 246 ++++++++++++++++++++++------ 1 file changed, 194 insertions(+), 52 deletions(-) diff --git a/apps/monitoring/noc-monitoring.yaml b/apps/monitoring/noc-monitoring.yaml index ea1b175..447f612 100644 --- a/apps/monitoring/noc-monitoring.yaml +++ b/apps/monitoring/noc-monitoring.yaml @@ -1273,24 +1273,55 @@ metadata: data: notify.py: | #!/usr/bin/env python3 - """HTTP->IRC alert relay with thermal printer forwarding for Grafana webhooks. - Listens on :9119, posts to #alerts on UnrealIRCd via raw IRC protocol. - Alerts tagged alert_channel=thermal_print also POST to Print.Web /api/print/alert. + """HTTP->IRC alert relay with thermal-printer DIGEST forwarding. + + Listens on :9119, posts to #alerts on UnrealIRCd, forwards to Print.Web + /api/print/alert. Thermal printing is BATCHED into hourly digests by + default so the printer no longer spam-fires per Grafana webhook. + + Routing (per Grafana webhook alert): + - IRC: always per-event (operator likes the stream) + - Thermal printer: + * severity in {critical,disaster,page} OR + label alert_channel=thermal_print_immediate -> print NOW + * label alert_channel=thermal_print -> enqueue into hourly digest + * everything else -> IRC only + - RESOLVED webhooks remove the alert from the digest buffer + + Env vars (defaults keep thermal printing on; thermal_print alerts now print as hourly digests): + THERMAL_PRINT_ENABLED default "true" - master kill switch + BATCH_INTERVAL_MIN default "60" - minutes between digest prints + BATCH_MAX_PENDING default "50" - force-flush threshold + + HTTP surface: + POST / - Grafana webhook entry + POST /flush - manual digest flush (no-op when buffer is empty) + GET / - status + config + buffer depth + stats """ - import json, socket, sys, time + import json, os, socket, sys, threading, time + from collections import defaultdict + from datetime import datetime, timezone from http.server import HTTPServer, BaseHTTPRequestHandler from urllib.request import
Request, urlopen - from urllib.error import URLError - IRC_HOST = "unrealircd.irc.svc" # short name: CoreDNS ndots:5 + iamworkin.lan template hijacks full .cluster.local (see memory) - IRC_PORT = 6667 - IRC_NICK = "grafana-bot" - IRC_CHANNEL = "#alerts" - PRINT_WEB_URL = "http://10.0.57.16:5200/api/print/alert" - PRINT_ENABLED = True + THERMAL_PRINT_ENABLED = os.environ.get("THERMAL_PRINT_ENABLED", "true").lower() == "true" + BATCH_INTERVAL_MIN = int(os.environ.get("BATCH_INTERVAL_MIN", "60")) + BATCH_MAX_PENDING = int(os.environ.get("BATCH_MAX_PENDING", "50")) + + IRC_HOST = os.environ.get("IRC_HOST", "unrealircd.irc.svc") + IRC_PORT = int(os.environ.get("IRC_PORT", "6667")) + IRC_NICK = os.environ.get("IRC_NICK", "grafana-bot") + IRC_CHANNEL = os.environ.get("IRC_CHANNEL", "#alerts") + PRINT_WEB_URL = os.environ.get("PRINT_WEB_URL", "http://10.0.57.16:5200/api/print/alert") + + _buffer_lock = threading.Lock() + _buffer = {} # fingerprint -> {"alert": dict, "first_seen": float, "last_seen": float} + _last_flush_time = time.time() + _stats = {"webhooks_received": 0, "irc_sent": 0, "print_immediate": 0, + "digest_flushed": 0, "buffer_dedup": 0, "buffer_added": 0, + "buffer_resolved": 0, "started_at": time.time()} def send_irc(message): - """Connect, handle PING, join, send, quit.""" try: sock = socket.create_connection((IRC_HOST, IRC_PORT), timeout=15) sock.sendall(f"NICK {IRC_NICK}\r\n".encode()) @@ -1323,52 +1354,133 @@ data: time.sleep(0.5) sock.sendall(b"QUIT :alert delivered\r\n") sock.close() + _stats["irc_sent"] += 1 return True except Exception as e: print(f"[irc-notify] IRC send failed: {e}", file=sys.stderr) return False - def send_thermal_print(alert): - if not PRINT_ENABLED: return - labels = alert.get("labels", {}) - annotations = alert.get("annotations", {}) - status = alert.get("status", "firing").upper() - summary = annotations.get("summary", "") - description = annotations.get("description", "") - runbook = annotations.get("runbook", "") - # Build a 
useful message: summary + description + runbook steps - parts = [] - if summary: parts.append(summary) - if description and description != summary: parts.append(description) - if runbook: parts.append("STEPS: " + runbook) - message = " | ".join(parts) if parts else labels.get("alertname", "Unknown alert") - payload = { - "title": labels.get("alertname", "Unknown"), - "severity": labels.get("severity", "warning").capitalize(), - "host": labels.get("instance", labels.get("host", "unknown")), - "message": message, - "eventId": alert.get("fingerprint", ""), - "source": "Grafana", - "status": "RESOLVED" if status == "RESOLVED" else "PROBLEM", - "acknowledged": False - } + def post_thermal(payload, kind): + if not THERMAL_PRINT_ENABLED: + print(f"[irc-notify] thermal disabled; skip {kind} ({payload.get('title','?')[:40]})", file=sys.stderr) + return False try: req = Request(PRINT_WEB_URL, data=json.dumps(payload).encode("utf-8"), headers={"Content-Type": "application/json"}, method="POST") resp = urlopen(req, timeout=10) - print(f"[irc-notify] Thermal print sent: {resp.read().decode()}", file=sys.stderr) + if kind == "immediate": _stats["print_immediate"] += 1 + print(f"[irc-notify] thermal {kind} sent: {payload.get('title','?')[:50]}", file=sys.stderr) + return True except Exception as e: - print(f"[irc-notify] Thermal print failed: {e}", file=sys.stderr) + print(f"[irc-notify] thermal {kind} failed: {e}", file=sys.stderr) + return False - def should_print(alert): + def fingerprint_of(alert): + fp = alert.get("fingerprint", "") + if fp: return fp labels = alert.get("labels", {}) - if labels.get("alert_channel") == "thermal_print": return True - if labels.get("severity", "").lower() in ("critical", "disaster"): return True - if alert.get("status", "").upper() == "RESOLVED": return False - return False + target = labels.get("pod") or labels.get("instance") or labels.get("deployment") or labels.get("statefulset") or labels.get("namespace") or "" + return 
f"{labels.get('alertname','?')}/{labels.get('namespace','')}/{target}" + + def is_critical(alert): + return alert.get("labels", {}).get("severity", "").lower() in ("critical", "disaster", "page") + + def is_immediate_label(alert): + return alert.get("labels", {}).get("alert_channel") == "thermal_print_immediate" + + def is_batched_label(alert): + return alert.get("labels", {}).get("alert_channel") == "thermal_print" + + def add_to_digest(alert): + if not THERMAL_PRINT_ENABLED: return + fp = fingerprint_of(alert) + status = alert.get("status", "firing").lower() + with _buffer_lock: + if status == "resolved": + if fp in _buffer: + del _buffer[fp] + _stats["buffer_resolved"] += 1 + return + if fp in _buffer: + _buffer[fp]["last_seen"] = time.time() + _buffer[fp]["alert"] = alert + _stats["buffer_dedup"] += 1 + return + _buffer[fp] = {"alert": alert, "first_seen": time.time(), "last_seen": time.time()} + _stats["buffer_added"] += 1 + + def build_digest_payload(): + with _buffer_lock: + items = list(_buffer.values()) + if not items: return None + by_name = defaultdict(list) + for item in items: + labels = item["alert"].get("labels", {}) + by_name[labels.get("alertname", "Unknown")].append(item) + lines = [] + for name, group in sorted(by_name.items()): + targets = [] + for it in group[:5]: + labels = it["alert"].get("labels", {}) + t = (labels.get("pod") or labels.get("instance") or labels.get("deployment") + or labels.get("statefulset") or labels.get("namespace") or "?") + targets.append(t) + more = f" (+{len(group)-5})" if len(group) > 5 else "" + sevs = sorted({it["alert"].get("labels", {}).get("severity", "warning") for it in group}) + lines.append(f"[{'/'.join(sevs)}] {name} x{len(group)}: {', '.join(targets)}{more}") + now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC") + title = f"Alert digest: {len(items)} firing" + body = "\n".join([ + f"=== {title} ===", + f"as of {now}", + "", + *lines, + "", + "Stream: #alerts (IRC) | Triage: 
grafana-noc1.iamworkin.lan", + "Force-flush: POST irc-notify.monitoring.svc:9119/flush", + ]) + return {"title": title, "severity": "Warning", "host": "monitoring", + "message": body, "eventId": f"digest-{int(time.time())}", + "source": "Grafana digest", "status": "PROBLEM", "acknowledged": False} + + def flush_digest(): + payload = build_digest_payload() + if payload is None: + print("[irc-notify] flush: buffer empty, no digest sent", file=sys.stderr) + return False + sent = post_thermal(payload, "digest") + with _buffer_lock: + _buffer.clear() + if sent: _stats["digest_flushed"] += 1 + return sent + + def digest_loop(): + global _last_flush_time + while True: + try: + now = time.time() + elapsed = now - _last_flush_time + if elapsed >= BATCH_INTERVAL_MIN * 60: + print(f"[irc-notify] digest tick: interval reached ({BATCH_INTERVAL_MIN}m); buffer={len(_buffer)}", file=sys.stderr) + flush_digest() + _last_flush_time = now + elif len(_buffer) >= BATCH_MAX_PENDING: + print(f"[irc-notify] digest tick: buffer full ({len(_buffer)}); force flush", file=sys.stderr) + flush_digest() + _last_flush_time = now + time.sleep(15) + except Exception as e: + print(f"[irc-notify] digest loop error: {e}", file=sys.stderr) + time.sleep(60) class Handler(BaseHTTPRequestHandler): def do_POST(self): + if self.path == "/flush": + ok = flush_digest() + self.send_response(200); self.send_header("Content-Type", "application/json"); self.end_headers() + self.wfile.write(json.dumps({"flushed": ok, "buffer_after": len(_buffer)}).encode()) + return + _stats["webhooks_received"] += 1 length = int(self.headers.get("Content-Length", 0)) body = json.loads(self.rfile.read(length)) if length else {} for alert in body.get("alerts", []): @@ -1383,22 +1495,52 @@ data: msg = f"{icon}{sev_tag} {name}: {summary}" if desc: msg += f"\n {desc}" send_irc(msg) - if should_print(alert): send_thermal_print(alert) - self.send_response(200) - self.send_header("Content-Type", "application/json") - self.end_headers() + 
# Thermal routing + if status == "RESOLVED": + add_to_digest(alert) # removes from buffer + continue + if is_critical(alert) or is_immediate_label(alert): + runbook = alert.get("annotations", {}).get("runbook", "") + parts = [summary] + if desc and desc != summary: parts.append(desc) + if runbook: parts.append("STEPS: " + runbook) + pl = {"title": name, "severity": (severity or "warning").capitalize(), + "host": labels.get("instance", labels.get("pod", labels.get("namespace", "unknown"))), + "message": " | ".join(parts), "eventId": alert.get("fingerprint", ""), + "source": "Grafana (immediate)", "status": "PROBLEM", "acknowledged": False} + post_thermal(pl, "immediate") + elif is_batched_label(alert): + add_to_digest(alert) + # else: IRC-only + self.send_response(200); self.send_header("Content-Type", "application/json"); self.end_headers() self.wfile.write(b'{"status":"ok"}') + def do_GET(self): - self.send_response(200) - self.send_header("Content-Type", "application/json") - self.end_headers() - self.wfile.write(json.dumps({"service":"irc-notify","thermal_print":PRINT_ENABLED}).encode()) + self.send_response(200); self.send_header("Content-Type", "application/json"); self.end_headers() + with _buffer_lock: + alertnames = sorted({it["alert"].get("labels", {}).get("alertname", "?") for it in _buffer.values()}) + depth = len(_buffer) + info = { + "service": "irc-notify", + "config": {"thermal_print_enabled": THERMAL_PRINT_ENABLED, + "batch_interval_min": BATCH_INTERVAL_MIN, + "batch_max_pending": BATCH_MAX_PENDING, + "irc_target": f"{IRC_HOST}:{IRC_PORT} {IRC_CHANNEL}", + "print_web_url": PRINT_WEB_URL}, + "buffer": {"depth": depth, "alertnames": alertnames, + "seconds_since_last_flush": int(time.time() - _last_flush_time), + "seconds_until_next_flush": max(0, int(BATCH_INTERVAL_MIN*60 - (time.time() - _last_flush_time)))}, + "stats": _stats, + } + self.wfile.write(json.dumps(info, indent=2).encode()) + def log_message(self, format, *args): print(f"[irc-notify] 
{args[0]}", file=sys.stderr) if __name__ == "__main__": + threading.Thread(target=digest_loop, daemon=True).start() server = HTTPServer(("0.0.0.0", 9119), Handler) - print(f"IRC alert relay :9119 -> {IRC_HOST}:{IRC_PORT} {IRC_CHANNEL} (thermal: {PRINT_ENABLED})") + print(f"[irc-notify] :9119 -> IRC {IRC_HOST}:{IRC_PORT} {IRC_CHANNEL} | thermal={'ON' if THERMAL_PRINT_ENABLED else 'OFF'} | digest={BATCH_INTERVAL_MIN}m max={BATCH_MAX_PENDING}", file=sys.stderr) server.serve_forever() # =============================================================================