Skip to content

Commit

Permalink
Re-work apiv2 test framework
Browse files Browse the repository at this point in the history
- Make calls more generic, allowing easier writing of test cases.
- Improve error handing to yield more meaningfull error messages
  • Loading branch information
rixvet committed Aug 16, 2023
1 parent dcb68c4 commit 95777bb
Show file tree
Hide file tree
Showing 12 changed files with 306 additions and 325 deletions.
68 changes: 57 additions & 11 deletions ci/apiv2/hashtopolis.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ def print_to_log(*args): # noqa:E301


class HashtopolisError(Exception):
pass
def __init__(self, *args, **kwargs):
super().__init__(*args)
self.exception_details = kwargs.get('exception_details', [])
self.message = kwargs.get('message', '')
self.status_code = kwargs.get('status_code', None)


class HashtopolisConfig(object):
Expand Down Expand Up @@ -91,9 +95,22 @@ def authenticate(self):
}

def validate_status_code(self, r, expected_status_code, error_msg):
if r.status_code != expected_status_code:
raise HashtopolisError("%s (status_code: %s != %s): %s", error_msg, r.status_code,
expected_status_code, r.text)
""" Validate response and convert to python exception """
# Status code 204 is special and should have no JSON output
if r.status_code == 204:
assert (r.text == '')
return

# Expected responses below should be valid JSON
r_json = self.resp_to_json(r)

# Application hits a problem
if r.status_code not in expected_status_code:
raise HashtopolisError(
"%s (status_code=%s): %s" % (error_msg, r.status_code, r.text),
status_code=r.status_code,
exception_details=r_json.get('exception', []),
message=r_json.get('message', None))

def filter(self, expand, filter):
self.authenticate()
Expand Down Expand Up @@ -125,7 +142,7 @@ def filter(self, expand, filter):
}

r = requests.get(uri, headers=headers, data=json.dumps(payload))
self.validate_status_code(r, 200, "Filtering failed")
self.validate_status_code(r, [200], "Filtering failed")
return self.resp_to_json(r).get('values')

def get_one(self, pk, expand):
Expand All @@ -138,7 +155,7 @@ def get_one(self, pk, expand):
}

r = requests.get(uri, headers=headers, data=json.dumps(payload))
self.validate_status_code(r, 200, "Filtering failed")
self.validate_status_code(r, [200], "Get single object failed")
return self.resp_to_json(r)

def patch_one(self, obj):
Expand All @@ -157,7 +174,7 @@ def patch_one(self, obj):

logger.debug("Sending PATCH payload: %s to %s", json.dumps(payload), uri)
r = requests.patch(uri, headers=headers, data=json.dumps(payload))
self.validate_status_code(r, 201, "Patching failed")
self.validate_status_code(r, [201], "Patching failed")

# TODO: Validate if return objects matches digital twin
obj.set_initial(self.resp_to_json(r).copy())
Expand All @@ -172,7 +189,7 @@ def create(self, obj):
payload = obj.get_fields()

r = requests.post(uri, headers=headers, data=json.dumps(payload))
self.validate_status_code(r, 201, "Creation of object failed")
self.validate_status_code(r, [201], "Creation of object failed")

# TODO: Validate if return objects matches digital twin
obj.set_initial(self.resp_to_json(r).copy())
Expand All @@ -188,7 +205,7 @@ def delete(self, obj):
payload = {}

r = requests.delete(uri, headers=headers, data=json.dumps(payload))
self.validate_status_code(r, 204, "Deletion of object failed")
self.validate_status_code(r, [204], "Deletion of object failed")

# TODO: Cleanup object to allow re-creation

Expand Down Expand Up @@ -239,12 +256,22 @@ def get_first(cls):
@classmethod
def get(cls, expand=None, **kwargs):
if 'pk' in kwargs:
api_obj = cls.get_conn().get_one(kwargs['pk'], expand)
try:
api_obj = cls.get_conn().get_one(kwargs['pk'], expand)
except HashtopolisError as e:
if e.status_code == 404:
raise cls._model.DoesNotExist
else:
# Re-raise error if generic failure took place
raise
new_obj = cls._model(**api_obj)
return new_obj
else:
objs = cls.filter(expand, **kwargs)
assert len(objs) == 1
if len(objs) == 0:
raise cls._model.DoesNotExist
elif len(objs) > 1:
raise cls._model.MultipleObjectsReturned
return objs[0]

@classmethod
Expand All @@ -261,6 +288,14 @@ def filter(cls, expand=None, **kwargs):
return objs


class ObjectDoesNotExist(Exception):
"""The requested object does not exist"""


class MultipleObjectsReturned(Exception):
"""The query returned multiple objects when only one was expected."""


# Build Django ORM style 'ModelName.objects' interface
class ModelBase(type):
def __new__(cls, clsname, bases, attrs, uri=None, **kwargs):
Expand All @@ -272,6 +307,17 @@ def __new__(cls, clsname, bases, attrs, uri=None, **kwargs):

setattr(new_class, 'objects', type('Manager', (ManagerBase,), {'_model_uri': uri}))
setattr(new_class.objects, '_model', new_class)

def add_to_class(class_name, class_type):
setattr(new_class,
class_name,
type(class_name, (class_type,), {
"__qualname__": "%s.%s" % (new_class.__qualname__, class_name),
'__module__': "%s.%s" % (__name__, new_class.__name__)
}))
add_to_class('DoesNotExist', ObjectDoesNotExist)
add_to_class('MultipleObjectsReturned', MultipleObjectsReturned)

cls_registry[clsname] = new_class

# Insert Meta properties
Expand Down
41 changes: 18 additions & 23 deletions ci/apiv2/test_agent.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,28 @@
import datetime

from hashtopolis import Agent

from utils import BaseTest
from utils import do_create_agent, do_create_hashlist, do_create_task
from utils import do_create_agent


class AgentTest(BaseTest):
def test_new_agent(self):
dummy_agent, agent = do_create_agent()
self.delete_after_test(agent)

hashlist = do_create_hashlist()
self.delete_after_test(hashlist)
model_class = Agent

task = do_create_task(hashlist)
self.delete_after_test(task)
def create_test_object(self, delete=True):
_, model_obj = do_create_agent()
if delete:
self.delete_after_test(model_obj)
return model_obj

dummy_agent.get_task()
dummy_agent.get_hashlist()
dummy_agent.get_chunk()
def test_create_agent(self):
model_obj = self.create_test_object()
self._test_create(model_obj)

def test_patch_agent(self):
_, agent = do_create_agent()
self.delete_after_test(agent)

new_name = f'agent-patch-{datetime.datetime.now().isoformat()}'
agent.agentName = new_name
agent.save()

obj = Agent.objects.get(pk=agent.id)
self.assertEqual(obj.agentName, new_name)
model_obj = self.create_test_object()
attr = 'agentName'
self._test_patch(model_obj, attr)

def test_expandables_agent(self):
model_obj = self.create_test_object()
expandables = ['agentstats']
self._test_expandables(model_obj, expandables)
4 changes: 2 additions & 2 deletions ci/apiv2/test_attributes.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ def test_create_protected(self):
)
with self.assertRaises(HashtopolisError) as e:
user.save()
self.assertEqual(e.exception.args[1], 'Creation of object failed')
self.assertIn('is not valid input key', e.exception.args[4])
self.assertEqual(e.exception.status_code, 500)
self.assertIn('is not valid input key', e.exception.exception_details[0]['message'])

def test_get_private(self):
stamp = int(time.time() * 1000)
Expand Down
51 changes: 20 additions & 31 deletions ci/apiv2/test_cracker.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,34 @@
import datetime

from hashtopolis import Cracker
from hashtopolis import HashtopolisError
from utils import BaseTest
from utils import do_create_cracker


class CrackerTest(BaseTest):
def test_create_cracker(self):
cracker = do_create_cracker()
self.delete_after_test(cracker)
model_class = Cracker

obj = Cracker.objects.get(crackerBinaryId=cracker.id)
self.assertEqual(obj.binaryName, cracker.binaryName)
def create_test_object(self, delete=True):
model_obj = do_create_cracker()
if delete:
self.delete_after_test(model_obj)
return model_obj

def test_patch_cracker(self):
cracker = do_create_cracker()
self.delete_after_test(cracker)

stamp = datetime.datetime.now().isoformat()
obj_name = f'Dummy Cracker - {stamp}'
cracker.binaryName = obj_name
cracker.save()
def test_create_cracker(self):
model_obj = self.create_test_object()
self._test_create(model_obj)

obj = cracker.objects.get(crackerBinaryId=cracker.id)
self.assertEqual(obj.binaryName, obj_name)
def test_patch_cracker(self):
model_obj = self.create_test_object()
attr = 'binaryName'
self._test_patch(model_obj, attr)

def test_delete_cracker(self):
cracker = do_create_cracker()

obj = cracker.objects.get(crackerBinaryId=cracker.id)
self.assertIsNotNone(obj)

cracker.delete()

objs = Cracker.objects.filter(crackerBinaryId=cracker.id)
self.assertEqual(objs, [])
model_obj = self.create_test_object(delete=False)
self._test_delete(model_obj)

def test_exception_cracker(self):
with self.assertRaises(HashtopolisError) as e:
_ = do_create_cracker('002')
self.assertEqual(e.exception.args[1], 'Creation of object failed')
self.assertIn('Required parameter', e.exception.args[4])
self._test_exception(do_create_cracker, '002')

def test_expandables_cracker(self):
model_obj = self.create_test_object()
expandables = ['crackerBinaryType']
self._test_expandables(model_obj, expandables)
69 changes: 22 additions & 47 deletions ci/apiv2/test_crackertype.py
Original file line number Diff line number Diff line change
@@ -1,60 +1,35 @@
import json
import datetime
from pathlib import Path

from hashtopolis import CrackerType
from hashtopolis import HashtopolisError
from utils import BaseTest
from utils import do_create_crackertype


class CrackerTypeTest(BaseTest):
def test_create_crackertype(self):
p = Path(__file__).parent.joinpath('create_crackertype_001.json')
payload = json.loads(p.read_text('UTF-8'))
crackertype = CrackerType(**payload)
crackertype.save()
model_class = CrackerType

obj = CrackerType.objects.get(crackerBinaryTypeId=crackertype.id)
self.assertEqual(obj.typeName, payload.get('typeName'))
def create_test_object(self, delete=True):
model_obj = do_create_crackertype()
if delete:
self.delete_after_test(model_obj)
return model_obj

crackertype.delete()
def test_create_crackertype(self):
model_obj = self.create_test_object()
self._test_create(model_obj)

def test_patch_crackertype(self):
p = Path(__file__).parent.joinpath('create_crackertype_001.json')
payload = json.loads(p.read_text('UTF-8'))
crackertype = CrackerType(**payload)
crackertype.save()

stamp = datetime.datetime.now().day
obj_name = f'hashcat{stamp}'
crackertype.typeName = obj_name
crackertype.save()

obj = CrackerType.objects.get(crackerBinaryTypeId=crackertype.id)
self.assertEqual(obj.typeName, obj_name)

crackertype.delete()
model_obj = self.create_test_object()
attr = 'typeName'
new_attr_value = 'Generic - edited'
self._test_patch(model_obj, attr, new_attr_value)

def test_delete_crackertype(self):
p = Path(__file__).parent.joinpath('create_crackertype_001.json')
payload = json.loads(p.read_text('UTF-8'))
crackertype = CrackerType(**payload)
crackertype.save()

id = crackertype.id

crackertype.delete()

objs = CrackerType.objects.filter(crackerBinaryTypeId=id)

self.assertEqual(objs, [])
model_obj = self.create_test_object(delete=False)
self._test_delete(model_obj)

def test_exception_crackertype(self):
p = Path(__file__).parent.joinpath('create_crackertype_002.json')
payload = json.loads(p.read_text('UTF-8'))
crackertype = CrackerType(**payload)

with self.assertRaises(HashtopolisError) as e:
crackertype.save()
self.assertEqual(e.exception.args[1], 'Creation of object failed')
self.assertIn('is not of type string', e.exception.args[4])
self._test_exception(do_create_crackertype, '002')

def test_expandables_crackertype(self):
model_obj = self.create_test_object()
expandables = ['crackerVersions']
self._test_expandables(model_obj, expandables)
Loading

0 comments on commit 95777bb

Please sign in to comment.