diff --git a/filip/clients/ngsi_ld/cb.py b/filip/clients/ngsi_ld/cb.py index fe5d4b38..573a060b 100644 --- a/filip/clients/ngsi_ld/cb.py +++ b/filip/clients/ngsi_ld/cb.py @@ -18,7 +18,7 @@ from filip.models.base import FiwareLDHeader, PaginationMethod, core_context from filip.utils.simple_ql import QueryString from filip.models.ngsi_v2.base import AttrsFormat -from filip.models.ngsi_ld.subscriptions import Subscription +from filip.models.ngsi_ld.subscriptions import SubscriptionLD from filip.models.ngsi_ld.context import ContextLDEntity, ContextLDEntityKeyValues, ContextProperty, ContextRelationship, NamedContextProperty, \ NamedContextRelationship, ActionTypeLD, UpdateLD from filip.models.ngsi_v2.context import Query @@ -518,7 +518,7 @@ def delete_attribute(self, # SUBSCRIPTION API ENDPOINTS def get_subscription_list(self, - limit: PositiveInt = inf) -> List[Subscription]: + limit: PositiveInt = inf) -> List[SubscriptionLD]: """ Returns a list of all the subscriptions present in the system. Args: @@ -538,14 +538,14 @@ def get_subscription_list(self, url=url, params=params, headers=headers) - adapter = TypeAdapter(List[Subscription]) + adapter = TypeAdapter(List[SubscriptionLD]) return adapter.validate_python(items) except requests.RequestException as err: msg = "Could not load subscriptions!" self.log_error(err=err, msg=msg) raise - def post_subscription(self, subscription: Subscription, + def post_subscription(self, subscription: SubscriptionLD, update: bool = False) -> str: """ Creates a new subscription. The subscription is represented by a @@ -577,8 +577,8 @@ def post_subscription(self, subscription: Subscription, subscription.id = ex_sub.id self.update_subscription(subscription) else: - warnings.warn(f"Subscription existed already with the id" - f" {ex_sub.id}") + self.logger.warning(f"Subscription existed already with the id" + f" {ex_sub.id}") return ex_sub.id url = urljoin(self.base_url, f'{self._url_version}/subscriptions') @@ -602,7 +602,7 @@ def post_subscription(self, subscription: Subscription, self.log_error(err=err, msg=msg) raise - def get_subscription(self, subscription_id: str) -> Subscription: + def get_subscription(self, subscription_id: str) -> SubscriptionLD: """ Retrieves a subscription from Args: @@ -617,14 +617,14 @@ def get_subscription(self, subscription_id: str) -> Subscription: res = self.get(url=url, headers=headers) if res.ok: self.logger.debug('Received: %s', res.json()) - return Subscription(**res.json()) + return SubscriptionLD(**res.json()) res.raise_for_status() except requests.RequestException as err: msg = f"Could not load subscription {subscription_id}" self.log_error(err=err, msg=msg) raise - def update_subscription(self, subscription: Subscription) -> None: + def update_subscription(self, subscription: SubscriptionLD) -> None: """ Only the fields included in the request are updated in the subscription. Args: diff --git a/filip/models/__init__.py b/filip/models/__init__.py index e5d180a3..c3505733 100644 --- a/filip/models/__init__.py +++ b/filip/models/__init__.py @@ -1 +1,2 @@ from .base import FiwareHeader +from .base import FiwareLDHeader diff --git a/filip/models/ngsi_ld/subscriptions.py b/filip/models/ngsi_ld/subscriptions.py index da851ec8..1bbf66d1 100644 --- a/filip/models/ngsi_ld/subscriptions.py +++ b/filip/models/ngsi_ld/subscriptions.py @@ -206,7 +206,7 @@ def check_passwords_match(self) -> 'TemporalQuery': return self -class Subscription(BaseModel): +class SubscriptionLD(BaseModel): id: Optional[str] = Field( default=None, description="Subscription identifier (JSON-LD @id)" diff --git a/filip/utils/cleanup.py b/filip/utils/cleanup.py index 6cda8402..e08176f7 100644 --- a/filip/utils/cleanup.py +++ b/filip/utils/cleanup.py @@ -7,11 +7,60 @@ from pydantic import AnyHttpUrl, AnyUrl from requests import RequestException from typing import Callable, List, Union -from filip.models import FiwareHeader +from filip.models import FiwareHeader, FiwareLDHeader from filip.clients.ngsi_v2 import \ ContextBrokerClient, \ IoTAClient, \ QuantumLeapClient +from filip.clients.ngsi_ld.cb import ContextBrokerLDClient +from filip.models.ngsi_ld.context import ActionTypeLD +import logging + +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) + + +def clear_context_broker_ld(url: str = None, + fiware_ld_header: FiwareLDHeader = None, + cb_ld_client: ContextBrokerLDClient = None + ): + """ + Function deletes all entities and subscriptions for a tenant in an LD context broker. + + Args: + url: Url of the context broker LD + fiware_ld_header: header of the NGSI-LD tenant + cb_ld_client: NGSI-LD context broker client object + + Returns: + + """ + assert url or cb_ld_client, "Either url or client object must be given" + # create client + if cb_ld_client is None: + client = ContextBrokerLDClient(url=url, fiware_header=fiware_ld_header) + else: + client = cb_ld_client + # clean entities iteratively + try: + entity_list = True + while entity_list: + entity_list = client.get_entity_list(limit=100) + if entity_list: + client.entity_batch_operation(action_type=ActionTypeLD.DELETE, + entities=entity_list) + except RequestException as e: + logger.warning("Could not clean entities completely") + raise + + # clean subscriptions + try: + sub_list = cb_ld_client.get_subscription_list() + for sub in sub_list: + cb_ld_client.delete_subscription(sub.id) + except RequestException as e: + logger.warning("Could not clean subscriptions completely") + raise def clear_context_broker(url: str=None, diff --git a/tests/clients/test_ngsi_ld_cb.py b/tests/clients/test_ngsi_ld_cb.py index 524c6cd6..4363a1ae 100644 --- a/tests/clients/test_ngsi_ld_cb.py +++ b/tests/clients/test_ngsi_ld_cb.py @@ -13,6 +13,7 @@ NamedContextProperty from tests.config import settings import requests +from filip.utils.cleanup import clear_context_broker_ld # Setting up logging @@ -57,28 +58,13 @@ def setUp(self) -> None: self.client = ContextBrokerLDClient(fiware_header=self.fiware_header, session=session, url=settings.LD_CB_URL) - # todo replace with clean up function for ld - try: - entity_list = True - while entity_list: - entity_list = self.client.get_entity_list(limit=100) - self.client.entity_batch_operation(action_type=ActionTypeLD.DELETE, - entities=entity_list) - except RequestException: - pass + clear_context_broker_ld(cb_ld_client=self.client) def tearDown(self) -> None: """ Cleanup test server """ - try: - entity_list = True - while entity_list: - entity_list = self.client.get_entity_list(limit=100) - self.client.entity_batch_operation(action_type=ActionTypeLD.DELETE, - entities=entity_list) - except RequestException: - pass + clear_context_broker_ld(cb_ld_client=self.client) self.client.close() def test_management_endpoints(self): diff --git a/tests/clients/test_ngsi_ld_entity_batch_operation.py b/tests/clients/test_ngsi_ld_entity_batch_operation.py index 14aacc90..da1397c2 100644 --- a/tests/clients/test_ngsi_ld_entity_batch_operation.py +++ b/tests/clients/test_ngsi_ld_entity_batch_operation.py @@ -9,6 +9,7 @@ from filip.clients.ngsi_ld.cb import ContextBrokerLDClient from filip.models.ngsi_ld.context import ContextLDEntity, ActionTypeLD from tests.config import settings +from filip.utils.cleanup import clear_context_broker_ld class EntitiesBatchOperations(unittest.TestCase): @@ -28,28 +29,13 @@ def setUp(self) -> None: self.fiware_header = FiwareLDHeader(ngsild_tenant=settings.FIWARE_SERVICE) self.cb_client = ContextBrokerLDClient(fiware_header=self.fiware_header, url=settings.LD_CB_URL) - # todo replace with clean up function for ld - try: - entity_list = True - while entity_list: - entity_list = self.cb_client.get_entity_list(limit=100) - self.cb_client.entity_batch_operation(action_type=ActionTypeLD.DELETE, - entities=entity_list) - except RequestException: - pass + clear_context_broker_ld(cb_ld_client=self.cb_client) def tearDown(self) -> None: """ Cleanup test server """ - try: - entity_list = True - while entity_list: - entity_list = self.cb_client.get_entity_list(limit=100) - self.cb_client.entity_batch_operation(action_type=ActionTypeLD.DELETE, - entities=entity_list) - except RequestException: - pass + clear_context_broker_ld(cb_ld_client=self.cb_client) self.cb_client.close() def test_entity_batch_operations_create(self) -> None: diff --git a/tests/clients/test_ngsi_ld_subscription.py b/tests/clients/test_ngsi_ld_subscription.py index 0f808a05..c097d1cb 100644 --- a/tests/clients/test_ngsi_ld_subscription.py +++ b/tests/clients/test_ngsi_ld_subscription.py @@ -17,8 +17,9 @@ from filip.models.ngsi_ld.subscriptions import \ Endpoint, \ NotificationParams, \ - Subscription + SubscriptionLD from tests.config import settings +from filip.utils.cleanup import clear_context_broker_ld class TestSubscriptions(TestCase): @@ -35,51 +36,23 @@ def setUp(self) -> None: self.fiware_header = FiwareLDHeader(ngsild_tenant=settings.FIWARE_SERVICE) self.cb_client = ContextBrokerLDClient(fiware_header=self.fiware_header, url=settings.LD_CB_URL) - self.cleanup() + clear_context_broker_ld(cb_ld_client=self.cb_client) - # initial tenant - self.cb_client.post_entity(ContextLDEntity(id="Dummy:1", type="Dummy"), - update=True) - self.cb_client.delete_entity_by_id("Dummy:1") - # self.mqtt_url = "mqtt://test.de:1883" - # self.mqtt_topic = '/filip/testing' - # self.notification = { - # "attributes": ["filling", "controlledAsset"], - # "format": "keyValues", - # "endpoint": { - # "uri": "http://test:1234/subscription/low-stock-farm001-ngsild", - # "accept": "application/json" - # } - # } - self.cb_client = ContextBrokerLDClient() self.mqtt_topic = ''.join([settings.FIWARE_SERVICE, settings.FIWARE_SERVICEPATH]) self.endpoint_mqtt = Endpoint(**{ "uri": str(settings.LD_MQTT_BROKER_URL) + "/my/test/topic", "accept": "application/json", }) - self.cb_client = ContextBrokerLDClient(url=settings.LD_CB_URL, - fiware_header=self.fiware_header) self.endpoint_http = Endpoint(**{ "uri": urllib.parse.urljoin(str(settings.LD_CB_URL), "/ngsi-ld/v1/subscriptions"), "accept": "application/json" - } - ) - self.cleanup() + }) def tearDown(self) -> None: - self.cleanup() + clear_context_broker_ld(cb_ld_client=self.cb_client) self.cb_client.close() - - def cleanup(self): - """ - Cleanup test subscriptions - """ - sub_list = self.cb_client.get_subscription_list() - for sub in sub_list: - if sub.id.startswith('urn:ngsi-ld:Subscription:test_sub'): - self.cb_client.delete_subscription(sub.id) def test_post_subscription_http(self): """ @@ -94,7 +67,7 @@ def test_post_subscription_http(self): attr_id = "attr" id = "urn:ngsi-ld:Subscription:" + "test_sub0" notification_param = NotificationParams(attributes=[attr_id], endpoint=self.endpoint_http) - sub = Subscription(id=id, notification=notification_param, entities=[{"type": "Room"}]) + sub = SubscriptionLD(id=id, notification=notification_param, entities=[{"type": "Room"}]) self.cb_client.post_subscription(sub) sub_list = [x for x in self.cb_client.get_subscription_list() if x.id == 'urn:ngsi-ld:Subscription:test_sub0'] @@ -129,7 +102,7 @@ def test_get_subscription(self): attr_id = "attr" id = "urn:ngsi-ld:Subscription:" + "test_sub0" notification_param = NotificationParams(attributes=[attr_id], endpoint=self.endpoint_http) - sub = Subscription(id=id, notification=notification_param, entities=[{"type": "Room"}]) + sub = SubscriptionLD(id=id, notification=notification_param, entities=[{"type": "Room"}]) self.cb_client.post_subscription(sub) sub_get = self.cb_client.get_subscription(subscription_id=id) self.assertEqual(sub.entities, sub_get.entities) @@ -152,7 +125,7 @@ def test_get_subscription_list(self): attr_id = "attr" + str(i) id = "urn:ngsi-ld:Subscription:" + "test_sub" + str(i) notification_param = NotificationParams(attributes=[attr_id], endpoint=self.endpoint_http) - sub = Subscription(id=id, notification=notification_param, entities=[{"type": "Room"}]) + sub = SubscriptionLD(id=id, notification=notification_param, entities=[{"type": "Room"}]) sub_post_list.append(sub) self.cb_client.post_subscription(sub) @@ -181,10 +154,10 @@ def test_delete_subscription(self): notification_param = NotificationParams( attributes=[attr_id], endpoint=self.endpoint_http) id = "urn:ngsi-ld:Subscription:" + "test_sub" + str(i) - sub = Subscription(id=id, - notification=notification_param, - entities=[{"type": "Room"}] - ) + sub = SubscriptionLD(id=id, + notification=notification_param, + entities=[{"type": "Room"}] + ) if i == 0: del_sub = sub @@ -220,21 +193,21 @@ def test_update_subscription(self): attr_id = "attr" id = "urn:ngsi-ld:Subscription:" + "test_sub77" notification_param = NotificationParams(attributes=[attr_id], endpoint=self.endpoint_http) - sub = Subscription(id=id, notification=notification_param, entities=[{"type": "Room"}]) + sub = SubscriptionLD(id=id, notification=notification_param, entities=[{"type": "Room"}]) self.cb_client.post_subscription(sub) sub_list = self.cb_client.get_subscription_list() self.assertEqual(len(sub_list), 1) print(self.endpoint_http.model_dump()) - sub_changed = Subscription(id=id, notification=notification_param, entities=[{"type": "House"}]) + sub_changed = SubscriptionLD(id=id, notification=notification_param, entities=[{"type": "House"}]) self.cb_client.update_subscription(sub_changed) u_sub= self.cb_client.get_subscription(subscription_id=id) self.assertNotEqual(u_sub,sub_list[0]) self.maxDiff = None self.assertDictEqual(sub_changed.model_dump(), u_sub.model_dump()) - non_sub = Subscription(id="urn:ngsi-ld:Subscription:nonexist", - notification=notification_param, - entities=[{"type":"house"}]) + non_sub = SubscriptionLD(id="urn:ngsi-ld:Subscription:nonexist", + notification=notification_param, + entities=[{"type":"house"}]) with self.assertRaises(Exception): self.cb_client.update_subscription(non_sub) @@ -369,7 +342,7 @@ def on_message(client,userdata,msg): 60) self.mqtt_client.loop_start() #post subscription then start timer - self.cb_client.post_subscription(subscription=Subscription(**self.sub_dict)) + self.cb_client.post_subscription(subscription=SubscriptionLD(**self.sub_dict)) self.timeout_proc.start() #update entity to (ideally) get a notification self.cb_client.update_entity_attribute(entity_id='urn:ngsi-ld:Entity:test_entity03', @@ -425,7 +398,7 @@ def on_message(client,userdata,msg): settings.LD_MQTT_BROKER_URL.port, 60) self.mqtt_client.loop_start() - self.cb_client.post_subscription(subscription=Subscription(**self.sub_dict)) + self.cb_client.post_subscription(subscription=SubscriptionLD(**self.sub_dict)) self.timeout_proc.start() self.cb_client.update_entity_attribute(entity_id='urn:ngsi-ld:Entity:test_entity03', @@ -442,7 +415,7 @@ def on_message(client,userdata,msg): self.timeout_proc = threading.Timer(self.timeout,self.timeout_func) self.sub_dict.update({'q':'temperature>30'}) - self.cb_client.update_subscription(subscription=Subscription(**self.sub_dict)) + self.cb_client.update_subscription(subscription=SubscriptionLD(**self.sub_dict)) time.sleep(5) updated = self.cb_client.get_subscription(self.sub_dict['id']) self.assertEqual(updated.q,'temperature>30') diff --git a/tests/models/test_ngsi_ld_subscriptions.py b/tests/models/test_ngsi_ld_subscriptions.py index aa4d76a0..9f49f1fc 100644 --- a/tests/models/test_ngsi_ld_subscriptions.py +++ b/tests/models/test_ngsi_ld_subscriptions.py @@ -7,7 +7,7 @@ from pydantic import ValidationError from filip.models.ngsi_ld.base import validate_ngsi_ld_query from filip.models.ngsi_ld.subscriptions import \ - Subscription, \ + SubscriptionLD, \ Endpoint, NotificationParams, EntityInfo, TemporalQuery from filip.models.base import FiwareHeader from filip.utils.cleanup import clear_all diff --git a/tests/utils/test_clear.py b/tests/utils/test_clear.py index a57b8686..992ec20d 100644 --- a/tests/utils/test_clear.py +++ b/tests/utils/test_clear.py @@ -4,18 +4,22 @@ import random import time import unittest +import urllib.parse from datetime import datetime from typing import List from uuid import uuid4 - from requests import RequestException - +from filip.clients.ngsi_ld.cb import ContextBrokerLDClient from filip.clients.ngsi_v2 import ContextBrokerClient, IoTAClient, QuantumLeapClient -from filip.models.base import FiwareHeader +from filip.models.base import FiwareHeader, FiwareLDHeader +from filip.models.ngsi_ld.context import ContextLDEntity, ActionTypeLD +from filip.models.ngsi_ld.subscriptions import SubscriptionLD, NotificationParams, \ + Endpoint from filip.models.ngsi_v2.context import ContextEntity from filip.models.ngsi_v2.iot import Device, ServiceGroup from filip.models.ngsi_v2.subscriptions import Subscription, Message -from filip.utils.cleanup import clear_context_broker, clear_iot_agent, clear_quantumleap +from filip.utils.cleanup import clear_context_broker, clear_iot_agent, clear_quantumleap, \ + clear_context_broker_ld from tests.config import settings @@ -35,6 +39,10 @@ def setUp(self) -> None: self.cb_url = settings.CB_URL self.cb_client = ContextBrokerClient(url=self.cb_url, fiware_header=self.fiware_header) + self.cb_client_ld = ContextBrokerLDClient( + fiware_header=FiwareLDHeader(ngsild_tenant=settings.FIWARE_SERVICE), + url=settings.LD_CB_URL) + self.iota_url = settings.IOTA_URL self.iota_client = IoTAClient(url=self.iota_url, fiware_header=self.fiware_header) @@ -86,6 +94,30 @@ def test_clear_context_broker(self): self.assertEqual(0, len(self.cb_client.get_entity_list()) or len(self.cb_client.get_subscription_list())) + def test_clear_context_broker_ld(self): + """ + Test for clearing context broker LD using context broker client + """ + random_list = [random.randint(0, 100) for _ in range(10)] + entities = [ContextLDEntity(id=f"urn:ngsi-ld:clear_test:{str(i)}", + type='clear_test') for i in random_list] + self.cb_client_ld.entity_batch_operation(action_type=ActionTypeLD.CREATE, + entities=entities) + notification_param = NotificationParams(attributes=["attr"], + endpoint=Endpoint(**{ + "uri": urllib.parse.urljoin( + str(settings.LD_CB_URL), + "/ngsi-ld/v1/subscriptions"), + "accept": "application/json" + })) + sub = SubscriptionLD(id=f"urn:ngsi-ld:Subscription:clear_test:{random.randint(0, 100)}", + notification=notification_param, + entities=[{"type": "clear_test"}]) + self.cb_client_ld.post_subscription(subscription=sub) + clear_context_broker_ld(cb_ld_client=self.cb_client_ld) + self.assertEqual(0, len(self.cb_client_ld.get_entity_list())) + self.assertEqual(0, len(self.cb_client_ld.get_subscription_list())) + def test_clear_context_broker_with_url(self): """ Test for clearing context broker using context broker url and fiware header as parameters