monitoring(irc-notify): hourly digest batching for thermal printer

The thermal printer drained overnight (2026-05-18/19) because the old
notify.py POSTed one print job per Grafana webhook fire. With 9
concurrently-firing alerts (zabbix-postgres + fc-devicemgmt + brochure
+ PrintPaperRollLow), every evaluation cycle stamped fresh CUPS jobs
onto the queue until the operator physically powered the printer off.

This refactor:

- Adds env-var config: THERMAL_PRINT_ENABLED (master kill switch),
  BATCH_INTERVAL_MIN (default 60), BATCH_MAX_PENDING (default 50).
- IRC delivery stays per-event (operator wants the live stream).
- Thermal routing now:
  * critical/disaster/page severity OR alert_channel=thermal_print_immediate
    -> print immediately
  * alert_channel=thermal_print -> enqueue into hourly digest
  * RESOLVED -> remove from digest buffer (no resolution-spam prints)
  * else -> IRC only, no thermal
- Background digest_loop thread flushes the buffer hourly (or sooner
  if buffer hits BATCH_MAX_PENDING). Digest payload is a single
  Print.Web /api/print/alert POST listing distinct alertnames + per-rule
  target counts.
- New POST /flush endpoint (manual operator force-flush; useful for
  testing without waiting an hour).
- GET / returns config + buffer depth + per-stat counters for observability.

Net effect: max 1 thermal print per BATCH_INTERVAL_MIN for batched
warnings, plus immediate prints for criticals. Closes the 2026-05-18/19
alert-storm incident.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Andrew Stoltz
2026-05-19 09:55:57 -05:00
parent 914fed08d8
commit bacac067cf

View File

@@ -1273,24 +1273,55 @@ metadata:
data: data:
notify.py: | notify.py: |
#!/usr/bin/env python3 #!/usr/bin/env python3
"""HTTP->IRC alert relay with thermal printer forwarding for Grafana webhooks. """HTTP->IRC alert relay with thermal-printer DIGEST forwarding.
Listens on :9119, posts to #alerts on UnrealIRCd via raw IRC protocol.
Alerts tagged alert_channel=thermal_print also POST to Print.Web /api/print/alert. Listens on :9119, posts to #alerts on UnrealIRCd, forwards to Print.Web
/api/print/alert. Thermal printing is BATCHED into hourly digests by
default so the printer no longer spam-fires per Grafana webhook.
Routing (per Grafana webhook alert):
- IRC: always per-event (operator likes the stream)
- Thermal printer:
* severity in {critical,disaster,page} OR
label alert_channel=thermal_print_immediate -> print NOW
* label alert_channel=thermal_print -> enqueue into hourly digest
* everything else -> IRC only
- RESOLVED webhooks remove the alert from the digest buffer
Env vars (defaults preserve old behavior on first deploy):
THERMAL_PRINT_ENABLED default "true" - master kill switch
BATCH_INTERVAL_MIN default "60" - minutes between digest prints
BATCH_MAX_PENDING default "50" - force-flush threshold
HTTP surface:
POST / - Grafana webhook entry
POST /flush - manual digest flush (idempotent)
GET / - status + config + buffer depth + stats
""" """
import json, socket, sys, time import json, os, socket, sys, threading, time
from collections import defaultdict
from datetime import datetime, timezone
from http.server import HTTPServer, BaseHTTPRequestHandler from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.request import Request, urlopen from urllib.request import Request, urlopen
from urllib.error import URLError
IRC_HOST = "unrealircd.irc.svc" # short name: CoreDNS ndots:5 + iamworkin.lan template hijacks full .cluster.local (see memory) THERMAL_PRINT_ENABLED = os.environ.get("THERMAL_PRINT_ENABLED", "true").lower() == "true"
IRC_PORT = 6667 BATCH_INTERVAL_MIN = int(os.environ.get("BATCH_INTERVAL_MIN", "60"))
IRC_NICK = "grafana-bot" BATCH_MAX_PENDING = int(os.environ.get("BATCH_MAX_PENDING", "50"))
IRC_CHANNEL = "#alerts"
PRINT_WEB_URL = "http://10.0.57.16:5200/api/print/alert" IRC_HOST = os.environ.get("IRC_HOST", "unrealircd.irc.svc")
PRINT_ENABLED = True IRC_PORT = int(os.environ.get("IRC_PORT", "6667"))
IRC_NICK = os.environ.get("IRC_NICK", "grafana-bot")
IRC_CHANNEL = os.environ.get("IRC_CHANNEL", "#alerts")
PRINT_WEB_URL = os.environ.get("PRINT_WEB_URL", "http://10.0.57.16:5200/api/print/alert")
_buffer_lock = threading.Lock()
_buffer = {} # fingerprint -> {"alert": dict, "first_seen": float, "last_seen": float}
_last_flush_time = time.time()
_stats = {"webhooks_received": 0, "irc_sent": 0, "print_immediate": 0,
"digest_flushed": 0, "buffer_dedup": 0, "buffer_added": 0,
"buffer_resolved": 0, "started_at": time.time()}
def send_irc(message): def send_irc(message):
"""Connect, handle PING, join, send, quit."""
try: try:
sock = socket.create_connection((IRC_HOST, IRC_PORT), timeout=15) sock = socket.create_connection((IRC_HOST, IRC_PORT), timeout=15)
sock.sendall(f"NICK {IRC_NICK}\r\n".encode()) sock.sendall(f"NICK {IRC_NICK}\r\n".encode())
@@ -1323,52 +1354,133 @@ data:
time.sleep(0.5) time.sleep(0.5)
sock.sendall(b"QUIT :alert delivered\r\n") sock.sendall(b"QUIT :alert delivered\r\n")
sock.close() sock.close()
_stats["irc_sent"] += 1
return True return True
except Exception as e: except Exception as e:
print(f"[irc-notify] IRC send failed: {e}", file=sys.stderr) print(f"[irc-notify] IRC send failed: {e}", file=sys.stderr)
return False return False
def send_thermal_print(alert): def post_thermal(payload, kind):
if not PRINT_ENABLED: return if not THERMAL_PRINT_ENABLED:
labels = alert.get("labels", {}) print(f"[irc-notify] thermal disabled; skip {kind} ({payload.get('title','?')[:40]})", file=sys.stderr)
annotations = alert.get("annotations", {}) return False
status = alert.get("status", "firing").upper()
summary = annotations.get("summary", "")
description = annotations.get("description", "")
runbook = annotations.get("runbook", "")
# Build a useful message: summary + description + runbook steps
parts = []
if summary: parts.append(summary)
if description and description != summary: parts.append(description)
if runbook: parts.append("STEPS: " + runbook)
message = " | ".join(parts) if parts else labels.get("alertname", "Unknown alert")
payload = {
"title": labels.get("alertname", "Unknown"),
"severity": labels.get("severity", "warning").capitalize(),
"host": labels.get("instance", labels.get("host", "unknown")),
"message": message,
"eventId": alert.get("fingerprint", ""),
"source": "Grafana",
"status": "RESOLVED" if status == "RESOLVED" else "PROBLEM",
"acknowledged": False
}
try: try:
req = Request(PRINT_WEB_URL, data=json.dumps(payload).encode("utf-8"), req = Request(PRINT_WEB_URL, data=json.dumps(payload).encode("utf-8"),
headers={"Content-Type": "application/json"}, method="POST") headers={"Content-Type": "application/json"}, method="POST")
resp = urlopen(req, timeout=10) resp = urlopen(req, timeout=10)
print(f"[irc-notify] Thermal print sent: {resp.read().decode()}", file=sys.stderr) if kind == "immediate": _stats["print_immediate"] += 1
print(f"[irc-notify] thermal {kind} sent: {payload.get('title','?')[:50]}", file=sys.stderr)
return True
except Exception as e: except Exception as e:
print(f"[irc-notify] Thermal print failed: {e}", file=sys.stderr) print(f"[irc-notify] thermal {kind} failed: {e}", file=sys.stderr)
return False
def should_print(alert): def fingerprint_of(alert):
fp = alert.get("fingerprint", "")
if fp: return fp
labels = alert.get("labels", {}) labels = alert.get("labels", {})
if labels.get("alert_channel") == "thermal_print": return True target = labels.get("pod") or labels.get("instance") or labels.get("deployment") or labels.get("statefulset") or labels.get("namespace") or ""
if labels.get("severity", "").lower() in ("critical", "disaster"): return True return f"{labels.get('alertname','?')}/{labels.get('namespace','')}/{target}"
if alert.get("status", "").upper() == "RESOLVED": return False
return False def is_critical(alert):
return alert.get("labels", {}).get("severity", "").lower() in ("critical", "disaster", "page")
def is_immediate_label(alert):
return alert.get("labels", {}).get("alert_channel") == "thermal_print_immediate"
def is_batched_label(alert):
return alert.get("labels", {}).get("alert_channel") == "thermal_print"
def add_to_digest(alert):
if not THERMAL_PRINT_ENABLED: return
fp = fingerprint_of(alert)
status = alert.get("status", "firing").lower()
with _buffer_lock:
if status == "resolved":
if fp in _buffer:
del _buffer[fp]
_stats["buffer_resolved"] += 1
return
if fp in _buffer:
_buffer[fp]["last_seen"] = time.time()
_buffer[fp]["alert"] = alert
_stats["buffer_dedup"] += 1
return
_buffer[fp] = {"alert": alert, "first_seen": time.time(), "last_seen": time.time()}
_stats["buffer_added"] += 1
def build_digest_payload():
with _buffer_lock:
items = list(_buffer.values())
if not items: return None
by_name = defaultdict(list)
for item in items:
labels = item["alert"].get("labels", {})
by_name[labels.get("alertname", "Unknown")].append(item)
lines = []
for name, group in sorted(by_name.items()):
targets = []
for it in group[:5]:
labels = it["alert"].get("labels", {})
t = (labels.get("pod") or labels.get("instance") or labels.get("deployment")
or labels.get("statefulset") or labels.get("namespace") or "?")
targets.append(t)
more = f" (+{len(group)-5})" if len(group) > 5 else ""
sevs = sorted({it["alert"].get("labels", {}).get("severity", "warning") for it in group})
lines.append(f"[{'/'.join(sevs)}] {name} x{len(group)}: {', '.join(targets)}{more}")
now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
title = f"Alert digest: {len(items)} firing"
body = "\n".join([
f"=== {title} ===",
f"as of {now}",
"",
*lines,
"",
"Stream: #alerts (IRC) | Triage: grafana-noc1.iamworkin.lan",
"Force-flush: POST irc-notify.monitoring.svc:9119/flush",
])
return {"title": title, "severity": "Warning", "host": "monitoring",
"message": body, "eventId": f"digest-{int(time.time())}",
"source": "Grafana digest", "status": "PROBLEM", "acknowledged": False}
def flush_digest():
payload = build_digest_payload()
if payload is None:
print("[irc-notify] flush: buffer empty, no digest sent", file=sys.stderr)
return False
sent = post_thermal(payload, "digest")
with _buffer_lock:
_buffer.clear()
if sent: _stats["digest_flushed"] += 1
return sent
def digest_loop():
global _last_flush_time
while True:
try:
now = time.time()
elapsed = now - _last_flush_time
if elapsed >= BATCH_INTERVAL_MIN * 60:
print(f"[irc-notify] digest tick: interval reached ({BATCH_INTERVAL_MIN}m); buffer={len(_buffer)}", file=sys.stderr)
flush_digest()
_last_flush_time = now
elif len(_buffer) >= BATCH_MAX_PENDING:
print(f"[irc-notify] digest tick: buffer full ({len(_buffer)}); force flush", file=sys.stderr)
flush_digest()
_last_flush_time = now
time.sleep(15)
except Exception as e:
print(f"[irc-notify] digest loop error: {e}", file=sys.stderr)
time.sleep(60)
class Handler(BaseHTTPRequestHandler): class Handler(BaseHTTPRequestHandler):
def do_POST(self): def do_POST(self):
if self.path == "/flush":
ok = flush_digest()
self.send_response(200); self.send_header("Content-Type", "application/json"); self.end_headers()
self.wfile.write(json.dumps({"flushed": ok, "buffer_after": len(_buffer)}).encode())
return
_stats["webhooks_received"] += 1
length = int(self.headers.get("Content-Length", 0)) length = int(self.headers.get("Content-Length", 0))
body = json.loads(self.rfile.read(length)) if length else {} body = json.loads(self.rfile.read(length)) if length else {}
for alert in body.get("alerts", []): for alert in body.get("alerts", []):
@@ -1383,22 +1495,52 @@ data:
msg = f"{icon}{sev_tag} {name}: {summary}" msg = f"{icon}{sev_tag} {name}: {summary}"
if desc: msg += f"\n {desc}" if desc: msg += f"\n {desc}"
send_irc(msg) send_irc(msg)
if should_print(alert): send_thermal_print(alert) # Thermal routing
self.send_response(200) if status == "RESOLVED":
self.send_header("Content-Type", "application/json") add_to_digest(alert) # removes from buffer
self.end_headers() continue
if is_critical(alert) or is_immediate_label(alert):
runbook = alert.get("annotations", {}).get("runbook", "")
parts = [summary]
if desc and desc != summary: parts.append(desc)
if runbook: parts.append("STEPS: " + runbook)
pl = {"title": name, "severity": (severity or "warning").capitalize(),
"host": labels.get("instance", labels.get("pod", labels.get("namespace", "unknown"))),
"message": " | ".join(parts), "eventId": alert.get("fingerprint", ""),
"source": "Grafana (immediate)", "status": "PROBLEM", "acknowledged": False}
post_thermal(pl, "immediate")
elif is_batched_label(alert):
add_to_digest(alert)
# else: IRC-only
self.send_response(200); self.send_header("Content-Type", "application/json"); self.end_headers()
self.wfile.write(b'{"status":"ok"}') self.wfile.write(b'{"status":"ok"}')
def do_GET(self): def do_GET(self):
self.send_response(200) self.send_response(200); self.send_header("Content-Type", "application/json"); self.end_headers()
self.send_header("Content-Type", "application/json") with _buffer_lock:
self.end_headers() alertnames = sorted({it["alert"].get("labels", {}).get("alertname", "?") for it in _buffer.values()})
self.wfile.write(json.dumps({"service":"irc-notify","thermal_print":PRINT_ENABLED}).encode()) depth = len(_buffer)
info = {
"service": "irc-notify",
"config": {"thermal_print_enabled": THERMAL_PRINT_ENABLED,
"batch_interval_min": BATCH_INTERVAL_MIN,
"batch_max_pending": BATCH_MAX_PENDING,
"irc_target": f"{IRC_HOST}:{IRC_PORT} {IRC_CHANNEL}",
"print_web_url": PRINT_WEB_URL},
"buffer": {"depth": depth, "alertnames": alertnames,
"seconds_since_last_flush": int(time.time() - _last_flush_time),
"seconds_until_next_flush": max(0, int(BATCH_INTERVAL_MIN*60 - (time.time() - _last_flush_time)))},
"stats": _stats,
}
self.wfile.write(json.dumps(info, indent=2).encode())
def log_message(self, format, *args): def log_message(self, format, *args):
print(f"[irc-notify] {args[0]}", file=sys.stderr) print(f"[irc-notify] {args[0]}", file=sys.stderr)
if __name__ == "__main__": if __name__ == "__main__":
threading.Thread(target=digest_loop, daemon=True).start()
server = HTTPServer(("0.0.0.0", 9119), Handler) server = HTTPServer(("0.0.0.0", 9119), Handler)
print(f"IRC alert relay :9119 -> {IRC_HOST}:{IRC_PORT} {IRC_CHANNEL} (thermal: {PRINT_ENABLED})") print(f"[irc-notify] :9119 -> IRC {IRC_HOST}:{IRC_PORT} {IRC_CHANNEL} | thermal={'ON' if THERMAL_PRINT_ENABLED else 'OFF'} | digest={BATCH_INTERVAL_MIN}m max={BATCH_MAX_PENDING}", file=sys.stderr)
server.serve_forever() server.serve_forever()
# ============================================================================= # =============================================================================