The following covers the Unit Testing framework in ACA-Py, how to run the tests, and how to add unit tests.
This video is a presentation of the material covered in this document by developer @shaangill025.
./scripts/run_tests
./scripts/run_tests aries_clouadagent/protocols/out_of_band/v1_0/tests
./scripts/run_tests_indy
includes Indy specific tests
Example: aries_cloudagent/core/tests/test_event_bus.py
@pytest.fixture
def event_bus():
yield EventBus()
@pytest.fixture
def profile():
yield async_mock.MagicMock()
@pytest.fixture
def event():
event = Event(topic="anything", payload="payload")
yield event
class MockProcessor:
def __init__(self):
self.profile = None
self.event = None
async def __call__(self, profile, event):
self.profile = profile
self.event = event
@pytest.fixture
def processor():
yield MockProcessor()
def test_sub_unsub(event_bus: EventBus, processor):
"""Test subscribe and unsubscribe."""
event_bus.subscribe(re.compile(".*"), processor)
assert event_bus.topic_patterns_to_subscribers
assert event_bus.topic_patterns_to_subscribers[re.compile(".*")] == [processor]
event_bus.unsubscribe(re.compile(".*"), processor)
assert not event_bus.topic_patterns_to_subscribers
From aries_cloudagent/core/event_bus.py
class EventBus:
def __init__(self):
self.topic_patterns_to_subscribers: Dict[Pattern, List[Callable]] = {}
def subscribe(self, pattern: Pattern, processor: Callable):
if pattern not in self.topic_patterns_to_subscribers:
self.topic_patterns_to_subscribers[pattern] = []
self.topic_patterns_to_subscribers[pattern].append(processor)
def unsubscribe(self, pattern: Pattern, processor: Callable):
if pattern in self.topic_patterns_to_subscribers:
try:
index = self.topic_patterns_to_subscribers[pattern].index(processor)
except ValueError:
return
del self.topic_patterns_to_subscribers[pattern][index]
if not self.topic_patterns_to_subscribers[pattern]:
del self.topic_patterns_to_subscribers[pattern]
@pytest.mark.asyncio
async def test_sub_notify(event_bus: EventBus, profile, event, processor):
"""Test subscriber receives event."""
event_bus.subscribe(re.compile(".*"), processor)
await event_bus.notify(profile, event)
assert processor.profile == profile
assert processor.event == event
async def notify(self, profile: "Profile", event: Event):
partials = []
for pattern, subscribers in self.topic_patterns_to_subscribers.items():
match = pattern.match(event.topic)
if not match:
continue
for subscriber in subscribers:
partials.append(
partial(
subscriber,
profile,
event.with_metadata(EventMetadata(pattern, match)),
)
)
for processor in partials:
try:
await processor()
except Exception:
LOGGER.exception("Error occurred while processing event")
From: aries_cloudagent/protocols/didexchange/v1_0/tests/test.manager.py
class TestDidExchangeManager(AsyncTestCase, TestConfig):
async def setUp(self):
self.responder = MockResponder()
self.oob_mock = async_mock.MagicMock(
clean_finished_oob_record=async_mock.CoroutineMock(return_value=None)
)
self.route_manager = async_mock.MagicMock(RouteManager)
...
self.profile = InMemoryProfile.test_profile(
{
"default_endpoint": "http://aries.ca/endpoint",
"default_label": "This guy",
"additional_endpoints": ["http://aries.ca/another-endpoint"],
"debug.auto_accept_invites": True,
"debug.auto_accept_requests": True,
"multitenant.enabled": True,
"wallet.id": True,
},
bind={
BaseResponder: self.responder,
OobMessageProcessor: self.oob_mock,
RouteManager: self.route_manager,
...
},
)
...
async def test_receive_invitation_no_auto_accept(self):
async with self.profile.session() as session:
mediation_record = MediationRecord(
role=MediationRecord.ROLE_CLIENT,
state=MediationRecord.STATE_GRANTED,
connection_id=self.test_mediator_conn_id,
routing_keys=self.test_mediator_routing_keys,
endpoint=self.test_mediator_endpoint,
)
await mediation_record.save(session)
with async_mock.patch.object(
self.multitenant_mgr, "get_default_mediator"
) as mock_get_default_mediator:
mock_get_default_mediator.return_value = mediation_record
invi_rec = await self.oob_manager.create_invitation(
my_endpoint="testendpoint",
hs_protos=[HSProto.RFC23],
)
invitee_record = await self.manager.receive_invitation(
invi_rec.invitation,
auto_accept=False,
)
assert invitee_record.state == ConnRecord.State.INVITATION.rfc23
async def receive_invitation(
self,
invitation: OOBInvitationMessage,
their_public_did: Optional[str] = None,
auto_accept: Optional[bool] = None,
alias: Optional[str] = None,
mediation_id: Optional[str] = None,
) -> ConnRecord:
...
accept = (
ConnRecord.ACCEPT_AUTO
if (
auto_accept
or (
auto_accept is None
and self.profile.settings.get("debug.auto_accept_invites")
)
)
else ConnRecord.ACCEPT_MANUAL
)
service_item = invitation.services[0]
# Create connection record
conn_rec = ConnRecord(
invitation_key=(
DIDKey.from_did(service_item.recipient_keys[0]).public_key_b58
if isinstance(service_item, OOBService)
else None
),
invitation_msg_id=invitation._id,
their_label=invitation.label,
their_role=ConnRecord.Role.RESPONDER.rfc23,
state=ConnRecord.State.INVITATION.rfc23,
accept=accept,
alias=alias,
their_public_did=their_public_did,
connection_protocol=DIDX_PROTO,
)
async with self.profile.session() as session:
await conn_rec.save(
session,
reason="Created new connection record from invitation",
log_params={
"invitation": invitation,
"their_role": ConnRecord.Role.RESPONDER.rfc23,
},
)
# Save the invitation for later processing
...
return conn_rec
- Error catching
with self.assertRaises(DIDXManagerError) as ctx:
...
assert " ... error ..." in str(ctx.exception)