Skip to content

Commit

Permalink
feat(cloud-native): secure mounted configuration schema
Browse files Browse the repository at this point in the history
Signed-off-by: iromli <[email protected]>
  • Loading branch information
iromli committed Jan 5, 2025
1 parent 1951312 commit 5f5adb4
Showing 1 changed file with 50 additions and 14 deletions.
64 changes: 50 additions & 14 deletions jans-pycloudlib/jans/pycloudlib/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
import re
from base64 import b64decode
from contextlib import suppress

import pem
from fqdn import FQDN
Expand All @@ -20,6 +21,7 @@
from marshmallow.validate import OneOf
from marshmallow.validate import Predicate
from marshmallow.validate import Range
from sprig_aes import sprig_decrypt_aes

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -884,38 +886,72 @@ class Meta:
_configmap = Nested(ConfigmapSchema, required=True)


def load_schema_from_file(path, exclude_configmap=False, exclude_secret=False):
def load_schema_from_file(path, exclude_configmap=False, exclude_secret=False, key_file=""):
"""Loads schema from file."""
out = {}
err = {}
code = 0
out, err, code = maybe_encrypted_schema(path, key_file)

try:
with open(path) as f:
docs = json.loads(f.read())
except (IOError, ValueError) as exc:
err = exc
code = 1
if code != 0:
return out, err, code

# dont exclude attributes
exclude_attrs = False
exclude_attrs = []

# exclude configmap from loading mechanism
if exclude_configmap:
key = "_configmap"
exclude_attrs = [key]
docs.pop(key, None)
out.pop(key, None)

# exclude secret from loading mechanism
if exclude_secret:
key = "_secret"
exclude_attrs = [key]
docs.pop(key, None)
out.pop(key, None)

try:
out = ConfigurationSchema().load(docs, partial=exclude_attrs)
out = ConfigurationSchema().load(out, partial=exclude_attrs)
except ValidationError as exc:
err = exc.messages
code = 1
return out, err, code


def load_schema_key(path):
try:
with open(path) as f:
key = f.read().strip()
except FileNotFoundError:
key = ""
return key


def maybe_encrypted_schema(path, key_file):
out, err, code = {}, {}, 0

try:
# read schema as raw string
with open(path) as f:
raw_txt = f.read()
except FileNotFoundError as exc:
err = {
"error": f"Unable to load schema {path}",
"reason": exc,
}
code = exc.errno
else:
if key := load_schema_key(key_file):
# try to decrypt schema (if applicable)
with suppress(ValueError):
raw_txt = sprig_decrypt_aes(raw_txt, key)

try:
out = json.loads(raw_txt)
except (json.decoder.JSONDecodeError, UnicodeDecodeError) as exc:
err = {
"error": f"Unable to decode JSON from {path}",
"reason": exc,
}
code = 1

# finalized results
return out, err, code

0 comments on commit 5f5adb4

Please sign in to comment.