Files
bluejay-infra/scripts/authentik-bulk-client-create.py

417 lines
17 KiB
Python

#!/usr/bin/env python3
"""Generate and optionally apply FlowerCore Authentik OIDC client assets.
Dry-run is the default. Live Authentik mutations require --apply plus an
AUTHENTIK_TOKEN bearer token and an operator-provided client secret JSON file.
The script never prints client_secret values.
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from typing import Any
# Claim names that every service's scope mapping expressions must emit;
# validate_specs() checks each one appears in the generated expressions.
CLAIMS = (
    "fc:roles",
    "fc:tenant",
    "fc:svc",
    "fc:scope",
    "fc:mfa",
    "flowercore_actor_id",
)

# "managed" identifiers of the stock Authentik scope mappings that are attached
# to every provider in addition to the FlowerCore-specific ones.
BUILTIN_SCOPE_MAPPINGS = (
    "goauthentik.io/providers/oauth2/scope-openid",
    "goauthentik.io/providers/oauth2/scope-profile",
    "goauthentik.io/providers/oauth2/scope-email",
    "goauthentik.io/providers/oauth2/scope-offline_access",
)

# Informational OIDC flow description that is copied verbatim into each
# dry-run document.
FLOW_CONTRACT = {
    "response_type": "code",
    "grant_types": ["authorization_code", "refresh_token"],
    "offline_access_required": True,
}
@dataclass(frozen=True)
class ServiceSpec:
    """Immutable description of one FlowerCore service that needs an OIDC client.

    All derived identifiers (client id, provider/application names, issuer URL
    and 1Password item path) are computed from the four stored fields.
    """

    # Short identifier; doubles as the OIDC client id and the issuer URL segment.
    slug: str
    # Target namespace recorded for the service (reported in dry-run output).
    namespace: str
    # Human-readable name used in provider and application titles.
    display_name: str
    # Hostname of the service.
    host: str

    @property
    def client_id(self) -> str:
        """Return the OIDC client id, which is simply the slug."""
        return self.slug

    @property
    def application_name(self) -> str:
        """Return the Authentik application title for this service."""
        return "FlowerCore " + self.display_name

    @property
    def provider_name(self) -> str:
        """Return the Authentik OAuth2 provider title for this service."""
        return "FlowerCore " + self.display_name + " OIDC"

    @property
    def issuer_url(self) -> str:
        """Return the per-service issuer URL on id.iamworkin.lan."""
        return "https://id.iamworkin.lan/application/o/" + self.slug + "/"

    @property
    def onepassword_item_path(self) -> str:
        """Return the 1Password item path reported for this service's client."""
        return "IAmWorkin/items/" + self.slug + "-oidc-client"
# Roster of every FlowerCore service that gets an OIDC client. Each row is
# (slug, namespace, display_name, host), in ServiceSpec field order.
SERVICE_SPECS = tuple(
    ServiceSpec(slug, namespace, display_name, host)
    for slug, namespace, display_name, host in (
        ("library", "fc-library", "Library", "library.iamworkin.lan"),
        ("retail", "fc-retail", "Retail", "retail.iamworkin.lan"),
        ("telephony", "telephony", "Telephony", "telephony.iamworkin.lan"),
        ("knowledge", "knowledge", "Knowledge", "knowledge.iamworkin.lan"),
        ("llmbridge", "fc-llm-bridge", "LlmBridge", "fc-llm-bridge.iamworkin.lan"),
        ("mysql", "fc-mysql", "MySQL", "mysql.iamworkin.lan"),
        ("php", "fc-php", "PHP", "php.iamworkin.lan"),
        ("signage", "fc-signage", "Signage", "signage.iamworkin.lan"),
        ("media", "fc-media", "Media", "media.iamworkin.lan"),
        ("dms", "fc-dms", "DMS", "dms.iamworkin.lan"),
        ("pimanager", "fc-pimanager", "PiManager", "pimanager.iamworkin.lan"),
        ("distribution", "fc-distribution", "Distribution", "distribution.iamworkin.lan"),
        ("dns", "fc-dns", "DNS", "dns.iamworkin.lan"),
        ("print", "fc-print", "Print", "print.iamworkin.lan"),
        ("aistation", "fc-aistation", "AiStation", "aistation.iamworkin.lan"),
        ("irc", "irc", "IRC", "irc.iamworkin.lan"),
        ("ttsreader", "fc-ttsreader", "TtsReader", "ttsreader.iamworkin.lan"),
        ("chat", "fc-chat", "Chat", "chat.iamworkin.lan"),
        ("intranet", "intranet", "Intranet", "intranet.iamworkin.lan"),
        ("remotedesktop", "fc-desktop", "RemoteDesktop", "remotedesktop.iamworkin.lan"),
        ("provisioning", "fc-provisioning", "Provisioning", "provisioning.iamworkin.lan"),
        ("scoreboards", "fc-scoreboard", "Scoreboards", "scoreboards.iamworkin.lan"),
        ("mndot", "fc-mndot", "MnDOT", "mndot.iamworkin.lan"),
        ("kiosk", "fc-system", "Kiosk", "kiosk.iamworkin.lan"),
        ("mike-bundle", "fc-mike-bundle", "Mike Bundle", "mike-bundle.iamworkin.lan"),
        ("messageboard", "fc-messageboard", "MessageBoard", "messageboard.iamworkin.lan"),
        ("menuboard", "fc-menuboard", "MenuBoard", "menuboard.iamworkin.lan"),
        ("presentations", "fc-presentations", "Presentations", "presentations.iamworkin.lan"),
        ("segmentdisplay", "fc-segmentdisplay", "SegmentDisplay", "segmentdisplay.iamworkin.lan"),
        ("signalcontrol", "fc-signalcontrol", "SignalControl", "signalcontrol.iamworkin.lan"),
        ("worldbuilder", "fc-worldbuilder", "WorldBuilder", "worldbuilder.iamworkin.lan"),
        ("audit", "fc-audit", "Audit", "audit.iamworkin.lan"),
        ("licensing", "fc-licensing", "Licensing", "licensing.iamworkin.lan"),
    )
)
def scope_mapping_payloads(service: ServiceSpec) -> list[dict[str, str]]:
    """Build the FlowerCore scope mapping payloads for one service.

    Six mappings are produced, one per FlowerCore claim, all under the
    ``flowercore`` scope name. Each ``expression`` is Python source that
    Authentik evaluates as a function body, so nested statements inside the
    string literals must carry real block indentation.

    Args:
        service: The service whose slug is embedded in the mapping names,
            ``managed`` identifiers and the ``fc:svc`` / ``fc:scope`` claims.

    Returns:
        A list of dicts with the keys ``managed``, ``name``, ``scope_name``,
        ``description`` and ``expression``, in claim order.
    """
    managed_prefix = f"flowercore.io/authentik/oidc/{service.slug}"

    def mapping(suffix: str, claim: str, description: str, expression: str) -> dict[str, str]:
        # Every mapping shares the same envelope; only these four parts vary.
        return {
            "managed": f"{managed_prefix}/{suffix}",
            "name": f"FlowerCore {service.slug} {claim}",
            "scope_name": "flowercore",
            "description": description,
            "expression": expression,
        }

    return [
        mapping(
            "fc-roles",
            "fc:roles",
            "FlowerCore role claim from Authentik group memberships.",
            (
                "groups = [group.name for group in request.user.ak_groups.all()]\n"
                "return {'fc:roles': ','.join(groups)}"
            ),
        ),
        mapping(
            "fc-tenant",
            "fc:tenant",
            "FlowerCore tenant claim from group attribute fc_tenant_id.",
            # The loop body and the `if` body need distinct indentation levels;
            # a flat layout is an IndentationError once the expression is compiled.
            (
                "for group in request.user.ak_groups.all():\n"
                "    tenant_id = group.attributes.get('fc_tenant_id')\n"
                "    if tenant_id:\n"
                "        return {'fc:tenant': tenant_id}\n"
                "return {'fc:tenant': 'default'}"
            ),
        ),
        mapping(
            "fc-svc",
            "fc:svc",
            "FlowerCore service slug claim.",
            f"return {{'fc:svc': '{service.slug}'}}",
        ),
        mapping(
            "fc-scope",
            "fc:scope",
            "FlowerCore service permission scope claim.",
            f"return {{'fc:scope': 'flowercore:{service.slug}'}}",
        ),
        mapping(
            "fc-mfa",
            "fc:mfa",
            "FlowerCore MFA satisfied session claim.",
            (
                "mfa_stage = request.session.get('authentik/stages/authenticator_validate')\n"
                "return {'fc:mfa': bool(mfa_stage)}"
            ),
        ),
        mapping(
            "flowercore-actor-id",
            "flowercore_actor_id",
            "FlowerCore audit actor alias for the Authentik user id.",
            "return {'flowercore_actor_id': str(request.user.uid)}",
        ),
    ]
def provider_payload(
    service: ServiceSpec,
    args: argparse.Namespace,
    mapping_ids: list[str],
    client_secret: str,
) -> dict[str, Any]:
    """Assemble the OAuth2 provider request body for one service.

    Args:
        service: Supplies the provider name, client id and redirect host.
        args: Parsed CLI options; the flow ids and the optional
            ``authentication_flow`` / ``signing_key`` are read from it.
        mapping_ids: Scope mapping primary keys to attach to the provider.
        client_secret: Secret stored on the confidential client.

    Returns:
        The provider payload. ``authentication_flow`` and ``signing_key`` are
        only present when the corresponding CLI option is truthy.
    """
    origin = f"https://{service.host}"
    redirect_uris = [
        {"matching_mode": "strict", "url": f"{origin}/{callback}"}
        for callback in ("signin-oidc", "signout-callback-oidc")
    ]
    payload: dict[str, Any] = {
        "name": service.provider_name,
        "authorization_flow": args.authorization_flow,
        "invalidation_flow": args.invalidation_flow,
        "property_mappings": mapping_ids,
        "client_type": "confidential",
        "client_id": service.client_id,
        "client_secret": client_secret,
        "access_code_validity": "minutes=1",
        "access_token_validity": "hours=1",
        "refresh_token_validity": "days=30",
        "include_claims_in_id_token": True,
        "redirect_uris": redirect_uris,
        "sub_mode": "hashed_user_id",
        "issuer_mode": "per_provider",
    }
    # Optional settings are omitted entirely rather than sent as null/empty.
    for option in ("authentication_flow", "signing_key"):
        chosen = getattr(args, option)
        if chosen:
            payload[option] = chosen
    return payload
def application_payload(service: ServiceSpec, provider_pk: int | str | None) -> dict[str, Any]:
    """Assemble the Authentik application request body for one service.

    Args:
        service: Supplies the application name, slug, host and display name.
        provider_pk: Primary key of the provider to link, passed through as-is
            (the dry run passes a placeholder string).

    Returns:
        The application payload.
    """
    launch_url = f"https://{service.host}/"
    description = f"FlowerCore {service.display_name} OIDC client"
    return dict(
        name=service.application_name,
        slug=service.slug,
        provider=provider_pk,
        open_in_new_tab=True,
        meta_launch_url=launch_url,
        meta_description=description,
        meta_publisher="FlowerCore",
        policy_engine_mode="all",
        group="FlowerCore",
    )
def load_client_secrets(path: str | None) -> dict[str, str]:
    """Load the operator-provided slug-to-client_secret mapping from a JSON file.

    Args:
        path: Path to a UTF-8 JSON file holding an object keyed by service
            slug, or a falsy value when no file was supplied.

    Returns:
        A dict of slug to secret; empty when ``path`` is falsy.

    Raises:
        ValueError: If the top-level JSON value is not an object, or if any
            value is not a JSON string.
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    if not path:
        return {}
    with open(path, encoding="utf-8") as handle:
        data = json.load(handle)
    if not isinstance(data, dict):
        raise ValueError("client secret file must be a JSON object keyed by service slug")
    # Coercing with str() would turn null into the truthy secret "None" and
    # silently stringify numbers/objects, so anything but a string is rejected.
    # Only slugs are reported; secret values never appear in the message.
    invalid = sorted(str(slug) for slug, secret in data.items() if not isinstance(secret, str))
    if invalid:
        raise ValueError(
            f"client secret file values must be JSON strings; invalid slug(s): {', '.join(invalid)}"
        )
    return {str(slug): secret for slug, secret in data.items()}
def redact(value: Any) -> Any:
    """Return a copy of ``value`` with every ``client_secret`` entry masked.

    Dicts and lists are walked recursively and rebuilt; any other value is
    returned unchanged. A dict entry whose key is ``client_secret`` is replaced
    by the string ``<redacted>`` regardless of what it held.
    """
    if isinstance(value, list):
        return [redact(item) for item in value]
    if not isinstance(value, dict):
        return value
    masked = {}
    for key, child in value.items():
        if key == "client_secret":
            masked[key] = "<redacted>"
        else:
            masked[key] = redact(child)
    return masked
class AuthentikClient:
    """Small bearer-token JSON client for the Authentik ``/api/v3/`` endpoints."""

    def __init__(self, base_url: str, token: str) -> None:
        """Store the token and the base URL with trailing slashes removed."""
        self.token = token
        self.base_url = base_url.rstrip("/")

    def request(self, method: str, path: str, payload: dict[str, Any] | None = None) -> Any:
        """Send one API request and return the decoded JSON response.

        Args:
            method: HTTP method name.
            path: Path below ``/api/v3/``; leading slashes are ignored.
            payload: Optional JSON body. When given, it is serialized and sent
                with a JSON content type.

        Returns:
            The parsed JSON body, or ``None`` when the response body is empty.

        Raises:
            RuntimeError: On an HTTP error status; the message carries the
                method, path, status code and response text.
        """
        endpoint = "/".join((self.base_url, "api/v3", path.lstrip("/")))
        headers = {
            "Accept": "application/json",
            "Authorization": f"Bearer {self.token}",
        }
        if payload is None:
            body = None
        else:
            headers["Content-Type"] = "application/json"
            body = json.dumps(payload).encode("utf-8")
        http_request = urllib.request.Request(endpoint, data=body, headers=headers, method=method)
        try:
            with urllib.request.urlopen(http_request, timeout=30) as response:
                text = response.read().decode("utf-8")
        except urllib.error.HTTPError as error:
            # Undecodable bytes are replaced so the server's explanation is still shown.
            error_text = error.read().decode("utf-8", errors="replace")
            raise RuntimeError(f"{method} {path} failed with HTTP {error.code}: {error_text}") from error
        return json.loads(text) if text else None

    def first_result(self, path: str, **query: str) -> dict[str, Any] | None:
        """GET a list endpoint with ``query`` filters and return its first result.

        Returns ``None`` when the response is not a dict or its ``results``
        list is missing or empty.
        """
        listing = self.request("GET", path + "?" + urllib.parse.urlencode(query))
        if not isinstance(listing, dict):
            return None
        matches = listing.get("results", [])
        if not matches:
            return None
        return matches[0]
def select_services(slugs: list[str]) -> list[ServiceSpec]:
    """Resolve the requested slugs to service specs.

    Args:
        slugs: Slugs requested on the command line; an empty list selects the
            whole roster.

    Returns:
        The matching specs in the order the slugs were given (repeats kept),
        or a copy of the full roster when ``slugs`` is empty.

    Raises:
        ValueError: If any slug is not in the roster; all unknown slugs are
            listed, sorted.
    """
    if not slugs:
        return list(SERVICE_SPECS)
    catalog = {spec.slug: spec for spec in SERVICE_SPECS}
    unknown = sorted({slug for slug in slugs if slug not in catalog})
    if unknown:
        raise ValueError(f"unknown service slug(s): {', '.join(unknown)}")
    return [catalog[slug] for slug in slugs]
def validate_specs(services: list[ServiceSpec]) -> None:
    """Sanity-check the selected services before anything is generated.

    Raises:
        ValueError: If two services share a slug, a service has no namespace,
            or a service's generated mapping expressions do not mention every
            claim in ``CLAIMS``.
    """
    distinct_slugs = {service.slug for service in services}
    if len(distinct_slugs) != len(services):
        raise ValueError("duplicate service slug in OIDC roster")
    for service in services:
        if not service.namespace:
            raise ValueError(f"{service.slug} is missing a target namespace")
        # A plain substring search over all expressions is enough to catch a
        # claim that was dropped from the mapping payloads.
        combined = "\n".join(entry["expression"] for entry in scope_mapping_payloads(service))
        missing_claims = []
        for claim in CLAIMS:
            if claim not in combined:
                missing_claims.append(claim)
        if missing_claims:
            raise ValueError(f"{service.slug} mapping payloads miss claims: {', '.join(missing_claims)}")
def dry_run(services: list[ServiceSpec], args: argparse.Namespace) -> int:
    """Generate the payloads for every service without contacting Authentik.

    With ``--print-json`` the redacted documents are printed as sorted,
    indented JSON; otherwise only a one-line summary and a usage hint are
    printed.

    Args:
        services: Services to generate documents for.
        args: Parsed CLI options (flow ids, optional settings, ``print_json``).

    Returns:
        Always ``0``.
    """
    # Real primary keys are unknown offline, so readable placeholders stand in.
    builtin_placeholders = [
        "<builtin-" + managed.rsplit("/", 1)[-1] + "-pk>" for managed in BUILTIN_SCOPE_MAPPINGS
    ]
    claim_placeholders = ["<" + claim + "-mapping-pk>" for claim in CLAIMS]
    placeholder_ids = builtin_placeholders + claim_placeholders

    documents = [
        {
            "service": service.slug,
            "namespace": service.namespace,
            "onepassword_item": service.onepassword_item_path,
            "issuer_url": service.issuer_url,
            "flow_contract": FLOW_CONTRACT,
            "builtin_scope_mappings": list(BUILTIN_SCOPE_MAPPINGS),
            "scope_mappings": scope_mapping_payloads(service),
            "provider": redact(
                provider_payload(service, args, placeholder_ids, "<from-1password-client_secret>")
            ),
            "application": application_payload(service, "<provider-pk>"),
        }
        for service in services
    ]

    if args.print_json:
        print(json.dumps(documents, indent=2, sort_keys=True))
        return 0

    total = len(services)
    print(
        "Dry-run only: generated "
        f"{total} providers, {total} applications, "
        f"and {total * len(CLAIMS)} scope mappings."
    )
    print("Use --print-json to inspect redacted payloads; use --apply for live Authentik mutation.")
    return 0
def apply(services: list[ServiceSpec], args: argparse.Namespace) -> int:
    """Create or update the Authentik scope mappings, providers and applications.

    All preconditions are checked before the first API request so a
    misconfigured run cannot leave Authentik half-provisioned.

    Args:
        services: Services to provision.
        args: Parsed CLI options. ``client_secrets_json``, ``base_url``,
            ``update_existing`` and the flow ids are read here; the rest is
            consumed by ``provider_payload``.

    Returns:
        ``0`` once every service has been processed.

    Raises:
        ValueError: If ``AUTHENTIK_TOKEN`` or ``--client-secrets-json`` is
            missing, a flow option still holds its placeholder default, a
            selected service has no secret, or a built-in scope mapping
            cannot be found.
        RuntimeError: Propagated from ``AuthentikClient.request`` on an HTTP
            error status.
    """
    token = os.environ.get("AUTHENTIK_TOKEN")
    if not token:
        raise ValueError("AUTHENTIK_TOKEN is required with --apply")
    if not args.client_secrets_json:
        raise ValueError("--client-secrets-json is required with --apply")

    # The CLI defaults for the flows are "<...-uuid>" placeholders intended for
    # dry-run output. Sending them would only fail at the provider request,
    # after scope mappings had already been created or patched.
    unset_flows = [
        flag
        for flag, value in (
            ("--authorization-flow", args.authorization_flow),
            ("--invalidation-flow", args.invalidation_flow),
        )
        if not value or (str(value).startswith("<") and str(value).endswith(">"))
    ]
    if unset_flows:
        raise ValueError(f"{', '.join(unset_flows)} must be set to a real Authentik flow with --apply")

    secrets = load_client_secrets(args.client_secrets_json)
    missing = [service.slug for service in services if not secrets.get(service.slug)]
    if missing:
        raise ValueError(f"client secret JSON is missing slug(s): {', '.join(missing)}")

    client = AuthentikClient(args.base_url, token)
    scope_endpoint = "/propertymappings/provider/scope/"

    # The built-in mappings are the same for every service: resolve them once
    # instead of repeating the lookups per service.
    builtin_ids: list[str] = []
    for managed in BUILTIN_SCOPE_MAPPINGS:
        builtin = client.first_result(scope_endpoint, managed=managed)
        if not builtin:
            raise ValueError(f"built-in Authentik scope mapping not found: {managed}")
        builtin_ids.append(builtin["pk"])

    for service in services:
        mapping_ids: list[str] = list(builtin_ids)
        for mapping in scope_mapping_payloads(service):
            existing = client.first_result(scope_endpoint, name=mapping["name"])
            if not existing:
                record = client.request("POST", scope_endpoint, mapping)
            elif args.update_existing:
                record = client.request("PATCH", f"{scope_endpoint}{existing['pk']}/", mapping)
            else:
                # Existing mappings are reused untouched unless --update-existing is given.
                record = existing
            mapping_ids.append(record["pk"])

        existing_provider = client.first_result("/providers/oauth2/", client_id=service.client_id)
        provider_body = provider_payload(service, args, mapping_ids, secrets[service.slug])
        if not existing_provider:
            provider_pk = client.request("POST", "/providers/oauth2/", provider_body)["pk"]
        elif args.update_existing:
            updated_provider = client.request(
                "PATCH", f"/providers/oauth2/{existing_provider['pk']}/", provider_body
            )
            provider_pk = updated_provider["pk"]
        else:
            provider_pk = existing_provider["pk"]

        app_body = application_payload(service, provider_pk)
        existing_app = client.first_result("/core/applications/", slug=service.slug)
        if not existing_app:
            client.request("POST", "/core/applications/", app_body)
        elif args.update_existing:
            client.request("PATCH", f"/core/applications/{existing_app['slug']}/", app_body)

        # Only the slug is reported; the secret itself is never printed.
        print(f"applied {service.slug}: provider/app present, secret redacted")
    return 0
def parse_args() -> argparse.Namespace:
    """Parse the command line (``sys.argv``) into a namespace.

    The module docstring is used as the parser description. The flow options
    default to angle-bracket placeholder strings.
    """
    parser = argparse.ArgumentParser(description=__doc__)

    # Boolean switches, in the order they appear in --help.
    switches = (
        ("--apply", "perform live Authentik REST mutations"),
        ("--update-existing", "patch existing mappings/providers/applications"),
        ("--print-json", "print redacted dry-run payloads"),
    )
    for flag, help_text in switches:
        parser.add_argument(flag, action="store_true", help=help_text)

    parser.add_argument(
        "--service",
        action="append",
        default=[],
        help="limit to one service slug; repeatable",
    )
    parser.add_argument(
        "--base-url",
        default="http://localhost:9000",
        help="Authentik base URL",
    )
    parser.add_argument(
        "--client-secrets-json",
        help="operator-provided JSON object of slug to client_secret",
    )
    parser.add_argument("--authorization-flow", default="<authorization-flow-uuid>")
    parser.add_argument("--invalidation-flow", default="<invalidation-flow-uuid>")
    parser.add_argument("--authentication-flow")
    parser.add_argument("--signing-key", help="shared Authentik signing key UUID")
    return parser.parse_args()
def main() -> int:
    """Run the CLI and return the process exit code.

    Returns:
        The result of the dry run or apply step, or ``2`` after printing
        ``error: ...`` to stderr when any exception is raised.
    """
    args = parse_args()
    try:
        services = select_services(args.service)
        validate_specs(services)
        # Live mutation only happens when --apply was given explicitly.
        runner = apply if args.apply else dry_run
        return runner(services, args)
    except Exception as error:
        # Top-level boundary: report one concise line instead of a traceback.
        print(f"error: {error}", file=sys.stderr)
        return 2


if __name__ == "__main__":
    raise SystemExit(main())