Skip to content

Commit

Permalink
Add Constrained container schema validator
Browse files Browse the repository at this point in the history
  • Loading branch information
Gregory Starck committed Jun 10, 2018
1 parent 0dc8499 commit 37ee730
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 17 deletions.
44 changes: 44 additions & 0 deletions schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,3 +393,47 @@ def _callable_str(callable_):
if hasattr(callable_, '__name__'):
return callable_.__name__
return str(callable_)


class ConstrainedContainer(object):
"""A simple "constrained" container schema validator"""

def __init__(self, container_def, min_items=None, max_items=None, additionals=None):
assert min_items is None or isinstance(min_items, int)
assert max_items is None or isinstance(max_items, int)
self.container_def = container_def
self.min_items = min_items
self.max_items = max_items
self.additionals = additionals

def validate(self, data):
if not isinstance(data, type(self.container_def)):
raise SchemaUnexpectedTypeError("not good type: %s" % type(data))
ld = len(data)
lcd = len(self.container_def)
if ld < lcd:
raise SchemaError("not enough items: %s" % ld)
if self.additionals is None:
if ld > lcd:
raise SchemaError("too many items: %s" % ld)
else:
min_i = self.min_items
if min_i is not None:
if ld < min_i:
raise SchemaError("not enough additionals items: %s" % ld)
max_i = self.max_items
if max_i is not None:
if ld > max_i:
raise SchemaError("too many additionals items: %s" % ld)

def gen_schemas():
for item in self.container_def:
yield Schema(item)
s = Schema(self.additionals)
for _ in range(ld - lcd):
yield s

return type(self.container_def)(
schema.validate(data[idx])
for idx, schema in enumerate(gen_schemas())
)
104 changes: 87 additions & 17 deletions test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@
from schema import (Schema, Use, And, Or, Regex, Optional, Const,
SchemaError, SchemaWrongKeyError,
SchemaMissingKeyError, SchemaUnexpectedTypeError,
SchemaForbiddenKeyError, Forbidden)
SchemaForbiddenKeyError, Forbidden, ConstrainedContainer)

if sys.version_info[0] == 3:
basestring = str # Python 3 does not have basestring
unicode = str # Python 3 does not have unicode


SE = raises(SchemaError)


Expand All @@ -32,7 +31,6 @@ def se(_):


def test_schema():

assert Schema(1).validate(1) == 1
with SE: Schema(1).validate(9)

Expand All @@ -55,7 +53,7 @@ def test_schema():

def test_validate_file():
assert Schema(
Use(open)).validate('LICENSE-MIT').read().startswith('Copyright')
Use(open)).validate('LICENSE-MIT').read().startswith('Copyright')
with SE: Schema(Use(open)).validate('NON-EXISTENT')
assert Schema(os.path.exists).validate('.') == '.'
with SE: Schema(os.path.exists).validate('./non-existent/')
Expand Down Expand Up @@ -161,12 +159,15 @@ def test_strictly():

def test_dict():
assert Schema({'key': 5}).validate({'key': 5}) == {'key': 5}
with SE: Schema({'key': 5}).validate({'key': 'x'})
with SE: Schema({'key': 5}).validate(['key', 5])
with SE:
Schema({'key': 5}).validate({'key': 'x'})
with SE:
Schema({'key': 5}).validate(['key', 5])
assert Schema({'key': int}).validate({'key': 5}) == {'key': 5}
assert Schema({'n': int, 'f': float}).validate(
{'n': 5, 'f': 3.14}) == {'n': 5, 'f': 3.14}
with SE: Schema({'n': int, 'f': float}).validate(
{'n': 5, 'f': 3.14}) == {'n': 5, 'f': 3.14}
with SE:
Schema({'n': int, 'f': float}).validate(
{'n': 3.14, 'f': 5})
with SE:
try:
Expand Down Expand Up @@ -216,18 +217,18 @@ def test_dict():

def test_dict_keys():
assert Schema({str: int}).validate(
{'a': 1, 'b': 2}) == {'a': 1, 'b': 2}
{'a': 1, 'b': 2}) == {'a': 1, 'b': 2}
with SE: Schema({str: int}).validate({1: 1, 'b': 2})
assert Schema({Use(str): Use(int)}).validate(
{1: 3.14, 3.14: 1}) == {'1': 3, '3.14': 1}
{1: 3.14, 3.14: 1}) == {'1': 3, '3.14': 1}


def test_ignore_extra_keys():
assert Schema({'key': 5}, ignore_extra_keys=True).validate(
{'key': 5, 'bad': 4}) == {'key': 5}
{'key': 5, 'bad': 4}) == {'key': 5}
assert Schema({'key': 5, 'dk': {'a': 'a'}}, ignore_extra_keys=True).validate(
{'key': 5, 'bad': 'b', 'dk': {'a': 'a', 'bad': 'b'}}) == \
{'key': 5, 'dk': {'a': 'a'}}
{'key': 5, 'dk': {'a': 'a'}}
assert Schema([{'key': 'v'}], ignore_extra_keys=True).validate(
[{'key': 'v', 'bad': 'bad'}]) == [{'key': 'v'}]
assert Schema([{'key': 'v'}], ignore_extra_keys=True).validate(
Expand All @@ -236,11 +237,11 @@ def test_ignore_extra_keys():

def test_ignore_extra_keys_validation_and_return_keys():
assert Schema({'key': 5, object: object}, ignore_extra_keys=True).validate(
{'key': 5, 'bad': 4}) == {'key': 5, 'bad': 4}
{'key': 5, 'bad': 4}) == {'key': 5, 'bad': 4}
assert Schema({'key': 5, 'dk': {'a': 'a', object: object}},
ignore_extra_keys=True).validate(
{'key': 5, 'dk': {'a': 'a', 'bad': 'b'}}) == \
{'key': 5, 'dk': {'a': 'a', 'bad': 'b'}}
{'key': 5, 'dk': {'a': 'a', 'bad': 'b'}}


def test_dict_forbidden_keys():
Expand All @@ -249,7 +250,7 @@ def test_dict_forbidden_keys():
with raises(SchemaWrongKeyError):
Schema({Forbidden('b'): int}).validate({'b': 'bye'})
assert (Schema({Forbidden('b'): int,
Optional('b'): object}).validate({'b': 'bye'}) ==
Optional('b'): object}).validate({'b': 'bye'}) ==
{'b': 'bye'})
with raises(SchemaForbiddenKeyError):
Schema({Forbidden('b'): object, Optional('b'): object}).validate({'b': 'bye'})
Expand All @@ -259,7 +260,7 @@ def test_dict_optional_keys():
with SE: Schema({'a': 1, 'b': 2}).validate({'a': 1})
assert Schema({'a': 1, Optional('b'): 2}).validate({'a': 1}) == {'a': 1}
assert Schema({'a': 1, Optional('b'): 2}).validate(
{'a': 1, 'b': 2}) == {'a': 1, 'b': 2}
{'a': 1, 'b': 2}) == {'a': 1, 'b': 2}
# Make sure Optionals are favored over types:
assert Schema({basestring: 1,
Optional('b'): 2}).validate({'a': 1, 'b': 2}) == {'a': 1, 'b': 2}
Expand Down Expand Up @@ -499,7 +500,8 @@ def test_issue_9_prioritized_key_comparison_in_dicts():

def test_missing_keys_exception_with_non_str_dict_keys():
s = Schema({And(str, Use(str.lower), 'name'): And(str, len)})
with SE: s.validate(dict())
with SE:
s.validate(dict())
with SE:
try:
Schema({1: 'x'}).validate(dict())
Expand Down Expand Up @@ -596,3 +598,71 @@ def validate(self, data):
v = {'k': 1, 'd': {'k': 2, 'l': [{'l': [3, 4, 5]}]}}
d = MySchema(s).validate(v)
assert d['k'] == 2 and d['d']['k'] == 3 and d['d']['l'][0]['l'] == [4, 5, 6]


class TestConstrainedContainer(object):
CC = ConstrainedContainer

def test_one(self):
cc = self.CC([int])
assert cc.validate([1]) == [1]
with SE:
cc.validate("bad type")
with SE:
cc.validate((1,))
with SE:
cc.validate([])
with SE:
cc.validate([1, 1])
with SE:
cc.validate(["foo"])

def test_two(self):
cc = self.CC([int, str])
assert cc.validate([1, "bar"]) == [1, "bar"]
with SE:
cc.validate([1])
with SE:
cc.validate(["bar", 1])
with SE:
cc.validate([1, "foo", 3])

def test_additionals(self):
cc = self.CC([], additionals=float, min_items=2)
for x in 1, 3:
assert cc.validate(x * [1.1, 2.2]) == x * [1.1, 2.2]
with SE:
assert cc.validate([1.1])
cc = self.CC([], additionals=Or(int, str), max_items=4)
assert cc.validate([]) == []
for x in range(4):
assert cc.validate(x * [1]) == x * [1]
with SE:
cc.validate(5 * [1])
for x in range(2):
assert cc.validate(x * [1, 'b']) == x * [1, 'b']
with SE:
cc.validate(3 * [1, 'b'])

def test_multiple(self):
cc = self.CC([int, self.CC([int, int])], additionals=float)
assert cc.validate([1, [2, 3]]) == [1, [2, 3]]
with SE:
cc.validate([1, 2, 3])
assert cc.validate([1, [2, 3], 4.5]) == [1, [2, 3], 4.5]
assert cc.validate([1, [2, 3], 4.5, 6.7]) == [1, [2, 3], 4.5, 6.7]
with SE:
cc.validate([1, [2, 3], 4.5, "bad"])

def test_complex(self):
s = Schema({
'list': self.CC(2 * [int]),
Regex('^(map|array)$'): self.CC(2 * [self.CC(2 * [int])])
})
assert s.validate({
'list': [2, 2],
'array': [[1, 2], [3, 4]]
}) == {
'list': [2, 2],
'array': [[1, 2], [3, 4]]
}

0 comments on commit 37ee730

Please sign in to comment.