monitoring(irc-notify): hourly digest batching for thermal printer
The thermal printer drained overnight (2026-05-18/19) because the old
notify.py POSTed one print job per Grafana webhook fire. With 9
concurrently-firing alerts (zabbix-postgres + fc-devicemgmt + brochure
+ PrintPaperRollLow), every evaluation cycle stamped fresh CUPS jobs
onto the queue until the operator physically powered the printer off.
This refactor:
- Adds env-var config: THERMAL_PRINT_ENABLED (master kill switch),
BATCH_INTERVAL_MIN (default 60), BATCH_MAX_PENDING (default 50).
- IRC delivery stays per-event (operator wants the live stream).
- Thermal routing now:
* critical/disaster/page severity OR alert_channel=thermal_print_immediate
-> print immediately
* alert_channel=thermal_print -> enqueue into hourly digest
* RESOLVED -> remove from digest buffer (no resolution-spam prints)
* else -> IRC only, no thermal
- Background digest_loop thread flushes the buffer hourly (or sooner
if buffer hits BATCH_MAX_PENDING). Digest payload is a single
Print.Web /api/print/alert POST listing distinct alertnames + per-rule
target counts.
- New POST /flush endpoint (manual operator force-flush; useful for
testing without waiting an hour).
- GET / returns config + buffer depth + per-stat counters for observability.
Net effect: max 1 thermal print per BATCH_INTERVAL_MIN for batched
warnings, plus immediate prints for criticals. Closes the 2026-05-18/19
alert-storm incident.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1273,24 +1273,55 @@ metadata:
|
||||
data:
|
||||
notify.py: |
|
||||
#!/usr/bin/env python3
|
||||
"""HTTP->IRC alert relay with thermal printer forwarding for Grafana webhooks.
|
||||
Listens on :9119, posts to #alerts on UnrealIRCd via raw IRC protocol.
|
||||
Alerts tagged alert_channel=thermal_print also POST to Print.Web /api/print/alert.
|
||||
"""HTTP->IRC alert relay with thermal-printer DIGEST forwarding.
|
||||
|
||||
Listens on :9119, posts to #alerts on UnrealIRCd, forwards to Print.Web
|
||||
/api/print/alert. Thermal printing is BATCHED into hourly digests by
|
||||
default so the printer no longer spam-fires per Grafana webhook.
|
||||
|
||||
Routing (per Grafana webhook alert):
|
||||
- IRC: always per-event (operator likes the stream)
|
||||
- Thermal printer:
|
||||
* severity in {critical,disaster,page} OR
|
||||
label alert_channel=thermal_print_immediate -> print NOW
|
||||
* label alert_channel=thermal_print -> enqueue into hourly digest
|
||||
* everything else -> IRC only
|
||||
- RESOLVED webhooks remove the alert from the digest buffer
|
||||
|
||||
Env vars (defaults preserve old behavior on first deploy):
|
||||
THERMAL_PRINT_ENABLED default "true" - master kill switch
|
||||
BATCH_INTERVAL_MIN default "60" - minutes between digest prints
|
||||
BATCH_MAX_PENDING default "50" - force-flush threshold
|
||||
|
||||
HTTP surface:
|
||||
POST / - Grafana webhook entry
|
||||
POST /flush - manual digest flush (idempotent)
|
||||
GET / - status + config + buffer depth + stats
|
||||
"""
|
||||
import json, socket, sys, time
|
||||
import json, os, socket, sys, threading, time
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timezone
|
||||
from http.server import HTTPServer, BaseHTTPRequestHandler
|
||||
from urllib.request import Request, urlopen
|
||||
from urllib.error import URLError
|
||||
|
||||
IRC_HOST = "unrealircd.irc.svc" # short name: CoreDNS ndots:5 + iamworkin.lan template hijacks full .cluster.local (see memory)
|
||||
IRC_PORT = 6667
|
||||
IRC_NICK = "grafana-bot"
|
||||
IRC_CHANNEL = "#alerts"
|
||||
PRINT_WEB_URL = "http://10.0.57.16:5200/api/print/alert"
|
||||
PRINT_ENABLED = True
|
||||
THERMAL_PRINT_ENABLED = os.environ.get("THERMAL_PRINT_ENABLED", "true").lower() == "true"
|
||||
BATCH_INTERVAL_MIN = int(os.environ.get("BATCH_INTERVAL_MIN", "60"))
|
||||
BATCH_MAX_PENDING = int(os.environ.get("BATCH_MAX_PENDING", "50"))
|
||||
|
||||
IRC_HOST = os.environ.get("IRC_HOST", "unrealircd.irc.svc")
|
||||
IRC_PORT = int(os.environ.get("IRC_PORT", "6667"))
|
||||
IRC_NICK = os.environ.get("IRC_NICK", "grafana-bot")
|
||||
IRC_CHANNEL = os.environ.get("IRC_CHANNEL", "#alerts")
|
||||
PRINT_WEB_URL = os.environ.get("PRINT_WEB_URL", "http://10.0.57.16:5200/api/print/alert")
|
||||
|
||||
_buffer_lock = threading.Lock()
|
||||
_buffer = {} # fingerprint -> {"alert": dict, "first_seen": float, "last_seen": float}
|
||||
_last_flush_time = time.time()
|
||||
_stats = {"webhooks_received": 0, "irc_sent": 0, "print_immediate": 0,
|
||||
"digest_flushed": 0, "buffer_dedup": 0, "buffer_added": 0,
|
||||
"buffer_resolved": 0, "started_at": time.time()}
|
||||
|
||||
def send_irc(message):
|
||||
"""Connect, handle PING, join, send, quit."""
|
||||
try:
|
||||
sock = socket.create_connection((IRC_HOST, IRC_PORT), timeout=15)
|
||||
sock.sendall(f"NICK {IRC_NICK}\r\n".encode())
|
||||
@@ -1323,52 +1354,133 @@ data:
|
||||
time.sleep(0.5)
|
||||
sock.sendall(b"QUIT :alert delivered\r\n")
|
||||
sock.close()
|
||||
_stats["irc_sent"] += 1
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"[irc-notify] IRC send failed: {e}", file=sys.stderr)
|
||||
return False
|
||||
|
||||
def send_thermal_print(alert):
|
||||
if not PRINT_ENABLED: return
|
||||
labels = alert.get("labels", {})
|
||||
annotations = alert.get("annotations", {})
|
||||
status = alert.get("status", "firing").upper()
|
||||
summary = annotations.get("summary", "")
|
||||
description = annotations.get("description", "")
|
||||
runbook = annotations.get("runbook", "")
|
||||
# Build a useful message: summary + description + runbook steps
|
||||
parts = []
|
||||
if summary: parts.append(summary)
|
||||
if description and description != summary: parts.append(description)
|
||||
if runbook: parts.append("STEPS: " + runbook)
|
||||
message = " | ".join(parts) if parts else labels.get("alertname", "Unknown alert")
|
||||
payload = {
|
||||
"title": labels.get("alertname", "Unknown"),
|
||||
"severity": labels.get("severity", "warning").capitalize(),
|
||||
"host": labels.get("instance", labels.get("host", "unknown")),
|
||||
"message": message,
|
||||
"eventId": alert.get("fingerprint", ""),
|
||||
"source": "Grafana",
|
||||
"status": "RESOLVED" if status == "RESOLVED" else "PROBLEM",
|
||||
"acknowledged": False
|
||||
}
|
||||
def post_thermal(payload, kind):
|
||||
if not THERMAL_PRINT_ENABLED:
|
||||
print(f"[irc-notify] thermal disabled; skip {kind} ({payload.get('title','?')[:40]})", file=sys.stderr)
|
||||
return False
|
||||
try:
|
||||
req = Request(PRINT_WEB_URL, data=json.dumps(payload).encode("utf-8"),
|
||||
headers={"Content-Type": "application/json"}, method="POST")
|
||||
resp = urlopen(req, timeout=10)
|
||||
print(f"[irc-notify] Thermal print sent: {resp.read().decode()}", file=sys.stderr)
|
||||
if kind == "immediate": _stats["print_immediate"] += 1
|
||||
print(f"[irc-notify] thermal {kind} sent: {payload.get('title','?')[:50]}", file=sys.stderr)
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"[irc-notify] Thermal print failed: {e}", file=sys.stderr)
|
||||
|
||||
def should_print(alert):
|
||||
labels = alert.get("labels", {})
|
||||
if labels.get("alert_channel") == "thermal_print": return True
|
||||
if labels.get("severity", "").lower() in ("critical", "disaster"): return True
|
||||
if alert.get("status", "").upper() == "RESOLVED": return False
|
||||
print(f"[irc-notify] thermal {kind} failed: {e}", file=sys.stderr)
|
||||
return False
|
||||
|
||||
def fingerprint_of(alert):
|
||||
fp = alert.get("fingerprint", "")
|
||||
if fp: return fp
|
||||
labels = alert.get("labels", {})
|
||||
target = labels.get("pod") or labels.get("instance") or labels.get("deployment") or labels.get("statefulset") or labels.get("namespace") or ""
|
||||
return f"{labels.get('alertname','?')}/{labels.get('namespace','')}/{target}"
|
||||
|
||||
def is_critical(alert):
|
||||
return alert.get("labels", {}).get("severity", "").lower() in ("critical", "disaster", "page")
|
||||
|
||||
def is_immediate_label(alert):
|
||||
return alert.get("labels", {}).get("alert_channel") == "thermal_print_immediate"
|
||||
|
||||
def is_batched_label(alert):
|
||||
return alert.get("labels", {}).get("alert_channel") == "thermal_print"
|
||||
|
||||
def add_to_digest(alert):
|
||||
if not THERMAL_PRINT_ENABLED: return
|
||||
fp = fingerprint_of(alert)
|
||||
status = alert.get("status", "firing").lower()
|
||||
with _buffer_lock:
|
||||
if status == "resolved":
|
||||
if fp in _buffer:
|
||||
del _buffer[fp]
|
||||
_stats["buffer_resolved"] += 1
|
||||
return
|
||||
if fp in _buffer:
|
||||
_buffer[fp]["last_seen"] = time.time()
|
||||
_buffer[fp]["alert"] = alert
|
||||
_stats["buffer_dedup"] += 1
|
||||
return
|
||||
_buffer[fp] = {"alert": alert, "first_seen": time.time(), "last_seen": time.time()}
|
||||
_stats["buffer_added"] += 1
|
||||
|
||||
def build_digest_payload():
|
||||
with _buffer_lock:
|
||||
items = list(_buffer.values())
|
||||
if not items: return None
|
||||
by_name = defaultdict(list)
|
||||
for item in items:
|
||||
labels = item["alert"].get("labels", {})
|
||||
by_name[labels.get("alertname", "Unknown")].append(item)
|
||||
lines = []
|
||||
for name, group in sorted(by_name.items()):
|
||||
targets = []
|
||||
for it in group[:5]:
|
||||
labels = it["alert"].get("labels", {})
|
||||
t = (labels.get("pod") or labels.get("instance") or labels.get("deployment")
|
||||
or labels.get("statefulset") or labels.get("namespace") or "?")
|
||||
targets.append(t)
|
||||
more = f" (+{len(group)-5})" if len(group) > 5 else ""
|
||||
sevs = sorted({it["alert"].get("labels", {}).get("severity", "warning") for it in group})
|
||||
lines.append(f"[{'/'.join(sevs)}] {name} x{len(group)}: {', '.join(targets)}{more}")
|
||||
now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
|
||||
title = f"Alert digest: {len(items)} firing"
|
||||
body = "\n".join([
|
||||
f"=== {title} ===",
|
||||
f"as of {now}",
|
||||
"",
|
||||
*lines,
|
||||
"",
|
||||
"Stream: #alerts (IRC) | Triage: grafana-noc1.iamworkin.lan",
|
||||
"Force-flush: POST irc-notify.monitoring.svc:9119/flush",
|
||||
])
|
||||
return {"title": title, "severity": "Warning", "host": "monitoring",
|
||||
"message": body, "eventId": f"digest-{int(time.time())}",
|
||||
"source": "Grafana digest", "status": "PROBLEM", "acknowledged": False}
|
||||
|
||||
def flush_digest():
|
||||
payload = build_digest_payload()
|
||||
if payload is None:
|
||||
print("[irc-notify] flush: buffer empty, no digest sent", file=sys.stderr)
|
||||
return False
|
||||
sent = post_thermal(payload, "digest")
|
||||
with _buffer_lock:
|
||||
_buffer.clear()
|
||||
if sent: _stats["digest_flushed"] += 1
|
||||
return sent
|
||||
|
||||
def digest_loop():
|
||||
global _last_flush_time
|
||||
while True:
|
||||
try:
|
||||
now = time.time()
|
||||
elapsed = now - _last_flush_time
|
||||
if elapsed >= BATCH_INTERVAL_MIN * 60:
|
||||
print(f"[irc-notify] digest tick: interval reached ({BATCH_INTERVAL_MIN}m); buffer={len(_buffer)}", file=sys.stderr)
|
||||
flush_digest()
|
||||
_last_flush_time = now
|
||||
elif len(_buffer) >= BATCH_MAX_PENDING:
|
||||
print(f"[irc-notify] digest tick: buffer full ({len(_buffer)}); force flush", file=sys.stderr)
|
||||
flush_digest()
|
||||
_last_flush_time = now
|
||||
time.sleep(15)
|
||||
except Exception as e:
|
||||
print(f"[irc-notify] digest loop error: {e}", file=sys.stderr)
|
||||
time.sleep(60)
|
||||
|
||||
class Handler(BaseHTTPRequestHandler):
|
||||
def do_POST(self):
|
||||
if self.path == "/flush":
|
||||
ok = flush_digest()
|
||||
self.send_response(200); self.send_header("Content-Type", "application/json"); self.end_headers()
|
||||
self.wfile.write(json.dumps({"flushed": ok, "buffer_after": len(_buffer)}).encode())
|
||||
return
|
||||
_stats["webhooks_received"] += 1
|
||||
length = int(self.headers.get("Content-Length", 0))
|
||||
body = json.loads(self.rfile.read(length)) if length else {}
|
||||
for alert in body.get("alerts", []):
|
||||
@@ -1383,22 +1495,52 @@ data:
|
||||
msg = f"{icon}{sev_tag} {name}: {summary}"
|
||||
if desc: msg += f"\n {desc}"
|
||||
send_irc(msg)
|
||||
if should_print(alert): send_thermal_print(alert)
|
||||
self.send_response(200)
|
||||
self.send_header("Content-Type", "application/json")
|
||||
self.end_headers()
|
||||
# Thermal routing
|
||||
if status == "RESOLVED":
|
||||
add_to_digest(alert) # removes from buffer
|
||||
continue
|
||||
if is_critical(alert) or is_immediate_label(alert):
|
||||
runbook = alert.get("annotations", {}).get("runbook", "")
|
||||
parts = [summary]
|
||||
if desc and desc != summary: parts.append(desc)
|
||||
if runbook: parts.append("STEPS: " + runbook)
|
||||
pl = {"title": name, "severity": (severity or "warning").capitalize(),
|
||||
"host": labels.get("instance", labels.get("pod", labels.get("namespace", "unknown"))),
|
||||
"message": " | ".join(parts), "eventId": alert.get("fingerprint", ""),
|
||||
"source": "Grafana (immediate)", "status": "PROBLEM", "acknowledged": False}
|
||||
post_thermal(pl, "immediate")
|
||||
elif is_batched_label(alert):
|
||||
add_to_digest(alert)
|
||||
# else: IRC-only
|
||||
self.send_response(200); self.send_header("Content-Type", "application/json"); self.end_headers()
|
||||
self.wfile.write(b'{"status":"ok"}')
|
||||
|
||||
def do_GET(self):
|
||||
self.send_response(200)
|
||||
self.send_header("Content-Type", "application/json")
|
||||
self.end_headers()
|
||||
self.wfile.write(json.dumps({"service":"irc-notify","thermal_print":PRINT_ENABLED}).encode())
|
||||
self.send_response(200); self.send_header("Content-Type", "application/json"); self.end_headers()
|
||||
with _buffer_lock:
|
||||
alertnames = sorted({it["alert"].get("labels", {}).get("alertname", "?") for it in _buffer.values()})
|
||||
depth = len(_buffer)
|
||||
info = {
|
||||
"service": "irc-notify",
|
||||
"config": {"thermal_print_enabled": THERMAL_PRINT_ENABLED,
|
||||
"batch_interval_min": BATCH_INTERVAL_MIN,
|
||||
"batch_max_pending": BATCH_MAX_PENDING,
|
||||
"irc_target": f"{IRC_HOST}:{IRC_PORT} {IRC_CHANNEL}",
|
||||
"print_web_url": PRINT_WEB_URL},
|
||||
"buffer": {"depth": depth, "alertnames": alertnames,
|
||||
"seconds_since_last_flush": int(time.time() - _last_flush_time),
|
||||
"seconds_until_next_flush": max(0, int(BATCH_INTERVAL_MIN*60 - (time.time() - _last_flush_time)))},
|
||||
"stats": _stats,
|
||||
}
|
||||
self.wfile.write(json.dumps(info, indent=2).encode())
|
||||
|
||||
def log_message(self, format, *args):
|
||||
print(f"[irc-notify] {args[0]}", file=sys.stderr)
|
||||
|
||||
if __name__ == "__main__":
|
||||
threading.Thread(target=digest_loop, daemon=True).start()
|
||||
server = HTTPServer(("0.0.0.0", 9119), Handler)
|
||||
print(f"IRC alert relay :9119 -> {IRC_HOST}:{IRC_PORT} {IRC_CHANNEL} (thermal: {PRINT_ENABLED})")
|
||||
print(f"[irc-notify] :9119 -> IRC {IRC_HOST}:{IRC_PORT} {IRC_CHANNEL} | thermal={'ON' if THERMAL_PRINT_ENABLED else 'OFF'} | digest={BATCH_INTERVAL_MIN}m max={BATCH_MAX_PENDING}", file=sys.stderr)
|
||||
server.serve_forever()
|
||||
|
||||
# =============================================================================
|
||||
|
||||
Reference in New Issue
Block a user