Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

262 support entity creation with keyvalues #264

Merged
merged 11 commits into from
Apr 24, 2024
150 changes: 150 additions & 0 deletions examples/ngsi_v2/e12_ngsi_v2_use_case_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
"""
# This example shows a workflow, how you can define or reuse use case specific data
# models and ensure FIWARE compatibility by merging these models with existing data
# model in FiLiP. The merged models can be used for interaction with FIWARE platform
# and in other information processing systems to establish interoperability.

# In short: this workflow shows you a way to keep use case model simple and
# reusable while ensuring the compatability with FIWARE NGSI-V2 standards
"""
from pydantic import ConfigDict, BaseModel
from pydantic.fields import Field, FieldInfo
from filip.models import FiwareHeader
from filip.models.ngsi_v2.context import ContextEntityKeyValues
from filip.clients.ngsi_v2.cb import ContextBrokerClient
from pprint import pprint

# Host address of Context Broker
CB_URL = "http://localhost:1026"

# You can here also change the used Fiware service
# FIWARE-Service
SERVICE = 'filip'
# FIWARE-Servicepath
SERVICE_PATH = '/'
fiware_header = FiwareHeader(service=SERVICE,
service_path=SERVICE_PATH)


# Reuse existing data model from the internet
class PostalAddress(BaseModel):
"""
https://schema.org/PostalAddress
"""

model_config = ConfigDict(populate_by_name=True, coerce_numbers_to_str=True)

address_country: str = Field(
alias="addressCountry",
description="County code according to ISO 3166-1-alpha-2",
)
street_address: str = Field(
alias="streetAddress",
description="The street address. For example, 1600 Amphitheatre Pkwy.",
)
address_region: str = Field(
alias="addressRegion",
default=None,
)
address_locality: str = Field(
alias="addressLocality",
default=None,
description="The locality in which the street address is, and which is "
"in the region. For example, Mountain View.",
)
postal_code: str = Field(
alias="postalCode",
default=None,
description="The postal code. For example, 94043.",
)


# It is assumed that this kind of models exists in use case, which is simple and use case
# specific. It describes basically, how does the data look like in the specific use case.
class WeatherStation(BaseModel):
model_config = ConfigDict(coerce_numbers_to_str=True, extra="ignore")
temperature: float = Field(default=20.0)
humidity: float = Field(default=50.0)
pressure: float = Field(default=1.0)
address: PostalAddress


# Merge the use case model with the FIWARE simplified data model to ensure FIWARE
# compatibility.
class WeatherStationFIWARE(WeatherStation, ContextEntityKeyValues):
# add default for type if not explicitly set
type: str = FieldInfo.merge_field_infos(
# First position is the field info of the parent class
ContextEntityKeyValues.model_fields["type"],
# set the default value
default="CustomModels:WeatherStation",
# overwrite the title in the json-schema if necessary
title="Type of the Weather Station",
# overwrite the description
description="Type of the Weather Station",
# validate the default value if necessary
validate_default=True,
# freeze the field if necessary
frozen=True,
# for more options see the pydantic documentation
)


if __name__ == "__main__":
# Now we can export both the use case model and the FIWARE specific
# models to json-schema files and share it with other stakeholders
# that need the data.
use_case_model = WeatherStation.model_json_schema()
pprint(use_case_model)

fiware_specific_model = WeatherStationFIWARE.model_json_schema()
pprint(fiware_specific_model)

# Workflow to utilize these data models.

# 0. Initial client
cb_client = ContextBrokerClient(url=CB_URL,
fiware_header=fiware_header)

# 1. Crate data
weather_station = WeatherStationFIWARE(
id="myWeatherStation",
type="WeatherStation",
temperature=20,
address={
"address_country": "Germany",
"street_address": "Mathieustr. 10",
"postal_code": 52072,
},
)
cb_client.post_entity(entity=weather_station, key_values=True,
update=True)

# 2. Update data
weather_station.temperature = 30 # represent use case algorithm
cb_client.update_entity_key_values(entity=weather_station)

# 3. Query and validate data
# represent querying data by data users
weather_station_data = cb_client.get_entity(entity_id="myWeatherStation",
response_format="keyValues")
## validate with general model
weather_station_2_general = WeatherStation.model_validate(
weather_station_data.model_dump()
)
## validate with fiware specific model
weather_station_2_fiware = WeatherStationFIWARE.model_validate(
weather_station_data.model_dump()
)

# 4. Use data for different purposes
# for use case specific usage
print("Data complied with general model can be forwarded to other platform/system:\n"
f"{weather_station_2_general.model_dump_json(indent=2)}")
print(f"For example, address still comply with existing model:\n"
f"{weather_station_2_general.address.model_dump_json(indent=2)}\n")

# for fiware specific usage
print("For usage within FIWARE system, id and type is helpful, e.g. for creating"
"notification for entity:\n"
f"{weather_station_2_fiware.model_dump_json(indent=2, include={'id', 'type'})}\n")
RCX112 marked this conversation as resolved.
Show resolved Hide resolved
111 changes: 73 additions & 38 deletions filip/clients/ngsi_v2/cb.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,11 @@ def get_statistics(self) -> Dict:
# Entity Operations
def post_entity(
self,
entity: ContextEntity,
entity: Union[ContextEntity, ContextEntityKeyValues],
update: bool = False,
patch: bool = False,
override_attr_metadata: bool = True,
key_values: bool = False,
):
"""
Function registers an Object with the NGSI Context Broker,
Expand All @@ -215,7 +216,7 @@ def post_entity(
patch argument.

Args:
entity (ContextEntity):
entity (ContextEntity/ContextEntityKeyValues):
Context Entity Object
update (bool):
If the response.status_code is 422, whether the override and
Expand All @@ -227,13 +228,27 @@ def post_entity(
Only applies for patch equal to `True`.
Whether to override or append the attribute's metadata.
`True` for overwrite or `False` for update/append

key_values(bool):
By default False. If set to True, "options=keyValues" will
be included in params of post request. The payload uses
the keyValues simplified entity representation, i.e.
ContextEntityKeyValues.
"""
url = urljoin(self.base_url, "v2/entities")
headers = self.headers.copy()
params = {}
options = []
if key_values:
assert isinstance(entity, ContextEntityKeyValues)
options.append("keyValues")
else:
assert isinstance(entity, ContextEntity)
if options:
params.update({'options': ",".join(options)})
try:
res = self.post(
url=url, headers=headers, json=entity.model_dump(exclude_none=True)
url=url, headers=headers, json=entity.model_dump(exclude_none=True),
params=params,
)
if res.ok:
self.logger.info("Entity successfully posted!")
Expand All @@ -242,11 +257,14 @@ def post_entity(
except requests.RequestException as err:
if update and err.response.status_code == 422:
return self.override_entity(
entity=entity)
entity=entity, key_values=key_values)
if patch and err.response.status_code == 422:
return self.patch_entity(
entity=entity, override_attr_metadata=override_attr_metadata
)
if not key_values:
return self.patch_entity(
entity=entity, override_attr_metadata=override_attr_metadata
)
else:
return self.update_entity_key_values(entity=entity)
msg = f"Could not post entity {entity.id}"
self.log_error(err=err, msg=msg)
raise
Expand Down Expand Up @@ -740,12 +758,12 @@ def update_or_append_entity_attributes(
self.log_error(err=err, msg=msg)
raise

def update_entity_key_value(self,
entity: Union[ContextEntityKeyValues, dict],):
def update_entity_key_values(self,
entity: Union[ContextEntityKeyValues, dict],):
"""
The entity are updated with a ContextEntityKeyValues object or a
dictionary contain the simplified entity data. This corresponds to a
'PATcH' request.
'PATCH' request.
Only existing attribute can be updated!

Args:
Expand Down Expand Up @@ -777,11 +795,11 @@ def update_entity_key_value(self,
self.log_error(err=err, msg=msg)
raise

def update_entity_attributes_key_value(self,
entity_id: str,
attrs: dict,
entity_type: str = None,
):
def update_entity_attributes_key_values(self,
entity_id: str,
attrs: dict,
entity_type: str = None,
):
"""
Update entity with attributes in keyValues form.
This corresponds to a 'PATcH' request.
Expand Down Expand Up @@ -812,7 +830,7 @@ def update_entity_attributes_key_value(self,
"type": entity_type
})
entity = ContextEntityKeyValues(**entity_dict)
self.update_entity_key_value(entity=entity)
self.update_entity_key_values(entity=entity)

def update_existing_entity_attributes(
self,
Expand Down Expand Up @@ -879,7 +897,10 @@ def update_existing_entity_attributes(
self.log_error(err=err, msg=msg)
raise

def override_entity(self, entity: ContextEntity):
def override_entity(self,
entity: Union[ContextEntity, ContextEntityKeyValues],
**kwargs
):
"""
The request payload is an object representing the attributes to
override the existing entity.
Expand All @@ -888,21 +909,25 @@ def override_entity(self, entity: ContextEntity):
If you want to manipulate you should rather use patch_entity.

Args:
entity (ContextEntity):
entity (ContextEntity or ContextEntityKeyValues):
Returns:
None
"""
self.replace_entity_attributes(entity_id=entity.id,
entity_type=entity.type,
attrs=entity.get_properties())
return self.replace_entity_attributes(entity_id=entity.id,
entity_type=entity.type,
attrs=entity.get_attributes(),
**kwargs
)

def replace_entity_attributes(
self,
entity_id: str,
entity_type: str,
attrs: List[Union[NamedContextAttribute,
attrs: Union[List[Union[NamedContextAttribute,
Dict[str, ContextAttribute]]],
forcedUpdate: bool = False
Dict],
forcedUpdate: bool = False,
key_values: bool = False,
):
"""
The attributes previously existing in the entity are removed and
Expand All @@ -913,11 +938,17 @@ def replace_entity_attributes(
entity_id: Entity id to be updated
entity_type: Entity type, to avoid ambiguity in case there are
several entities with the same entity id.
attrs: List of attributes to add to the entity
attrs: List of attributes to add to the entity or dict of
attributes in case of key_values=True.
forcedUpdate: Update operation have to trigger any matching
subscription, no matter if there is an actual attribute
update or no instead of the default behavior, which is to
updated only if attribute is effectively updated.
key_values(bool):
By default False. If set to True, "options=keyValues" will
be included in params of the request. The payload uses
the keyValues simplified entity representation, i.e.
ContextEntityKeyValues.
Returns:
None
"""
Expand All @@ -927,30 +958,34 @@ def replace_entity_attributes(
options = []
if forcedUpdate:
options.append("forcedUpdate")
if key_values:
options.append("keyValues")
assert isinstance(attrs, dict)
else:
entity = ContextEntity(id=entity_id, type=entity_type)
entity.add_attributes(attrs)
attrs = entity.model_dump(
exclude={"id", "type"},
exclude_none=True
)
if options:
params.update({'options': ",".join(options)})
if entity_type:
params.update({"type": entity_type})

entity = ContextEntity(id=entity_id, type=entity_type)
entity.add_attributes(attrs)

try:
res = self.put(
url=url,
headers=headers,
json=entity.model_dump(
exclude={"id", "type"},
exclude_none=True
),
json=attrs,
params=params,
)
if res.ok:
self.logger.info("Entity '%s' successfully " "updated!", entity.id)
self.logger.info("Entity '%s' successfully " "updated!", entity_id)
else:
res.raise_for_status()
except requests.RequestException as err:
msg = f"Could not replace attribute of entity {entity.id} !"
msg = f"Could not replace attribute of entity {entity_id} !"
self.log_error(err=err, msg=msg)
raise

Expand Down Expand Up @@ -1619,7 +1654,7 @@ def delete_registration(self, registration_id: str) -> None:
# Batch operation API
def update(self,
*,
entities: List[ContextEntity],
entities: List[Union[ContextEntity, ContextEntityKeyValues]],
action_type: Union[ActionType, str],
update_format: str = None,
forcedUpdate: bool = False,
Expand Down Expand Up @@ -1676,13 +1711,13 @@ def update(self,
options.append("overrideMetadata")
if forcedUpdate:
options.append("forcedUpdate")
if options:
params.update({'options': ",".join(options)})
if update_format:
assert (
update_format == "keyValues"
), "Only 'keyValues' is allowed as update format"
params.update({"options": "keyValues"})
options.append("keyValues")
if options:
params.update({'options': ",".join(options)})
update = Update(actionType=action_type, entities=entities)
try:
res = self.post(
Expand Down
Loading