#!/usr/bin/env python3
"""Generate and optionally apply FlowerCore Authentik OIDC client assets.

Dry-run is the default. Live Authentik mutations require --apply plus an
AUTHENTIK_TOKEN bearer token and an operator-provided client secret JSON file.
The script never prints client_secret values.
"""

from __future__ import annotations

import argparse
import json
import os
import sys
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from typing import Any

# Claim names that every service's scope-mapping expressions must emit;
# validate_specs() checks each one appears in the generated expressions.
CLAIMS = (
    "fc:roles",
    "fc:tenant",
    "fc:svc",
    "fc:scope",
    "fc:mfa",
    "flowercore_actor_id",
)

# "managed" identifiers of scope mappings that must already exist in Authentik;
# apply() looks them up and fails if any is missing.
BUILTIN_SCOPE_MAPPINGS = (
    "goauthentik.io/providers/oauth2/scope-openid",
    "goauthentik.io/providers/oauth2/scope-profile",
    "goauthentik.io/providers/oauth2/scope-email",
    "goauthentik.io/providers/oauth2/scope-offline_access",
)

# Informational contract echoed into every dry-run document.
FLOW_CONTRACT = {
    "response_type": "code",
    "grant_types": ["authorization_code", "refresh_token"],
    "offline_access_required": True,
}

# REST path (relative to /api/v3/) for scope property mappings.
_SCOPE_MAPPING_ENDPOINT = "/propertymappings/provider/scope/"


@dataclass(frozen=True)
class ServiceSpec:
    """Static description of one FlowerCore service that needs an OIDC client."""

    # Service slug; also used as the OAuth2 client_id and the application slug.
    slug: str
    # Target namespace reported in dry-run output; must be non-empty.
    namespace: str
    # Human-readable name used in provider/application names.
    display_name: str
    # Hostname used to build redirect and launch URLs.
    host: str

    @property
    def client_id(self) -> str:
        """Return the OAuth2 client id, which is the service slug."""
        return self.slug

    @property
    def provider_name(self) -> str:
        """Return the Authentik provider name for this service."""
        return f"FlowerCore {self.display_name} OIDC"

    @property
    def application_name(self) -> str:
        """Return the Authentik application name for this service."""
        return f"FlowerCore {self.display_name}"

    @property
    def issuer_url(self) -> str:
        """Return the per-application issuer URL reported in dry-run output."""
        return f"https://id.iamworkin.lan/application/o/{self.slug}/"

    @property
    def onepassword_item_path(self) -> str:
        """Return the item path reported as ``onepassword_item`` in dry-run output."""
        return f"IAmWorkin/items/{self.slug}-oidc-client"


SERVICE_SPECS = (
    ServiceSpec("library", "fc-library", "Library", "library.iamworkin.lan"),
    ServiceSpec("retail", "fc-retail", "Retail", "retail.iamworkin.lan"),
    ServiceSpec("telephony", "telephony", "Telephony", "telephony.iamworkin.lan"),
    ServiceSpec("knowledge", "knowledge", "Knowledge", "knowledge.iamworkin.lan"),
    ServiceSpec("llmbridge", "fc-llm-bridge", "LlmBridge", "fc-llm-bridge.iamworkin.lan"),
    ServiceSpec("mysql", "fc-mysql", "MySQL", "mysql.iamworkin.lan"),
    ServiceSpec("php", "fc-php", "PHP", "php.iamworkin.lan"),
    ServiceSpec("signage", "fc-signage", "Signage", "signage.iamworkin.lan"),
    ServiceSpec("media", "fc-media", "Media", "media.iamworkin.lan"),
    ServiceSpec("dms", "fc-dms", "DMS", "dms.iamworkin.lan"),
    ServiceSpec("pimanager", "fc-pimanager", "PiManager", "pimanager.iamworkin.lan"),
    ServiceSpec("distribution", "fc-distribution", "Distribution", "distribution.iamworkin.lan"),
    ServiceSpec("dns", "fc-dns", "DNS", "dns.iamworkin.lan"),
    ServiceSpec("print", "fc-print", "Print", "print.iamworkin.lan"),
    ServiceSpec("aistation", "fc-aistation", "AiStation", "aistation.iamworkin.lan"),
    ServiceSpec("irc", "irc", "IRC", "irc.iamworkin.lan"),
    ServiceSpec("ttsreader", "fc-ttsreader", "TtsReader", "ttsreader.iamworkin.lan"),
    ServiceSpec("chat", "fc-chat", "Chat", "chat.iamworkin.lan"),
    ServiceSpec("intranet", "intranet", "Intranet", "intranet.iamworkin.lan"),
    ServiceSpec("remotedesktop", "fc-desktop", "RemoteDesktop", "remotedesktop.iamworkin.lan"),
    ServiceSpec("provisioning", "fc-provisioning", "Provisioning", "provisioning.iamworkin.lan"),
    ServiceSpec("scoreboards", "fc-scoreboard", "Scoreboards", "scoreboards.iamworkin.lan"),
    ServiceSpec("mndot", "fc-mndot", "MnDOT", "mndot.iamworkin.lan"),
    ServiceSpec("kiosk", "fc-system", "Kiosk", "kiosk.iamworkin.lan"),
    ServiceSpec("mike-bundle", "fc-mike-bundle", "Mike Bundle", "mike-bundle.iamworkin.lan"),
    ServiceSpec("messageboard", "fc-messageboard", "MessageBoard", "messageboard.iamworkin.lan"),
    ServiceSpec("menuboard", "fc-menuboard", "MenuBoard", "menuboard.iamworkin.lan"),
    ServiceSpec("presentations", "fc-presentations", "Presentations", "presentations.iamworkin.lan"),
    ServiceSpec("segmentdisplay", "fc-segmentdisplay", "SegmentDisplay", "segmentdisplay.iamworkin.lan"),
    ServiceSpec("signalcontrol", "fc-signalcontrol", "SignalControl", "signalcontrol.iamworkin.lan"),
    ServiceSpec("worldbuilder", "fc-worldbuilder", "WorldBuilder", "worldbuilder.iamworkin.lan"),
    ServiceSpec("audit", "fc-audit", "Audit", "audit.iamworkin.lan"),
    ServiceSpec("licensing", "fc-licensing", "Licensing", "licensing.iamworkin.lan"),
)


def scope_mapping_payloads(service: ServiceSpec) -> list[dict[str, str]]:
    """Return the six scope-mapping payloads (one per entry in CLAIMS) for *service*.

    Each payload carries a Python snippet in ``expression``. The snippets use
    ``return`` at their top level, so they are function bodies and their
    internal indentation must be valid Python.
    """
    managed_prefix = f"flowercore.io/authentik/oidc/{service.slug}"
    return [
        {
            "managed": f"{managed_prefix}/fc-roles",
            "name": f"FlowerCore {service.slug} fc:roles",
            "scope_name": "flowercore",
            "description": "FlowerCore role claim from Authentik group memberships.",
            "expression": (
                "groups = [group.name for group in request.user.ak_groups.all()]\n"
                "return {'fc:roles': ','.join(groups)}"
            ),
        },
        {
            "managed": f"{managed_prefix}/fc-tenant",
            "name": f"FlowerCore {service.slug} fc:tenant",
            "scope_name": "flowercore",
            "description": "FlowerCore tenant claim from group attribute fc_tenant_id.",
            # Nested 4/8-space indentation is required for this snippet to
            # compile; the first group with a truthy fc_tenant_id wins.
            "expression": (
                "for group in request.user.ak_groups.all():\n"
                "    tenant_id = group.attributes.get('fc_tenant_id')\n"
                "    if tenant_id:\n"
                "        return {'fc:tenant': tenant_id}\n"
                "return {'fc:tenant': 'default'}"
            ),
        },
        {
            "managed": f"{managed_prefix}/fc-svc",
            "name": f"FlowerCore {service.slug} fc:svc",
            "scope_name": "flowercore",
            "description": "FlowerCore service slug claim.",
            "expression": f"return {{'fc:svc': '{service.slug}'}}",
        },
        {
            "managed": f"{managed_prefix}/fc-scope",
            "name": f"FlowerCore {service.slug} fc:scope",
            "scope_name": "flowercore",
            "description": "FlowerCore service permission scope claim.",
            "expression": f"return {{'fc:scope': 'flowercore:{service.slug}'}}",
        },
        {
            "managed": f"{managed_prefix}/fc-mfa",
            "name": f"FlowerCore {service.slug} fc:mfa",
            "scope_name": "flowercore",
            "description": "FlowerCore MFA satisfied session claim.",
            "expression": (
                "mfa_stage = request.session.get('authentik/stages/authenticator_validate')\n"
                "return {'fc:mfa': bool(mfa_stage)}"
            ),
        },
        {
            "managed": f"{managed_prefix}/flowercore-actor-id",
            "name": f"FlowerCore {service.slug} flowercore_actor_id",
            "scope_name": "flowercore",
            "description": "FlowerCore audit actor alias for the Authentik user id.",
            "expression": "return {'flowercore_actor_id': str(request.user.uid)}",
        },
    ]


def provider_payload(
    service: ServiceSpec,
    args: argparse.Namespace,
    mapping_ids: list[str],
    client_secret: str,
) -> dict[str, Any]:
    """Build the OAuth2 provider payload for *service*.

    ``authentication_flow`` and ``signing_key`` are included only when the
    corresponding command-line options are set.
    """
    payload: dict[str, Any] = {
        "name": service.provider_name,
        "authorization_flow": args.authorization_flow,
        "invalidation_flow": args.invalidation_flow,
        "property_mappings": mapping_ids,
        "client_type": "confidential",
        "client_id": service.client_id,
        "client_secret": client_secret,
        "access_code_validity": "minutes=1",
        "access_token_validity": "hours=1",
        "refresh_token_validity": "days=30",
        "include_claims_in_id_token": True,
        "redirect_uris": [
            {"matching_mode": "strict", "url": f"https://{service.host}/signin-oidc"},
            {"matching_mode": "strict", "url": f"https://{service.host}/signout-callback-oidc"},
        ],
        "sub_mode": "hashed_user_id",
        "issuer_mode": "per_provider",
    }
    if args.authentication_flow:
        payload["authentication_flow"] = args.authentication_flow
    if args.signing_key:
        payload["signing_key"] = args.signing_key
    return payload


def application_payload(service: ServiceSpec, provider_pk: int | str | None) -> dict[str, Any]:
    """Build the Authentik application payload that points at *provider_pk*."""
    return {
        "name": service.application_name,
        "slug": service.slug,
        "provider": provider_pk,
        "open_in_new_tab": True,
        "meta_launch_url": f"https://{service.host}/",
        "meta_description": f"FlowerCore {service.display_name} OIDC client",
        "meta_publisher": "FlowerCore",
        "policy_engine_mode": "all",
        "group": "FlowerCore",
    }


def load_client_secrets(path: str | None) -> dict[str, str]:
    """Load the slug -> client_secret mapping from a JSON file.

    Returns an empty dict when *path* is falsy. JSON ``null`` values are
    dropped rather than stringified, so such a slug is reported as missing by
    apply() instead of being provisioned with the literal secret ``"None"``.
    Other non-string scalars are converted with ``str()``.

    Raises:
        ValueError: if the top-level JSON value is not an object.
    """
    if not path:
        return {}
    with open(path, encoding="utf-8") as handle:
        data = json.load(handle)
    if not isinstance(data, dict):
        raise ValueError("client secret file must be a JSON object keyed by service slug")
    return {str(slug): str(secret) for slug, secret in data.items() if secret is not None}


def redact(value: Any) -> Any:
    """Return a copy of *value* with every ``client_secret`` entry blanked.

    Dicts and lists are walked recursively; other values are returned as-is.
    """
    if isinstance(value, dict):
        return {
            key: "" if key == "client_secret" else redact(child)
            for key, child in value.items()
        }
    if isinstance(value, list):
        return [redact(child) for child in value]
    return value


class AuthentikClient:
    """Minimal JSON client for the Authentik ``/api/v3/`` REST API."""

    def __init__(self, base_url: str, token: str) -> None:
        """Store the base URL (trailing slashes stripped) and the bearer token."""
        self.base_url = base_url.rstrip("/")
        self.token = token

    def request(self, method: str, path: str, payload: dict[str, Any] | None = None) -> Any:
        """Send one request and return the decoded JSON body.

        Returns ``None`` when the response body is empty.

        Raises:
            RuntimeError: on an HTTP error status; the message includes the
                method, path, status code and response body.
        """
        url = f"{self.base_url}/api/v3/{path.lstrip('/')}"
        body = None
        headers = {
            "Accept": "application/json",
            "Authorization": f"Bearer {self.token}",
        }
        if payload is not None:
            headers["Content-Type"] = "application/json"
            body = json.dumps(payload).encode("utf-8")
        request = urllib.request.Request(url, data=body, headers=headers, method=method)
        try:
            with urllib.request.urlopen(request, timeout=30) as response:
                text = response.read().decode("utf-8")
        except urllib.error.HTTPError as error:
            error_text = error.read().decode("utf-8", errors="replace")
            raise RuntimeError(f"{method} {path} failed with HTTP {error.code}: {error_text}") from error
        if not text:
            return None
        return json.loads(text)

    def first_result(self, path: str, **query: str) -> dict[str, Any] | None:
        """GET *path* filtered by *query* and return the first ``results`` entry.

        Returns ``None`` when the response is not a dict or has no results.
        """
        query_string = urllib.parse.urlencode(query)
        response = self.request("GET", f"{path}?{query_string}")
        results = response.get("results", []) if isinstance(response, dict) else []
        return results[0] if results else None


def select_services(slugs: list[str]) -> list[ServiceSpec]:
    """Return the specs named by *slugs*, or every spec when *slugs* is empty.

    Repeated slugs are collapsed to their first occurrence, so passing the
    same ``--service`` twice does not provision it twice or trip the
    duplicate check in validate_specs().

    Raises:
        ValueError: if any slug is not in SERVICE_SPECS.
    """
    if not slugs:
        return list(SERVICE_SPECS)
    by_slug = {service.slug: service for service in SERVICE_SPECS}
    unknown = sorted(set(slugs) - set(by_slug))
    if unknown:
        raise ValueError(f"unknown service slug(s): {', '.join(unknown)}")
    # dict.fromkeys de-duplicates while preserving the caller's order.
    return [by_slug[slug] for slug in dict.fromkeys(slugs)]


def validate_specs(services: list[ServiceSpec]) -> None:
    """Sanity-check *services* before generating or applying anything.

    Raises:
        ValueError: on duplicate slugs, an empty namespace, or when a
            service's mapping expressions do not mention every name in CLAIMS.
    """
    slugs = [service.slug for service in services]
    if len(slugs) != len(set(slugs)):
        raise ValueError("duplicate service slug in OIDC roster")
    for service in services:
        if not service.namespace:
            raise ValueError(f"{service.slug} is missing a target namespace")
        expressions = "\n".join(mapping["expression"] for mapping in scope_mapping_payloads(service))
        missing_claims = [claim for claim in CLAIMS if claim not in expressions]
        if missing_claims:
            raise ValueError(f"{service.slug} mapping payloads miss claims: {', '.join(missing_claims)}")


def dry_run(services: list[ServiceSpec], args: argparse.Namespace) -> int:
    """Build redacted payload documents without contacting Authentik.

    Prints the documents as JSON when ``args.print_json`` is set, otherwise a
    short summary. Always returns 0.
    """
    # Real primary keys are only known after a live lookup, so the dry run
    # shows one named placeholder per built-in mapping and per claim.
    placeholder_ids = [
        *[f"<{managed}-mapping-pk>" for managed in BUILTIN_SCOPE_MAPPINGS],
        *[f"<{claim}-mapping-pk>" for claim in CLAIMS],
    ]
    documents = []
    for service in services:
        provider = provider_payload(service, args, placeholder_ids, "")
        documents.append(
            {
                "service": service.slug,
                "namespace": service.namespace,
                "onepassword_item": service.onepassword_item_path,
                "issuer_url": service.issuer_url,
                "flow_contract": FLOW_CONTRACT,
                "builtin_scope_mappings": list(BUILTIN_SCOPE_MAPPINGS),
                "scope_mappings": scope_mapping_payloads(service),
                "provider": redact(provider),
                "application": application_payload(service, ""),
            }
        )
    if args.print_json:
        print(json.dumps(documents, indent=2, sort_keys=True))
    else:
        print(
            "Dry-run only: generated "
            f"{len(services)} providers, {len(services)} applications, "
            f"and {len(services) * len(CLAIMS)} scope mappings."
        )
        print("Use --print-json to inspect redacted payloads; use --apply for live Authentik mutation.")
    return 0


def _builtin_mapping_ids(client: AuthentikClient) -> list[str]:
    """Look up the primary key of every mapping in BUILTIN_SCOPE_MAPPINGS."""
    ids: list[str] = []
    for managed in BUILTIN_SCOPE_MAPPINGS:
        existing = client.first_result(_SCOPE_MAPPING_ENDPOINT, managed=managed)
        if not existing:
            raise ValueError(f"built-in Authentik scope mapping not found: {managed}")
        ids.append(existing["pk"])
    return ids


def _ensure_scope_mappings(
    client: AuthentikClient,
    service: ServiceSpec,
    update_existing: bool,
) -> list[str]:
    """Create (or, with *update_existing*, patch) the service's mappings; return their pks."""
    ids: list[str] = []
    for mapping in scope_mapping_payloads(service):
        existing = client.first_result(_SCOPE_MAPPING_ENDPOINT, name=mapping["name"])
        if not existing:
            result = client.request("POST", _SCOPE_MAPPING_ENDPOINT, mapping)
        elif update_existing:
            result = client.request("PATCH", f"{_SCOPE_MAPPING_ENDPOINT}{existing['pk']}/", mapping)
        else:
            result = existing
        ids.append(result["pk"])
    return ids


def _require_flows(args: argparse.Namespace) -> None:
    """Raise ValueError when a provider write would carry an empty flow reference."""
    missing = [
        option
        for option, value in (
            ("--authorization-flow", args.authorization_flow),
            ("--invalidation-flow", args.invalidation_flow),
        )
        if not value
    ]
    if missing:
        raise ValueError(f"{', '.join(missing)} is required to create or update a provider")


def apply(services: list[ServiceSpec], args: argparse.Namespace) -> int:
    """Create or update mappings, providers and applications in Authentik.

    Existing objects are left untouched unless ``args.update_existing`` is set.
    Prints one status line per service (never the secret) and returns 0.

    Raises:
        ValueError: if AUTHENTIK_TOKEN or --client-secrets-json is missing, a
            selected slug has no secret, a built-in scope mapping is absent,
            or a provider must be written while a flow option is empty.
        RuntimeError: propagated from AuthentikClient.request on HTTP errors.
    """
    token = os.environ.get("AUTHENTIK_TOKEN")
    if not token:
        raise ValueError("AUTHENTIK_TOKEN is required with --apply")
    if not args.client_secrets_json:
        raise ValueError("--client-secrets-json is required with --apply")
    secrets = load_client_secrets(args.client_secrets_json)
    missing = [service.slug for service in services if not secrets.get(service.slug)]
    if missing:
        raise ValueError(f"client secret JSON is missing slug(s): {', '.join(missing)}")

    client = AuthentikClient(args.base_url, token)
    # The built-in mappings are the same for every service: resolve them once
    # instead of issuing the same lookups per service.
    builtin_ids = _builtin_mapping_ids(client)

    for service in services:
        existing_provider = client.first_result("/providers/oauth2/", client_id=service.client_id)
        write_provider = not existing_provider or args.update_existing
        if write_provider:
            # Fail before mutating anything for this service rather than
            # sending a payload with an empty flow reference.
            _require_flows(args)

        mapping_ids = [*builtin_ids, *_ensure_scope_mappings(client, service, args.update_existing)]

        if not existing_provider:
            provider_body = provider_payload(service, args, mapping_ids, secrets[service.slug])
            provider_pk = client.request("POST", "/providers/oauth2/", provider_body)["pk"]
        elif args.update_existing:
            provider_body = provider_payload(service, args, mapping_ids, secrets[service.slug])
            provider_pk = client.request(
                "PATCH", f"/providers/oauth2/{existing_provider['pk']}/", provider_body
            )["pk"]
        else:
            provider_pk = existing_provider["pk"]

        app_body = application_payload(service, provider_pk)
        existing_app = client.first_result("/core/applications/", slug=service.slug)
        if not existing_app:
            client.request("POST", "/core/applications/", app_body)
        elif args.update_existing:
            client.request("PATCH", f"/core/applications/{existing_app['slug']}/", app_body)
        print(f"applied {service.slug}: provider/app present, secret redacted")
    return 0


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line options; *argv* defaults to ``sys.argv[1:]``."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--apply", action="store_true", help="perform live Authentik REST mutations")
    parser.add_argument("--update-existing", action="store_true", help="patch existing mappings/providers/applications")
    parser.add_argument("--print-json", action="store_true", help="print redacted dry-run payloads")
    parser.add_argument("--service", action="append", default=[], help="limit to one service slug; repeatable")
    parser.add_argument("--base-url", default="http://localhost:9000", help="Authentik base URL")
    parser.add_argument("--client-secrets-json", help="operator-provided JSON object of slug to client_secret")
    parser.add_argument("--authorization-flow", default="")
    parser.add_argument("--invalidation-flow", default="")
    parser.add_argument("--authentication-flow")
    parser.add_argument("--signing-key", help="shared Authentik signing key UUID")
    return parser.parse_args(argv)


def main(argv: list[str] | None = None) -> int:
    """Run the CLI and return the process exit code (0 on success, 2 on error)."""
    args = parse_args(argv)
    try:
        services = select_services(args.service)
        validate_specs(services)
        if args.apply:
            return apply(services, args)
        return dry_run(services, args)
    except Exception as error:  # top-level boundary: report and exit non-zero
        print(f"error: {error}", file=sys.stderr)
        return 2


if __name__ == "__main__":
    raise SystemExit(main())