Add Authentik OIDC client registration assets
This commit is contained in:
416
scripts/authentik-bulk-client-create.py
Normal file
416
scripts/authentik-bulk-client-create.py
Normal file
@@ -0,0 +1,416 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Generate and optionally apply FlowerCore Authentik OIDC client assets.
|
||||
|
||||
Dry-run is the default. Live Authentik mutations require --apply plus an
|
||||
AUTHENTIK_TOKEN bearer token and an operator-provided client secret JSON file.
|
||||
The script never prints client_secret values.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import urllib.error
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
|
||||
CLAIMS = (
|
||||
"fc:roles",
|
||||
"fc:tenant",
|
||||
"fc:svc",
|
||||
"fc:scope",
|
||||
"fc:mfa",
|
||||
"flowercore_actor_id",
|
||||
)
|
||||
|
||||
BUILTIN_SCOPE_MAPPINGS = (
|
||||
"goauthentik.io/providers/oauth2/scope-openid",
|
||||
"goauthentik.io/providers/oauth2/scope-profile",
|
||||
"goauthentik.io/providers/oauth2/scope-email",
|
||||
"goauthentik.io/providers/oauth2/scope-offline_access",
|
||||
)
|
||||
|
||||
FLOW_CONTRACT = {
|
||||
"response_type": "code",
|
||||
"grant_types": ["authorization_code", "refresh_token"],
|
||||
"offline_access_required": True,
|
||||
}
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ServiceSpec:
|
||||
slug: str
|
||||
namespace: str
|
||||
display_name: str
|
||||
host: str
|
||||
|
||||
@property
|
||||
def client_id(self) -> str:
|
||||
return self.slug
|
||||
|
||||
@property
|
||||
def provider_name(self) -> str:
|
||||
return f"FlowerCore {self.display_name} OIDC"
|
||||
|
||||
@property
|
||||
def application_name(self) -> str:
|
||||
return f"FlowerCore {self.display_name}"
|
||||
|
||||
@property
|
||||
def issuer_url(self) -> str:
|
||||
return f"https://id.iamworkin.lan/application/o/{self.slug}/"
|
||||
|
||||
@property
|
||||
def onepassword_item_path(self) -> str:
|
||||
return f"IAmWorkin/items/{self.slug}-oidc-client"
|
||||
|
||||
|
||||
SERVICE_SPECS = (
|
||||
ServiceSpec("library", "fc-library", "Library", "library.iamworkin.lan"),
|
||||
ServiceSpec("retail", "fc-retail", "Retail", "retail.iamworkin.lan"),
|
||||
ServiceSpec("telephony", "telephony", "Telephony", "telephony.iamworkin.lan"),
|
||||
ServiceSpec("knowledge", "knowledge", "Knowledge", "knowledge.iamworkin.lan"),
|
||||
ServiceSpec("llmbridge", "fc-llm-bridge", "LlmBridge", "fc-llm-bridge.iamworkin.lan"),
|
||||
ServiceSpec("mysql", "fc-mysql", "MySQL", "mysql.iamworkin.lan"),
|
||||
ServiceSpec("php", "fc-php", "PHP", "php.iamworkin.lan"),
|
||||
ServiceSpec("signage", "fc-signage", "Signage", "signage.iamworkin.lan"),
|
||||
ServiceSpec("media", "fc-media", "Media", "media.iamworkin.lan"),
|
||||
ServiceSpec("dms", "fc-dms", "DMS", "dms.iamworkin.lan"),
|
||||
ServiceSpec("pimanager", "fc-pimanager", "PiManager", "pimanager.iamworkin.lan"),
|
||||
ServiceSpec("distribution", "fc-distribution", "Distribution", "distribution.iamworkin.lan"),
|
||||
ServiceSpec("dns", "fc-dns", "DNS", "dns.iamworkin.lan"),
|
||||
ServiceSpec("print", "fc-print", "Print", "print.iamworkin.lan"),
|
||||
ServiceSpec("aistation", "fc-aistation", "AiStation", "aistation.iamworkin.lan"),
|
||||
ServiceSpec("irc", "irc", "IRC", "irc.iamworkin.lan"),
|
||||
ServiceSpec("ttsreader", "fc-ttsreader", "TtsReader", "ttsreader.iamworkin.lan"),
|
||||
ServiceSpec("chat", "fc-chat", "Chat", "chat.iamworkin.lan"),
|
||||
ServiceSpec("intranet", "intranet", "Intranet", "intranet.iamworkin.lan"),
|
||||
ServiceSpec("remotedesktop", "fc-desktop", "RemoteDesktop", "remotedesktop.iamworkin.lan"),
|
||||
ServiceSpec("provisioning", "fc-provisioning", "Provisioning", "provisioning.iamworkin.lan"),
|
||||
ServiceSpec("scoreboards", "fc-scoreboard", "Scoreboards", "scoreboards.iamworkin.lan"),
|
||||
ServiceSpec("mndot", "fc-mndot", "MnDOT", "mndot.iamworkin.lan"),
|
||||
ServiceSpec("kiosk", "fc-system", "Kiosk", "kiosk.iamworkin.lan"),
|
||||
ServiceSpec("mike-bundle", "fc-mike-bundle", "Mike Bundle", "mike-bundle.iamworkin.lan"),
|
||||
ServiceSpec("messageboard", "fc-messageboard", "MessageBoard", "messageboard.iamworkin.lan"),
|
||||
ServiceSpec("menuboard", "fc-menuboard", "MenuBoard", "menuboard.iamworkin.lan"),
|
||||
ServiceSpec("presentations", "fc-presentations", "Presentations", "presentations.iamworkin.lan"),
|
||||
ServiceSpec("segmentdisplay", "fc-segmentdisplay", "SegmentDisplay", "segmentdisplay.iamworkin.lan"),
|
||||
ServiceSpec("signalcontrol", "fc-signalcontrol", "SignalControl", "signalcontrol.iamworkin.lan"),
|
||||
ServiceSpec("worldbuilder", "fc-worldbuilder", "WorldBuilder", "worldbuilder.iamworkin.lan"),
|
||||
ServiceSpec("audit", "fc-audit", "Audit", "audit.iamworkin.lan"),
|
||||
ServiceSpec("licensing", "fc-licensing", "Licensing", "licensing.iamworkin.lan"),
|
||||
)
|
||||
|
||||
|
||||
def scope_mapping_payloads(service: ServiceSpec) -> list[dict[str, str]]:
|
||||
managed_prefix = f"flowercore.io/authentik/oidc/{service.slug}"
|
||||
return [
|
||||
{
|
||||
"managed": f"{managed_prefix}/fc-roles",
|
||||
"name": f"FlowerCore {service.slug} fc:roles",
|
||||
"scope_name": "flowercore",
|
||||
"description": "FlowerCore role claim from Authentik group memberships.",
|
||||
"expression": (
|
||||
"groups = [group.name for group in request.user.ak_groups.all()]\n"
|
||||
"return {'fc:roles': ','.join(groups)}"
|
||||
),
|
||||
},
|
||||
{
|
||||
"managed": f"{managed_prefix}/fc-tenant",
|
||||
"name": f"FlowerCore {service.slug} fc:tenant",
|
||||
"scope_name": "flowercore",
|
||||
"description": "FlowerCore tenant claim from group attribute fc_tenant_id.",
|
||||
"expression": (
|
||||
"for group in request.user.ak_groups.all():\n"
|
||||
" tenant_id = group.attributes.get('fc_tenant_id')\n"
|
||||
" if tenant_id:\n"
|
||||
" return {'fc:tenant': tenant_id}\n"
|
||||
"return {'fc:tenant': 'default'}"
|
||||
),
|
||||
},
|
||||
{
|
||||
"managed": f"{managed_prefix}/fc-svc",
|
||||
"name": f"FlowerCore {service.slug} fc:svc",
|
||||
"scope_name": "flowercore",
|
||||
"description": "FlowerCore service slug claim.",
|
||||
"expression": f"return {{'fc:svc': '{service.slug}'}}",
|
||||
},
|
||||
{
|
||||
"managed": f"{managed_prefix}/fc-scope",
|
||||
"name": f"FlowerCore {service.slug} fc:scope",
|
||||
"scope_name": "flowercore",
|
||||
"description": "FlowerCore service permission scope claim.",
|
||||
"expression": f"return {{'fc:scope': 'flowercore:{service.slug}'}}",
|
||||
},
|
||||
{
|
||||
"managed": f"{managed_prefix}/fc-mfa",
|
||||
"name": f"FlowerCore {service.slug} fc:mfa",
|
||||
"scope_name": "flowercore",
|
||||
"description": "FlowerCore MFA satisfied session claim.",
|
||||
"expression": (
|
||||
"mfa_stage = request.session.get('authentik/stages/authenticator_validate')\n"
|
||||
"return {'fc:mfa': bool(mfa_stage)}"
|
||||
),
|
||||
},
|
||||
{
|
||||
"managed": f"{managed_prefix}/flowercore-actor-id",
|
||||
"name": f"FlowerCore {service.slug} flowercore_actor_id",
|
||||
"scope_name": "flowercore",
|
||||
"description": "FlowerCore audit actor alias for the Authentik user id.",
|
||||
"expression": "return {'flowercore_actor_id': str(request.user.uid)}",
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
def provider_payload(
|
||||
service: ServiceSpec,
|
||||
args: argparse.Namespace,
|
||||
mapping_ids: list[str],
|
||||
client_secret: str,
|
||||
) -> dict[str, Any]:
|
||||
payload: dict[str, Any] = {
|
||||
"name": service.provider_name,
|
||||
"authorization_flow": args.authorization_flow,
|
||||
"invalidation_flow": args.invalidation_flow,
|
||||
"property_mappings": mapping_ids,
|
||||
"client_type": "confidential",
|
||||
"client_id": service.client_id,
|
||||
"client_secret": client_secret,
|
||||
"access_code_validity": "minutes=1",
|
||||
"access_token_validity": "hours=1",
|
||||
"refresh_token_validity": "days=30",
|
||||
"include_claims_in_id_token": True,
|
||||
"redirect_uris": [
|
||||
{"matching_mode": "strict", "url": f"https://{service.host}/signin-oidc"},
|
||||
{"matching_mode": "strict", "url": f"https://{service.host}/signout-callback-oidc"},
|
||||
],
|
||||
"sub_mode": "hashed_user_id",
|
||||
"issuer_mode": "per_provider",
|
||||
}
|
||||
|
||||
if args.authentication_flow:
|
||||
payload["authentication_flow"] = args.authentication_flow
|
||||
if args.signing_key:
|
||||
payload["signing_key"] = args.signing_key
|
||||
|
||||
return payload
|
||||
|
||||
|
||||
def application_payload(service: ServiceSpec, provider_pk: int | str | None) -> dict[str, Any]:
|
||||
return {
|
||||
"name": service.application_name,
|
||||
"slug": service.slug,
|
||||
"provider": provider_pk,
|
||||
"open_in_new_tab": True,
|
||||
"meta_launch_url": f"https://{service.host}/",
|
||||
"meta_description": f"FlowerCore {service.display_name} OIDC client",
|
||||
"meta_publisher": "FlowerCore",
|
||||
"policy_engine_mode": "all",
|
||||
"group": "FlowerCore",
|
||||
}
|
||||
|
||||
|
||||
def load_client_secrets(path: str | None) -> dict[str, str]:
|
||||
if not path:
|
||||
return {}
|
||||
with open(path, "r", encoding="utf-8") as handle:
|
||||
data = json.load(handle)
|
||||
if not isinstance(data, dict):
|
||||
raise ValueError("client secret file must be a JSON object keyed by service slug")
|
||||
return {str(key): str(value) for key, value in data.items()}
|
||||
|
||||
|
||||
def redact(value: Any) -> Any:
|
||||
if isinstance(value, dict):
|
||||
return {
|
||||
key: "<redacted>" if key == "client_secret" else redact(child)
|
||||
for key, child in value.items()
|
||||
}
|
||||
if isinstance(value, list):
|
||||
return [redact(child) for child in value]
|
||||
return value
|
||||
|
||||
|
||||
class AuthentikClient:
|
||||
def __init__(self, base_url: str, token: str) -> None:
|
||||
self.base_url = base_url.rstrip("/")
|
||||
self.token = token
|
||||
|
||||
def request(self, method: str, path: str, payload: dict[str, Any] | None = None) -> Any:
|
||||
url = f"{self.base_url}/api/v3/{path.lstrip('/')}"
|
||||
body = None
|
||||
headers = {
|
||||
"Accept": "application/json",
|
||||
"Authorization": f"Bearer {self.token}",
|
||||
}
|
||||
if payload is not None:
|
||||
headers["Content-Type"] = "application/json"
|
||||
body = json.dumps(payload).encode("utf-8")
|
||||
|
||||
request = urllib.request.Request(url, data=body, headers=headers, method=method)
|
||||
try:
|
||||
with urllib.request.urlopen(request, timeout=30) as response:
|
||||
text = response.read().decode("utf-8")
|
||||
except urllib.error.HTTPError as error:
|
||||
error_text = error.read().decode("utf-8", errors="replace")
|
||||
raise RuntimeError(f"{method} {path} failed with HTTP {error.code}: {error_text}") from error
|
||||
|
||||
if not text:
|
||||
return None
|
||||
return json.loads(text)
|
||||
|
||||
def first_result(self, path: str, **query: str) -> dict[str, Any] | None:
|
||||
query_string = urllib.parse.urlencode(query)
|
||||
response = self.request("GET", f"{path}?{query_string}")
|
||||
results = response.get("results", []) if isinstance(response, dict) else []
|
||||
return results[0] if results else None
|
||||
|
||||
|
||||
def select_services(slugs: list[str]) -> list[ServiceSpec]:
|
||||
if not slugs:
|
||||
return list(SERVICE_SPECS)
|
||||
|
||||
by_slug = {service.slug: service for service in SERVICE_SPECS}
|
||||
unknown = sorted(set(slugs) - set(by_slug))
|
||||
if unknown:
|
||||
raise ValueError(f"unknown service slug(s): {', '.join(unknown)}")
|
||||
return [by_slug[slug] for slug in slugs]
|
||||
|
||||
|
||||
def validate_specs(services: list[ServiceSpec]) -> None:
|
||||
slugs = [service.slug for service in services]
|
||||
if len(slugs) != len(set(slugs)):
|
||||
raise ValueError("duplicate service slug in OIDC roster")
|
||||
for service in services:
|
||||
if not service.namespace:
|
||||
raise ValueError(f"{service.slug} is missing a target namespace")
|
||||
expressions = "\n".join(mapping["expression"] for mapping in scope_mapping_payloads(service))
|
||||
missing_claims = [claim for claim in CLAIMS if claim not in expressions]
|
||||
if missing_claims:
|
||||
raise ValueError(f"{service.slug} mapping payloads miss claims: {', '.join(missing_claims)}")
|
||||
|
||||
|
||||
def dry_run(services: list[ServiceSpec], args: argparse.Namespace) -> int:
|
||||
placeholder_ids = [
|
||||
*[f"<builtin-{managed.rsplit('/', 1)[-1]}-pk>" for managed in BUILTIN_SCOPE_MAPPINGS],
|
||||
*[f"<{claim}-mapping-pk>" for claim in CLAIMS],
|
||||
]
|
||||
documents = []
|
||||
for service in services:
|
||||
provider = provider_payload(service, args, placeholder_ids, "<from-1password-client_secret>")
|
||||
documents.append(
|
||||
{
|
||||
"service": service.slug,
|
||||
"namespace": service.namespace,
|
||||
"onepassword_item": service.onepassword_item_path,
|
||||
"issuer_url": service.issuer_url,
|
||||
"flow_contract": FLOW_CONTRACT,
|
||||
"builtin_scope_mappings": list(BUILTIN_SCOPE_MAPPINGS),
|
||||
"scope_mappings": scope_mapping_payloads(service),
|
||||
"provider": redact(provider),
|
||||
"application": application_payload(service, "<provider-pk>"),
|
||||
}
|
||||
)
|
||||
|
||||
if args.print_json:
|
||||
print(json.dumps(documents, indent=2, sort_keys=True))
|
||||
else:
|
||||
print(
|
||||
"Dry-run only: generated "
|
||||
f"{len(services)} providers, {len(services)} applications, "
|
||||
f"and {len(services) * len(CLAIMS)} scope mappings."
|
||||
)
|
||||
print("Use --print-json to inspect redacted payloads; use --apply for live Authentik mutation.")
|
||||
return 0
|
||||
|
||||
|
||||
def apply(services: list[ServiceSpec], args: argparse.Namespace) -> int:
|
||||
token = os.environ.get("AUTHENTIK_TOKEN")
|
||||
if not token:
|
||||
raise ValueError("AUTHENTIK_TOKEN is required with --apply")
|
||||
if not args.client_secrets_json:
|
||||
raise ValueError("--client-secrets-json is required with --apply")
|
||||
|
||||
secrets = load_client_secrets(args.client_secrets_json)
|
||||
missing = [service.slug for service in services if not secrets.get(service.slug)]
|
||||
if missing:
|
||||
raise ValueError(f"client secret JSON is missing slug(s): {', '.join(missing)}")
|
||||
|
||||
client = AuthentikClient(args.base_url, token)
|
||||
for service in services:
|
||||
mapping_ids: list[str] = []
|
||||
for managed in BUILTIN_SCOPE_MAPPINGS:
|
||||
existing = client.first_result("/propertymappings/provider/scope/", managed=managed)
|
||||
if not existing:
|
||||
raise ValueError(f"built-in Authentik scope mapping not found: {managed}")
|
||||
mapping_ids.append(existing["pk"])
|
||||
|
||||
for mapping in scope_mapping_payloads(service):
|
||||
existing = client.first_result("/propertymappings/provider/scope/", name=mapping["name"])
|
||||
if existing and not args.update_existing:
|
||||
mapping_ids.append(existing["pk"])
|
||||
continue
|
||||
if existing and args.update_existing:
|
||||
updated = client.request("PATCH", f"/propertymappings/provider/scope/{existing['pk']}/", mapping)
|
||||
mapping_ids.append(updated["pk"])
|
||||
continue
|
||||
created = client.request("POST", "/propertymappings/provider/scope/", mapping)
|
||||
mapping_ids.append(created["pk"])
|
||||
|
||||
existing_provider = client.first_result("/providers/oauth2/", client_id=service.client_id)
|
||||
provider_body = provider_payload(service, args, mapping_ids, secrets[service.slug])
|
||||
if existing_provider and not args.update_existing:
|
||||
provider_pk = existing_provider["pk"]
|
||||
elif existing_provider:
|
||||
updated_provider = client.request("PATCH", f"/providers/oauth2/{existing_provider['pk']}/", provider_body)
|
||||
provider_pk = updated_provider["pk"]
|
||||
else:
|
||||
provider_pk = client.request("POST", "/providers/oauth2/", provider_body)["pk"]
|
||||
|
||||
app_body = application_payload(service, provider_pk)
|
||||
existing_app = client.first_result("/core/applications/", slug=service.slug)
|
||||
if existing_app and args.update_existing:
|
||||
client.request("PATCH", f"/core/applications/{existing_app['slug']}/", app_body)
|
||||
elif not existing_app:
|
||||
client.request("POST", "/core/applications/", app_body)
|
||||
|
||||
print(f"applied {service.slug}: provider/app present, secret redacted")
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument("--apply", action="store_true", help="perform live Authentik REST mutations")
|
||||
parser.add_argument("--update-existing", action="store_true", help="patch existing mappings/providers/applications")
|
||||
parser.add_argument("--print-json", action="store_true", help="print redacted dry-run payloads")
|
||||
parser.add_argument("--service", action="append", default=[], help="limit to one service slug; repeatable")
|
||||
parser.add_argument("--base-url", default="http://localhost:9000", help="Authentik base URL")
|
||||
parser.add_argument("--client-secrets-json", help="operator-provided JSON object of slug to client_secret")
|
||||
parser.add_argument("--authorization-flow", default="<authorization-flow-uuid>")
|
||||
parser.add_argument("--invalidation-flow", default="<invalidation-flow-uuid>")
|
||||
parser.add_argument("--authentication-flow")
|
||||
parser.add_argument("--signing-key", help="shared Authentik signing key UUID")
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def main() -> int:
|
||||
args = parse_args()
|
||||
try:
|
||||
services = select_services(args.service)
|
||||
validate_specs(services)
|
||||
if args.apply:
|
||||
return apply(services, args)
|
||||
return dry_run(services, args)
|
||||
except Exception as error:
|
||||
print(f"error: {error}", file=sys.stderr)
|
||||
return 2
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Reference in New Issue
Block a user