Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NAS-130979 / 25.04 / Support legacy API #14405

Merged
merged 1 commit into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 55 additions & 10 deletions src/middlewared/middlewared/api/base/handler/accept.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,70 @@
from pydantic_core import ValidationError

from middlewared.api.base.model import BaseModel
from middlewared.service_exception import CallError, ValidationErrors


def accept_params(model, args, *, exclude_unset=False, expose_secrets=True):
def accept_params(model: type[BaseModel], args: list, *, exclude_unset=False, expose_secrets=True) -> list:
"""
Accepts a list of `args` for a method call and validates it using `model`.
Parameters are accepted in the order they are defined in the `model`.
Returns the list of valid parameters (or raises `ValidationErrors`).
:param model: `BaseModel` that defines method args.
:param args: a list of method args.
:param exclude_unset: if true, will not append default parameters to the list.
:param expose_secrets: if false, will replace `Private` parameters with a placeholder.
:return: a validated list of method args.
"""
args_as_dict = model_dict_from_list(model, args)

dump = validate_model(model, args_as_dict, exclude_unset=exclude_unset, expose_secrets=expose_secrets)

fields = list(model.model_fields)
if exclude_unset:
fields = fields[:len(args)]

return [dump[field] for field in fields]


def model_dict_from_list(model: type[BaseModel], args: list) -> dict:
"""
Converts a list of `args` for a method call to a dictionary using `model`.
Parameters are accepted in the order they are defined in the `model`.
For example, given the model that has fields `b` and `a`, and `args` equal to `[1, 2]`, it will return
`{"b": 1, "a": 2"}`.
:param model: `BaseModel` that defines method args.
:param args: a list of method args.
:return: a dictionary of method args.
"""
if len(args) > len(model.model_fields):
raise CallError(f"Too many arguments (expected {len(model.model_fields)}, found {len(args)})")

args_as_dict = {
return {
field: value
for field, value in zip(model.model_fields.keys(), args)
}


def validate_model(model: type[BaseModel], data: dict, *, exclude_unset=False, expose_secrets=True) -> dict:
"""
Validates `data` against the `model`, sanitizes values, sets defaults.
Raises `ValidationErrors` if any validation errors occur.
:param model: `BaseModel` subclass.
:param data: provided data.
:param exclude_unset: if true, will not add default values.
:param expose_secrets: if false, will replace `Private` fields with a placeholder.
:return: validated data.
"""
try:
instance = model(**args_as_dict)
instance = model(**data)
except ValidationError as e:
verrors = ValidationErrors()
for error in e.errors():
Expand All @@ -26,10 +77,4 @@ def accept_params(model, args, *, exclude_unset=False, expose_secrets=True):
else:
mode = "json"

dump = instance.model_dump(mode=mode, exclude_unset=exclude_unset, warnings=False)

fields = list(model.model_fields)
if exclude_unset:
fields = fields[:len(args)]

return [dump[field] for field in fields]
return instance.model_dump(mode=mode, exclude_unset=exclude_unset, warnings=False)
27 changes: 21 additions & 6 deletions src/middlewared/middlewared/api/base/handler/dump_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,20 @@
from middlewared.api.base import BaseModel, Private, PRIVATE_VALUE
from middlewared.service_exception import ValidationErrors
from .accept import accept_params
from .inspect import model_field_is_model, model_field_is_list_of_models

__all__ = ["dump_params"]


def dump_params(model, args, expose_secrets):
def dump_params(model: type[BaseModel], args: list, expose_secrets: bool) -> list:
"""
Dumps a list of `args` for a method call that accepts `model` parameters.
:param model: `BaseModel` that defines method args.
:param args: a list of method args.
:param expose_secrets: if false, will replace `Private` parameters with a placeholder.
:return: A list of method call arguments ready to be printed.
"""
try:
return accept_params(model, args, exclude_unset=True, expose_secrets=expose_secrets)
except ValidationErrors:
Expand All @@ -19,15 +28,21 @@ def dump_params(model, args, expose_secrets):
]


def remove_secrets(model, value):
if isinstance(model, type) and issubclass(model, BaseModel) and isinstance(value, dict):
def remove_secrets(model: type[BaseModel], value):
"""
Removes `Private` values from a model value.
:param model: `BaseModel` that corresponds to `value`.
:param value: value that potentially contains `Private` data.
:return: `value` with `Private` parameters replaced with a placeholder.
"""
if isinstance(value, dict) and (nested_model := model_field_is_model(model)):
return {
k: remove_secrets(v.annotation, value[k])
for k, v in model.model_fields.items()
for k, v in nested_model.model_fields.items()
if k in value
}
elif typing.get_origin(model) is list and len(args := typing.get_args(model)) == 1 and isinstance(value, list):
return [remove_secrets(args[0], v) for v in value]
elif isinstance(value, list) and (nested_model := model_field_is_list_of_models(model)):
return [remove_secrets(nested_model, v) for v in value]
elif typing.get_origin(model) is Private:
return PRIVATE_VALUE
else:
Expand Down
23 changes: 23 additions & 0 deletions src/middlewared/middlewared/api/base/handler/inspect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import typing

from middlewared.api.base import BaseModel


def model_field_is_model(model) -> type[BaseModel] | None:
"""
Return` model` if it is an API model. Otherwise, returns `None`.
:param model: potentially, API model.
:return: `model` or `None`
"""
if isinstance(model, type) and issubclass(model, BaseModel):
return model


def model_field_is_list_of_models(model) -> type[BaseModel] | None:
"""
If` model` represents a list of API models X, then it will return that model X. Otherwise, returns `None`.
:param model: potentially, a model that represents a list of API models.
:return: nested API model or `None`
"""
if typing.get_origin(model) is list and len(args := typing.get_args(model)) == 1:
return args[0]
172 changes: 172 additions & 0 deletions src/middlewared/middlewared/api/base/handler/version.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
import enum
from types import ModuleType

from middlewared.api.base import BaseModel, ForUpdateMetaclass
from .accept import validate_model
from .inspect import model_field_is_model, model_field_is_list_of_models


class Direction(enum.StrEnum):
DOWNGRADE = "DOWNGRADE"
UPGRADE = "UPGRADE"


class APIVersionDoesNotExistException(Exception):
def __init__(self, version: str):
self.version = version
super().__init__(f"API Version {self.version!r} does not exist")


class APIVersionDoesNotContainModelException(Exception):
def __init__(self, version: str, model_name: str):
self.version = version
self.model_name = model_name
super().__init__(f"API version {version!r} does not contain model {model_name!r}")


class APIVersion:
def __init__(self, version: str, models: dict[str, type[BaseModel]]):
"""
:param version: API version name
:param models: a dictionary which keys are model names and values are models used in the API version
"""
self.version: str = version
self.models: dict[str, type[BaseModel]] = models

@classmethod
def from_module(cls, version: str, module: ModuleType) -> "APIVersion":
"""
Create `APIVersion` from a module (e.g. `middlewared.api.v25_04_0`).
:param version: API version name
:param module: module object
:return: `APIVersion` instance
"""
return cls(
version,
{
model_name: model
for model_name, model in [
(model_name, getattr(module, model_name))
for model_name in dir(module)
]
if isinstance(model, type) and issubclass(model, BaseModel)
},
)

def __repr__(self):
return f"<APIVersion {self.version}>"


class APIVersionsAdapter:
"""
Converts method parameters and return results between different API versions.
"""

def __init__(self, versions: list[APIVersion]):
"""
:param versions: A chronologically sorted list of API versions.
"""
self.versions: dict[str, APIVersion] = {version.version: version for version in versions}
self.versions_history: list[str] = list(self.versions.keys())
self.current_version: str = self.versions_history[-1]

def adapt(self, value: dict, model_name: str, version1: str, version2: str) -> dict:
"""
Adapts `value` (that matches a model identified by `model_name`) from API `version1` to API `version2`).
:param value: a value to convert
:param model_name: a name of the model. Must exist in all API versions, including intermediate ones, or
`APIVersionDoesNotContainModelException` will be raised.
:param version1: original API version from which the `value` comes from
:param version2: target API version that needs `value`
:return: converted value
"""
try:
version1_index = self.versions_history.index(version1)
except ValueError:
raise APIVersionDoesNotExistException(version1) from None

try:
version2_index = self.versions_history.index(version2)
except ValueError:
raise APIVersionDoesNotExistException(version2) from None

current_version = self.versions[version1]
try:
current_version_model = current_version.models[model_name]
except KeyError:
raise APIVersionDoesNotContainModelException(current_version.version, model_name)
value = validate_model(current_version_model, value)

if version1_index < version2_index:
step = 1
direction = Direction.UPGRADE
else:
step = -1
direction = Direction.DOWNGRADE

for version_index in range(version1_index + step, version2_index + step, step):
new_version = self.versions[self.versions_history[version_index]]

value = self._adapt_model(value, model_name, current_version, new_version, direction)

current_version = new_version

return value

def _adapt_model(self, value: dict, model_name: str, current_version: APIVersion, new_version: APIVersion,
direction: Direction):
try:
current_model = current_version.models[model_name]
except KeyError:
raise APIVersionDoesNotContainModelException(current_version.version, model_name) from None

try:
new_model = new_version.models[model_name]
except KeyError:
raise APIVersionDoesNotContainModelException(new_version.version, model_name) from None

return self._adapt_value(value, current_model, new_model, direction)

def _adapt_value(self, value: dict, current_model: type[BaseModel], new_model: type[BaseModel],
direction: Direction):
for k in value:
if k in current_model.model_fields and k in new_model.model_fields:
current_model_field = current_model.model_fields[k].annotation
new_model_field = new_model.model_fields[k].annotation
if (
isinstance(value[k], dict) and
(current_nested_model := model_field_is_model(current_model_field)) and
(new_nested_model := model_field_is_model(new_model_field)) and
current_nested_model.__class__.__name__ == new_nested_model.__class__.__name__
):
value[k] = self._adapt_value(value[k], current_nested_model, new_nested_model, direction)
elif (
isinstance(value[k], list) and
(current_nested_model := model_field_is_list_of_models(current_model_field)) and
(current_nested_model := model_field_is_model(current_nested_model)) and
(new_nested_model := model_field_is_list_of_models(new_model_field)) and
(new_nested_model := model_field_is_model(new_nested_model)) and
current_nested_model.__class__.__name__ == new_nested_model.__class__.__name__
):
value[k] = [
self._adapt_value(v, current_nested_model, new_nested_model, direction)
for v in value[k]
]

if new_model.__class__ is not ForUpdateMetaclass:
for k, field in new_model.model_fields.items():
if k not in value and not field.is_required():
value[k] = field.get_default()

match direction:
case Direction.DOWNGRADE:
value = current_model.to_previous(value)
case Direction.UPGRADE:
value = new_model.from_previous(value)

for k in list(value):
if k in current_model.model_fields and k not in new_model.model_fields:
value.pop(k)

return value
Loading
Loading