Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

342 clean up functions for ngsi ld context broker #348

Merged
merged 4 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions filip/clients/ngsi_ld/cb.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from filip.models.base import FiwareLDHeader, PaginationMethod, core_context
from filip.utils.simple_ql import QueryString
from filip.models.ngsi_v2.base import AttrsFormat
from filip.models.ngsi_ld.subscriptions import Subscription
from filip.models.ngsi_ld.subscriptions import SubscriptionLD
from filip.models.ngsi_ld.context import ContextLDEntity, ContextLDEntityKeyValues, ContextProperty, ContextRelationship, NamedContextProperty, \
NamedContextRelationship, ActionTypeLD, UpdateLD
from filip.models.ngsi_v2.context import Query
Expand Down Expand Up @@ -518,7 +518,7 @@ def delete_attribute(self,

# SUBSCRIPTION API ENDPOINTS
def get_subscription_list(self,
limit: PositiveInt = inf) -> List[Subscription]:
limit: PositiveInt = inf) -> List[SubscriptionLD]:
"""
Returns a list of all the subscriptions present in the system.
Args:
Expand All @@ -538,14 +538,14 @@ def get_subscription_list(self,
url=url,
params=params,
headers=headers)
adapter = TypeAdapter(List[Subscription])
adapter = TypeAdapter(List[SubscriptionLD])
return adapter.validate_python(items)
except requests.RequestException as err:
msg = "Could not load subscriptions!"
self.log_error(err=err, msg=msg)
raise

def post_subscription(self, subscription: Subscription,
def post_subscription(self, subscription: SubscriptionLD,
update: bool = False) -> str:
"""
Creates a new subscription. The subscription is represented by a
Expand Down Expand Up @@ -577,8 +577,8 @@ def post_subscription(self, subscription: Subscription,
subscription.id = ex_sub.id
self.update_subscription(subscription)
else:
warnings.warn(f"Subscription existed already with the id"
f" {ex_sub.id}")
self.logger.warning(f"Subscription existed already with the id"
f" {ex_sub.id}")
return ex_sub.id

url = urljoin(self.base_url, f'{self._url_version}/subscriptions')
Expand All @@ -602,7 +602,7 @@ def post_subscription(self, subscription: Subscription,
self.log_error(err=err, msg=msg)
raise

def get_subscription(self, subscription_id: str) -> Subscription:
def get_subscription(self, subscription_id: str) -> SubscriptionLD:
"""
Retrieves a subscription from
Args:
Expand All @@ -617,14 +617,14 @@ def get_subscription(self, subscription_id: str) -> Subscription:
res = self.get(url=url, headers=headers)
if res.ok:
self.logger.debug('Received: %s', res.json())
return Subscription(**res.json())
return SubscriptionLD(**res.json())
res.raise_for_status()
except requests.RequestException as err:
msg = f"Could not load subscription {subscription_id}"
self.log_error(err=err, msg=msg)
raise

def update_subscription(self, subscription: Subscription) -> None:
def update_subscription(self, subscription: SubscriptionLD) -> None:
"""
Only the fields included in the request are updated in the subscription.
Args:
Expand Down
1 change: 1 addition & 0 deletions filip/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .base import FiwareHeader
from .base import FiwareLDHeader
2 changes: 1 addition & 1 deletion filip/models/ngsi_ld/subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ def check_passwords_match(self) -> 'TemporalQuery':
return self


class Subscription(BaseModel):
class SubscriptionLD(BaseModel):
id: Optional[str] = Field(
default=None,
description="Subscription identifier (JSON-LD @id)"
Expand Down
51 changes: 50 additions & 1 deletion filip/utils/cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,60 @@
from pydantic import AnyHttpUrl, AnyUrl
from requests import RequestException
from typing import Callable, List, Union
from filip.models import FiwareHeader
from filip.models import FiwareHeader, FiwareLDHeader
from filip.clients.ngsi_v2 import \
ContextBrokerClient, \
IoTAClient, \
QuantumLeapClient
from filip.clients.ngsi_ld.cb import ContextBrokerLDClient
from filip.models.ngsi_ld.context import ActionTypeLD
import logging

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


def clear_context_broker_ld(url: str = None,
fiware_ld_header: FiwareLDHeader = None,
cb_ld_client: ContextBrokerLDClient = None
):
"""
Function deletes all entities and subscriptions for a tenant in an LD context broker.

Args:
url: Url of the context broker LD
fiware_ld_header: header of the NGSI-LD tenant
cb_ld_client: NGSI-LD context broker client object

Returns:

"""
assert url or cb_ld_client, "Either url or client object must be given"
# create client
if cb_ld_client is None:
client = ContextBrokerLDClient(url=url, fiware_header=fiware_ld_header)
else:
client = cb_ld_client
# clean entities iteratively
try:
entity_list = True
while entity_list:
entity_list = client.get_entity_list(limit=100)
if entity_list:
client.entity_batch_operation(action_type=ActionTypeLD.DELETE,
entities=entity_list)
except RequestException as e:
logger.warning("Could not clean entities completely")
raise

# clean subscriptions
try:
sub_list = cb_ld_client.get_subscription_list()
for sub in sub_list:
cb_ld_client.delete_subscription(sub.id)
except RequestException as e:
logger.warning("Could not clean subscriptions completely")
raise


def clear_context_broker(url: str=None,
Expand Down
20 changes: 3 additions & 17 deletions tests/clients/test_ngsi_ld_cb.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
NamedContextProperty
from tests.config import settings
import requests
from filip.utils.cleanup import clear_context_broker_ld


# Setting up logging
Expand Down Expand Up @@ -57,28 +58,13 @@ def setUp(self) -> None:
self.client = ContextBrokerLDClient(fiware_header=self.fiware_header,
session=session,
url=settings.LD_CB_URL)
# todo replace with clean up function for ld
try:
entity_list = True
while entity_list:
entity_list = self.client.get_entity_list(limit=100)
self.client.entity_batch_operation(action_type=ActionTypeLD.DELETE,
entities=entity_list)
except RequestException:
pass
clear_context_broker_ld(cb_ld_client=self.client)

def tearDown(self) -> None:
"""
Cleanup test server
"""
try:
entity_list = True
while entity_list:
entity_list = self.client.get_entity_list(limit=100)
self.client.entity_batch_operation(action_type=ActionTypeLD.DELETE,
entities=entity_list)
except RequestException:
pass
clear_context_broker_ld(cb_ld_client=self.client)
self.client.close()

def test_management_endpoints(self):
Expand Down
20 changes: 3 additions & 17 deletions tests/clients/test_ngsi_ld_entity_batch_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from filip.clients.ngsi_ld.cb import ContextBrokerLDClient
from filip.models.ngsi_ld.context import ContextLDEntity, ActionTypeLD
from tests.config import settings
from filip.utils.cleanup import clear_context_broker_ld


class EntitiesBatchOperations(unittest.TestCase):
Expand All @@ -28,28 +29,13 @@ def setUp(self) -> None:
self.fiware_header = FiwareLDHeader(ngsild_tenant=settings.FIWARE_SERVICE)
self.cb_client = ContextBrokerLDClient(fiware_header=self.fiware_header,
url=settings.LD_CB_URL)
# todo replace with clean up function for ld
try:
entity_list = True
while entity_list:
entity_list = self.cb_client.get_entity_list(limit=100)
self.cb_client.entity_batch_operation(action_type=ActionTypeLD.DELETE,
entities=entity_list)
except RequestException:
pass
clear_context_broker_ld(cb_ld_client=self.cb_client)

def tearDown(self) -> None:
"""
Cleanup test server
"""
try:
entity_list = True
while entity_list:
entity_list = self.cb_client.get_entity_list(limit=100)
self.cb_client.entity_batch_operation(action_type=ActionTypeLD.DELETE,
entities=entity_list)
except RequestException:
pass
clear_context_broker_ld(cb_ld_client=self.cb_client)
self.cb_client.close()

def test_entity_batch_operations_create(self) -> None:
Expand Down
67 changes: 20 additions & 47 deletions tests/clients/test_ngsi_ld_subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
from filip.models.ngsi_ld.subscriptions import \
Endpoint, \
NotificationParams, \
Subscription
SubscriptionLD
from tests.config import settings
from filip.utils.cleanup import clear_context_broker_ld


class TestSubscriptions(TestCase):
Expand All @@ -35,51 +36,23 @@ def setUp(self) -> None:
self.fiware_header = FiwareLDHeader(ngsild_tenant=settings.FIWARE_SERVICE)
self.cb_client = ContextBrokerLDClient(fiware_header=self.fiware_header,
url=settings.LD_CB_URL)
self.cleanup()
clear_context_broker_ld(cb_ld_client=self.cb_client)

# initial tenant
self.cb_client.post_entity(ContextLDEntity(id="Dummy:1", type="Dummy"),
update=True)
self.cb_client.delete_entity_by_id("Dummy:1")
# self.mqtt_url = "mqtt://test.de:1883"
# self.mqtt_topic = '/filip/testing'
# self.notification = {
# "attributes": ["filling", "controlledAsset"],
# "format": "keyValues",
# "endpoint": {
# "uri": "http://test:1234/subscription/low-stock-farm001-ngsild",
# "accept": "application/json"
# }
# }
self.cb_client = ContextBrokerLDClient()
self.mqtt_topic = ''.join([settings.FIWARE_SERVICE,
settings.FIWARE_SERVICEPATH])
self.endpoint_mqtt = Endpoint(**{
"uri": str(settings.LD_MQTT_BROKER_URL) + "/my/test/topic",
"accept": "application/json",
})
self.cb_client = ContextBrokerLDClient(url=settings.LD_CB_URL,
fiware_header=self.fiware_header)
self.endpoint_http = Endpoint(**{
"uri": urllib.parse.urljoin(str(settings.LD_CB_URL),
"/ngsi-ld/v1/subscriptions"),
"accept": "application/json"
}
)
self.cleanup()
})

def tearDown(self) -> None:
self.cleanup()
clear_context_broker_ld(cb_ld_client=self.cb_client)
self.cb_client.close()

def cleanup(self):
"""
Cleanup test subscriptions
"""
sub_list = self.cb_client.get_subscription_list()
for sub in sub_list:
if sub.id.startswith('urn:ngsi-ld:Subscription:test_sub'):
self.cb_client.delete_subscription(sub.id)

def test_post_subscription_http(self):
"""
Expand All @@ -94,7 +67,7 @@ def test_post_subscription_http(self):
attr_id = "attr"
id = "urn:ngsi-ld:Subscription:" + "test_sub0"
notification_param = NotificationParams(attributes=[attr_id], endpoint=self.endpoint_http)
sub = Subscription(id=id, notification=notification_param, entities=[{"type": "Room"}])
sub = SubscriptionLD(id=id, notification=notification_param, entities=[{"type": "Room"}])
self.cb_client.post_subscription(sub)
sub_list = [x for x in self.cb_client.get_subscription_list()
if x.id == 'urn:ngsi-ld:Subscription:test_sub0']
Expand Down Expand Up @@ -129,7 +102,7 @@ def test_get_subscription(self):
attr_id = "attr"
id = "urn:ngsi-ld:Subscription:" + "test_sub0"
notification_param = NotificationParams(attributes=[attr_id], endpoint=self.endpoint_http)
sub = Subscription(id=id, notification=notification_param, entities=[{"type": "Room"}])
sub = SubscriptionLD(id=id, notification=notification_param, entities=[{"type": "Room"}])
self.cb_client.post_subscription(sub)
sub_get = self.cb_client.get_subscription(subscription_id=id)
self.assertEqual(sub.entities, sub_get.entities)
Expand All @@ -152,7 +125,7 @@ def test_get_subscription_list(self):
attr_id = "attr" + str(i)
id = "urn:ngsi-ld:Subscription:" + "test_sub" + str(i)
notification_param = NotificationParams(attributes=[attr_id], endpoint=self.endpoint_http)
sub = Subscription(id=id, notification=notification_param, entities=[{"type": "Room"}])
sub = SubscriptionLD(id=id, notification=notification_param, entities=[{"type": "Room"}])
sub_post_list.append(sub)
self.cb_client.post_subscription(sub)

Expand Down Expand Up @@ -181,10 +154,10 @@ def test_delete_subscription(self):
notification_param = NotificationParams(
attributes=[attr_id], endpoint=self.endpoint_http)
id = "urn:ngsi-ld:Subscription:" + "test_sub" + str(i)
sub = Subscription(id=id,
notification=notification_param,
entities=[{"type": "Room"}]
)
sub = SubscriptionLD(id=id,
notification=notification_param,
entities=[{"type": "Room"}]
)

if i == 0:
del_sub = sub
Expand Down Expand Up @@ -220,21 +193,21 @@ def test_update_subscription(self):
attr_id = "attr"
id = "urn:ngsi-ld:Subscription:" + "test_sub77"
notification_param = NotificationParams(attributes=[attr_id], endpoint=self.endpoint_http)
sub = Subscription(id=id, notification=notification_param, entities=[{"type": "Room"}])
sub = SubscriptionLD(id=id, notification=notification_param, entities=[{"type": "Room"}])
self.cb_client.post_subscription(sub)
sub_list = self.cb_client.get_subscription_list()
self.assertEqual(len(sub_list), 1)
print(self.endpoint_http.model_dump())
sub_changed = Subscription(id=id, notification=notification_param, entities=[{"type": "House"}])
sub_changed = SubscriptionLD(id=id, notification=notification_param, entities=[{"type": "House"}])
self.cb_client.update_subscription(sub_changed)
u_sub= self.cb_client.get_subscription(subscription_id=id)
self.assertNotEqual(u_sub,sub_list[0])
self.maxDiff = None
self.assertDictEqual(sub_changed.model_dump(),
u_sub.model_dump())
non_sub = Subscription(id="urn:ngsi-ld:Subscription:nonexist",
notification=notification_param,
entities=[{"type":"house"}])
non_sub = SubscriptionLD(id="urn:ngsi-ld:Subscription:nonexist",
notification=notification_param,
entities=[{"type":"house"}])
with self.assertRaises(Exception):
self.cb_client.update_subscription(non_sub)

Expand Down Expand Up @@ -369,7 +342,7 @@ def on_message(client,userdata,msg):
60)
self.mqtt_client.loop_start()
#post subscription then start timer
self.cb_client.post_subscription(subscription=Subscription(**self.sub_dict))
self.cb_client.post_subscription(subscription=SubscriptionLD(**self.sub_dict))
self.timeout_proc.start()
#update entity to (ideally) get a notification
self.cb_client.update_entity_attribute(entity_id='urn:ngsi-ld:Entity:test_entity03',
Expand Down Expand Up @@ -425,7 +398,7 @@ def on_message(client,userdata,msg):
settings.LD_MQTT_BROKER_URL.port,
60)
self.mqtt_client.loop_start()
self.cb_client.post_subscription(subscription=Subscription(**self.sub_dict))
self.cb_client.post_subscription(subscription=SubscriptionLD(**self.sub_dict))
self.timeout_proc.start()

self.cb_client.update_entity_attribute(entity_id='urn:ngsi-ld:Entity:test_entity03',
Expand All @@ -442,7 +415,7 @@ def on_message(client,userdata,msg):
self.timeout_proc = threading.Timer(self.timeout,self.timeout_func)

self.sub_dict.update({'q':'temperature>30'})
self.cb_client.update_subscription(subscription=Subscription(**self.sub_dict))
self.cb_client.update_subscription(subscription=SubscriptionLD(**self.sub_dict))
time.sleep(5)
updated = self.cb_client.get_subscription(self.sub_dict['id'])
self.assertEqual(updated.q,'temperature>30')
Expand Down
2 changes: 1 addition & 1 deletion tests/models/test_ngsi_ld_subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from pydantic import ValidationError
from filip.models.ngsi_ld.base import validate_ngsi_ld_query
from filip.models.ngsi_ld.subscriptions import \
Subscription, \
SubscriptionLD, \
Endpoint, NotificationParams, EntityInfo, TemporalQuery
from filip.models.base import FiwareHeader
from filip.utils.cleanup import clear_all
Expand Down
Loading