From 5f5adb4e6ef5c61ee40e943a7f2b3e66eab29e03 Mon Sep 17 00:00:00 2001 From: iromli Date: Mon, 6 Jan 2025 05:19:56 +0700 Subject: [PATCH] feat(cloud-native): secure mounted configuration schema Signed-off-by: iromli --- .../jans/pycloudlib/schema/__init__.py | 64 +++++++++++++++---- 1 file changed, 50 insertions(+), 14 deletions(-) diff --git a/jans-pycloudlib/jans/pycloudlib/schema/__init__.py b/jans-pycloudlib/jans/pycloudlib/schema/__init__.py index 953f12f4f3e..190ccc72aa9 100644 --- a/jans-pycloudlib/jans/pycloudlib/schema/__init__.py +++ b/jans-pycloudlib/jans/pycloudlib/schema/__init__.py @@ -4,6 +4,7 @@ import logging import re from base64 import b64decode +from contextlib import suppress import pem from fqdn import FQDN @@ -20,6 +21,7 @@ from marshmallow.validate import OneOf from marshmallow.validate import Predicate from marshmallow.validate import Range +from sprig_aes import sprig_decrypt_aes logger = logging.getLogger(__name__) @@ -884,38 +886,72 @@ class Meta: _configmap = Nested(ConfigmapSchema, required=True) -def load_schema_from_file(path, exclude_configmap=False, exclude_secret=False): +def load_schema_from_file(path, exclude_configmap=False, exclude_secret=False, key_file=""): """Loads schema from file.""" - out = {} - err = {} - code = 0 + out, err, code = maybe_encrypted_schema(path, key_file) - try: - with open(path) as f: - docs = json.loads(f.read()) - except (IOError, ValueError) as exc: - err = exc - code = 1 + if code != 0: return out, err, code # dont exclude attributes - exclude_attrs = False + exclude_attrs = [] # exclude configmap from loading mechanism if exclude_configmap: key = "_configmap" exclude_attrs = [key] - docs.pop(key, None) + out.pop(key, None) # exclude secret from loading mechanism if exclude_secret: key = "_secret" exclude_attrs = [key] - docs.pop(key, None) + out.pop(key, None) try: - out = ConfigurationSchema().load(docs, partial=exclude_attrs) + out = ConfigurationSchema().load(out, partial=exclude_attrs) except ValidationError as exc: err = exc.messages code = 1 return out, err, code + + +def load_schema_key(path): + try: + with open(path) as f: + key = f.read().strip() + except FileNotFoundError: + key = "" + return key + + +def maybe_encrypted_schema(path, key_file): + out, err, code = {}, {}, 0 + + try: + # read schema as raw string + with open(path) as f: + raw_txt = f.read() + except FileNotFoundError as exc: + err = { + "error": f"Unable to load schema {path}", + "reason": exc, + } + code = exc.errno + else: + if key := load_schema_key(key_file): + # try to decrypt schema (if applicable) + with suppress(ValueError): + raw_txt = sprig_decrypt_aes(raw_txt, key) + + try: + out = json.loads(raw_txt) + except (json.decoder.JSONDecodeError, UnicodeDecodeError) as exc: + err = { + "error": f"Unable to decode JSON from {path}", + "reason": exc, + } + code = 1 + + # finalized results + return out, err, code