diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 8c2296054..059c326f2 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -4,7 +4,7 @@ repos: hooks: - id: black language_version: python3 -- repo: https://gitlab.com/pycqa/flake8 +- repo: https://github.com/pycqa/flake8 rev: 3.9.1 # Use the ref you want to point at hooks: - id: flake8 diff --git a/README.md b/README.md index 5aedbe2da..5fc1ba754 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ The Azure IoT Device library is available on PyPI: pip install azure-iot-device ``` -Python 3.6 or higher is required in order to use the library +Python 3.7 or higher is required in order to use the library ## Using the library API documentation for this package is available via [**Microsoft Docs**](https://docs.microsoft.com/python/api/azure-iot-device/azure.iot.device?view=azure-python). diff --git a/azure-iot-device/azure/iot/device/common/http_transport.py b/azure-iot-device/azure/iot/device/common/http_transport.py index ab1cbfbdc..d09629f0e 100644 --- a/azure-iot-device/azure/iot/device/common/http_transport.py +++ b/azure-iot-device/azure/iot/device/common/http_transport.py @@ -6,7 +6,7 @@ import logging import ssl -import requests +import requests # type: ignore from . import transport_exceptions as exceptions from .pipeline import pipeline_thread diff --git a/azure-iot-device/azure/iot/device/custom_typing.py b/azure-iot-device/azure/iot/device/custom_typing.py new file mode 100644 index 000000000..2c1508fd4 --- /dev/null +++ b/azure-iot-device/azure/iot/device/custom_typing.py @@ -0,0 +1,41 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- +from typing import Any, Union, Dict, List, Tuple, Callable, Awaitable, TypeVar +from typing_extensions import TypedDict, ParamSpec + + +_P = ParamSpec("_P") +_R = TypeVar("_R") +FunctionOrCoroutine = Union[Callable[_P, _R], Callable[_P, Awaitable[_R]]] + + +# typing does not support recursion, so we must use forward references here (PEP484) +JSONSerializable = Union[ + Dict[str, "JSONSerializable"], + List["JSONSerializable"], + Tuple["JSONSerializable", ...], + str, + int, + float, + bool, + None, +] +# TODO: verify that the JSON specification requires str as keys in dict. Not sure why that's defined here. + + +Twin = Dict[str, Dict[str, JSONSerializable]] +TwinPatch = Dict[str, JSONSerializable] + + +class StorageInfo(TypedDict): + correlationId: str + hostName: str + containerName: str + blobName: str + sasToken: str + + +ProvisioningPayload = Union[Dict[str, Any], str, int] diff --git a/azure-iot-device/azure/iot/device/iothub/abstract_clients.py b/azure-iot-device/azure/iot/device/iothub/abstract_clients.py index ed712e298..0cd5a8137 100644 --- a/azure-iot-device/azure/iot/device/iothub/abstract_clients.py +++ b/azure-iot-device/azure/iot/device/iothub/abstract_clients.py @@ -5,7 +5,7 @@ # -------------------------------------------------------------------------- """This module contains abstract classes for the various clients of the Azure IoT Hub Device SDK """ - +from __future__ import annotations # Needed for annotation bug < 3.10 import abc import logging import threading @@ -17,14 +17,20 @@ from azure.iot.device.common.auth import connection_string as cs from azure.iot.device.common.auth import sastoken as st from azure.iot.device.iothub import client_event +from azure.iot.device.iothub.models import Message, MethodRequest, MethodResponse +from azure.iot.device.common.models import X509 from azure.iot.device import exceptions from azure.iot.device.common import auth, handle_exceptions from . import edge_hsm +from .pipeline import MQTTPipeline, HTTPPipeline +from typing_extensions import Self +from azure.iot.device.custom_typing import FunctionOrCoroutine, Twin, TwinPatch +from typing import Any, Dict, List, Optional, Union logger = logging.getLogger(__name__) -def _validate_kwargs(exclude=[], **kwargs): +def _validate_kwargs(exclude: Optional[List[str]] = [], **kwargs) -> None: """Helper function to validate user provided kwargs. Raises TypeError if an invalid option has been provided""" valid_kwargs = [ @@ -43,11 +49,11 @@ def _validate_kwargs(exclude=[], **kwargs): ] for kwarg in kwargs: - if (kwarg not in valid_kwargs) or (kwarg in exclude): + if (kwarg not in valid_kwargs) or (exclude is not None and kwarg in exclude): raise TypeError("Unsupported keyword argument: '{}'".format(kwarg)) -def _get_config_kwargs(**kwargs): +def _get_config_kwargs(**kwargs) -> Dict[str, Any]: """Get the subset of kwargs which pertain the config object""" valid_config_kwargs = [ "server_verification_cert", @@ -70,7 +76,7 @@ def _get_config_kwargs(**kwargs): return config_kwargs -def _form_sas_uri(hostname, device_id, module_id=None): +def _form_sas_uri(hostname: str, device_id: str, module_id: Optional[str] = None) -> str: if module_id: return "{hostname}/devices/{device_id}/modules/{module_id}".format( hostname=hostname, device_id=device_id, module_id=module_id @@ -79,7 +85,7 @@ def _form_sas_uri(hostname, device_id, module_id=None): return "{hostname}/devices/{device_id}".format(hostname=hostname, device_id=device_id) -def _extract_sas_uri_values(uri): +def _extract_sas_uri_values(uri: str) -> Dict[str, Any]: d = {} items = uri.split("/") if len(items) != 3 and len(items) != 5: @@ -93,7 +99,7 @@ def _extract_sas_uri_values(uri): try: d["module_id"] = items[4] except IndexError: - d["module_id"] = None + d["module_id"] = "" return d @@ -108,7 +114,7 @@ class AbstractIoTHubClient(abc.ABC): This class needs to be extended for specific clients. """ - def __init__(self, mqtt_pipeline, http_pipeline): + def __init__(self, mqtt_pipeline: MQTTPipeline, http_pipeline: HTTPPipeline) -> None: """Initializer for a generic client. :param mqtt_pipeline: The pipeline used to connect to the IoTHub endpoint. @@ -122,49 +128,53 @@ def __init__(self, mqtt_pipeline, http_pipeline): self._receive_type = RECEIVE_TYPE_NONE_SET self._client_lock = threading.Lock() - def _on_connected(self): + def _on_connected(self) -> None: """Helper handler that is called upon an iothub pipeline connect""" logger.info("Connection State - Connected") - client_event_inbox = self._inbox_manager.get_client_event_inbox() - # Only add a ClientEvent to the inbox if the Handler Manager is capable of dealing with it - if self._handler_manager.handling_client_events: - event = client_event.ClientEvent(client_event.CONNECTION_STATE_CHANGE) - client_event_inbox.put(event) - # Ensure that all handlers are running now that connection is re-established. - self._handler_manager.ensure_running() - - def _on_disconnected(self): + if self._inbox_manager is not None: + client_event_inbox = self._inbox_manager.get_client_event_inbox() + # Only add a ClientEvent to the inbox if the Handler Manager is capable of dealing with it + if self._handler_manager.handling_client_events: + event = client_event.ClientEvent(client_event.CONNECTION_STATE_CHANGE) + client_event_inbox.put(event) + # Ensure that all handlers are running now that connection is re-established. + self._handler_manager.ensure_running() + + def _on_disconnected(self) -> None: """Helper handler that is called upon an iothub pipeline disconnect""" logger.info("Connection State - Disconnected") - client_event_inbox = self._inbox_manager.get_client_event_inbox() - # Only add a ClientEvent to the inbox if the Handler Manager is capable of dealing with it - if self._handler_manager.handling_client_events: - event = client_event.ClientEvent(client_event.CONNECTION_STATE_CHANGE) - client_event_inbox.put(event) - # Locally stored method requests on client are cleared. - # They will be resent by IoTHub on reconnect. - self._inbox_manager.clear_all_method_requests() - logger.info("Cleared all pending method requests due to disconnect") - - def _on_new_sastoken_required(self): + if self._inbox_manager is not None: + client_event_inbox = self._inbox_manager.get_client_event_inbox() + # Only add a ClientEvent to the inbox if the Handler Manager is capable of dealing with it + if self._handler_manager.handling_client_events: + event = client_event.ClientEvent(client_event.CONNECTION_STATE_CHANGE) + client_event_inbox.put(event) + # Locally stored method requests on client are cleared. + # They will be resent by IoTHub on reconnect. + self._inbox_manager.clear_all_method_requests() + logger.info("Cleared all pending method requests due to disconnect") + + def _on_new_sastoken_required(self) -> None: """Helper handler that is called upon the iothub pipeline needing new SAS token""" logger.info("New SasToken required from user") - client_event_inbox = self._inbox_manager.get_client_event_inbox() - # Only add a ClientEvent to the inbox if the Handler Manager is capable of dealing with it - if self._handler_manager.handling_client_events: - event = client_event.ClientEvent(client_event.NEW_SASTOKEN_REQUIRED) - client_event_inbox.put(event) - - def _on_background_exception(self, e): + if self._inbox_manager is not None: + client_event_inbox = self._inbox_manager.get_client_event_inbox() + # Only add a ClientEvent to the inbox if the Handler Manager is capable of dealing with it + if self._handler_manager.handling_client_events: + event = client_event.ClientEvent(client_event.NEW_SASTOKEN_REQUIRED) + client_event_inbox.put(event) + + def _on_background_exception(self, e: Exception) -> None: """Helper handler that is called upon an iothub pipeline background exception""" handle_exceptions.handle_background_exception(e) - client_event_inbox = self._inbox_manager.get_client_event_inbox() - # Only add a ClientEvent to the inbox if the Handler Manager is capable of dealing with it - if self._handler_manager.handling_client_events: - event = client_event.ClientEvent(client_event.BACKGROUND_EXCEPTION, e) - client_event_inbox.put(event) - - def _check_receive_mode_is_api(self): + if self._inbox_manager is not None: + client_event_inbox = self._inbox_manager.get_client_event_inbox() + # Only add a ClientEvent to the inbox if the Handler Manager is capable of dealing with it + if self._handler_manager.handling_client_events: + event = client_event.ClientEvent(client_event.BACKGROUND_EXCEPTION, e) + client_event_inbox.put(event) + + def _check_receive_mode_is_api(self) -> None: """Call this function first in EVERY receive API""" with self._client_lock: if self._receive_type is RECEIVE_TYPE_NONE_SET: @@ -177,14 +187,15 @@ def _check_receive_mode_is_api(self): else: pass - def _check_receive_mode_is_handler(self): + def _check_receive_mode_is_handler(self) -> None: """Call this function first in EVERY handler setter""" with self._client_lock: if self._receive_type is RECEIVE_TYPE_NONE_SET: # Lock the client to ONLY use receive handlers (no APIs) self._receive_type = RECEIVE_TYPE_HANDLER # Set the inbox manager to use unified msg receives - self._inbox_manager.use_unified_msg_mode = True + if self._inbox_manager is not None: + self._inbox_manager.use_unified_msg_mode = True elif self._receive_type is RECEIVE_TYPE_API: raise exceptions.ClientError( "Cannot set receive handlers - receive APIs have already been used" @@ -192,7 +203,7 @@ def _check_receive_mode_is_handler(self): else: pass - def _replace_user_supplied_sastoken(self, sastoken_str): + def _replace_user_supplied_sastoken(self, sastoken_str: str) -> None: """ Replaces the pipeline's NonRenewableSasToken with a new one based on a provided sastoken string. Also does validation. @@ -220,7 +231,7 @@ def _replace_user_supplied_sastoken(self, sastoken_str): raise ValueError("Provided SasToken is for a device") if self._mqtt_pipeline.pipeline_configuration.device_id != vals["device_id"]: raise ValueError("Provided SasToken does not match existing device id") - if self._mqtt_pipeline.pipeline_configuration.module_id != vals["module_id"]: + if vals["module_id"] != "" and self._mqtt_pipeline.pipeline_configuration.module_id != vals["module_id"]: raise ValueError("Provided SasToken does not match existing module id") if self._mqtt_pipeline.pipeline_configuration.hostname != vals["hostname"]: raise ValueError("Provided SasToken does not match existing hostname") @@ -232,12 +243,17 @@ def _replace_user_supplied_sastoken(self, sastoken_str): self._mqtt_pipeline.pipeline_configuration.sastoken = new_token_o @abc.abstractmethod - def _generic_receive_handler_setter(self, handler_name, feature_name, new_handler): + def _generic_receive_handler_setter( + self, + handler_name: str, + feature_name: str, + new_handler: Optional[FunctionOrCoroutine[[Any], Any]], + ) -> None: # Will be implemented differently in child classes, but define here for static analysis pass @classmethod - def create_from_connection_string(cls, connection_string, **kwargs): + def create_from_connection_string(cls, connection_string: str, **kwargs) -> Self: """ Instantiate the client from a IoTHub device or module connection string. @@ -281,18 +297,18 @@ def create_from_connection_string(cls, connection_string, **kwargs): _validate_kwargs(exclude=excluded_kwargs, **kwargs) # Create SasToken - connection_string = cs.ConnectionString(connection_string) - if connection_string.get(cs.X509) is not None: + connection_string_dict = cs.ConnectionString(connection_string) + if connection_string_dict.get(cs.X509) is not None: raise ValueError( "Use the .create_from_x509_certificate() method instead when using X509 certificates" ) uri = _form_sas_uri( - hostname=connection_string[cs.HOST_NAME], - device_id=connection_string[cs.DEVICE_ID], - module_id=connection_string.get(cs.MODULE_ID), + hostname=connection_string_dict[cs.HOST_NAME], + device_id=connection_string_dict[cs.DEVICE_ID], + module_id=connection_string_dict.get(cs.MODULE_ID), ) signing_mechanism = auth.SymmetricKeySigningMechanism( - key=connection_string[cs.SHARED_ACCESS_KEY] + key=connection_string_dict[cs.SHARED_ACCESS_KEY] ) token_ttl = kwargs.get("sastoken_ttl", 3600) try: @@ -304,12 +320,12 @@ def create_from_connection_string(cls, connection_string, **kwargs): # Pipeline Config setup config_kwargs = _get_config_kwargs(**kwargs) pipeline_configuration = pipeline.IoTHubPipelineConfig( - device_id=connection_string[cs.DEVICE_ID], - module_id=connection_string.get(cs.MODULE_ID), - hostname=connection_string[cs.HOST_NAME], - gateway_hostname=connection_string.get(cs.GATEWAY_HOST_NAME), + device_id=connection_string_dict[cs.DEVICE_ID], + module_id=connection_string_dict.get(cs.MODULE_ID), + hostname=connection_string_dict[cs.HOST_NAME], + gateway_hostname=connection_string_dict.get(cs.GATEWAY_HOST_NAME), sastoken=sastoken, - **config_kwargs + **config_kwargs, ) if cls.__name__ == "IoTHubDeviceClient": pipeline_configuration.blob_upload = True @@ -321,7 +337,7 @@ def create_from_connection_string(cls, connection_string, **kwargs): return cls(mqtt_pipeline, http_pipeline) @classmethod - def create_from_sastoken(cls, sastoken, **kwargs): + def create_from_sastoken(cls, sastoken: str, **kwargs: Dict[str, Any]) -> Self: """Instantiate the client from a pre-created SAS Token string :param str sastoken: The SAS Token string @@ -381,7 +397,7 @@ def create_from_sastoken(cls, sastoken, **kwargs): module_id=vals["module_id"], hostname=vals["hostname"], sastoken=sastoken_o, - **config_kwargs + **config_kwargs, ) if cls.__name__ == "IoTHubDeviceClient": pipeline_configuration.blob_upload = True # Blob Upload is a feature on Device Clients @@ -393,66 +409,70 @@ def create_from_sastoken(cls, sastoken, **kwargs): return cls(mqtt_pipeline, http_pipeline) @abc.abstractmethod - def shutdown(self): + def shutdown(self) -> None: pass @abc.abstractmethod - def connect(self): + def connect(self) -> None: pass @abc.abstractmethod - def disconnect(self): + def disconnect(self) -> None: pass @abc.abstractmethod - def update_sastoken(self, sastoken): + def update_sastoken(self, sastoken: str) -> None: pass @abc.abstractmethod - def send_message(self, message): + def send_message(self, message: Union[Message, str]) -> None: pass @abc.abstractmethod - def receive_method_request(self, method_name=None): + def receive_method_request(self, method_name: Optional[str] = None) -> None: pass @abc.abstractmethod - def send_method_response(self, method_request, payload, status): + def send_method_response( + self, method_response: MethodResponse + ) -> None: pass @abc.abstractmethod - def get_twin(self): + def get_twin(self) -> Twin: pass @abc.abstractmethod - def patch_twin_reported_properties(self, reported_properties_patch): + def patch_twin_reported_properties(self, reported_properties_patch: TwinPatch) -> None: pass @abc.abstractmethod - def receive_twin_desired_properties_patch(self): + def receive_twin_desired_properties_patch(self) -> TwinPatch: pass @property - def connected(self): + def connected(self) -> bool: """ Read-only property to indicate if the transport is connected or not. """ return self._mqtt_pipeline.connected @property - def on_connection_state_change(self): + def on_connection_state_change(self) -> FunctionOrCoroutine[[None], None]: """The handler function or coroutine that will be called when the connection state changes. The function or coroutine definition should take no positional arguments. """ - return self._handler_manager.on_connection_state_change + if self._handler_manager is not None: + return self._handler_manager.on_connection_state_change @on_connection_state_change.setter - def on_connection_state_change(self, value): - self._handler_manager.on_connection_state_change = value + def on_connection_state_change(self, value: FunctionOrCoroutine[[None], None]) -> None: + if self._handler_manager is not None: + self._handler_manager.on_connection_state_change = value @property - def on_new_sastoken_required(self): + def on_new_sastoken_required(self) -> FunctionOrCoroutine[[None], None]: """The handler function or coroutine that will be called when the client requires a new SAS token. This will happen approximately 2 minutes before the SAS Token expires. On Windows platforms, if the lifespan exceeds approximately 49 days, a new token will @@ -466,31 +486,35 @@ def on_new_sastoken_required(self): The function or coroutine definition should take no positional arguments. """ - return self._handler_manager.on_new_sastoken_required + if self._handler_manager is not None: + return self._handler_manager.on_new_sastoken_required @on_new_sastoken_required.setter - def on_new_sastoken_required(self, value): - self._handler_manager.on_new_sastoken_required = value + def on_new_sastoken_required(self, value: FunctionOrCoroutine[[None], None]) -> None: + if self._handler_manager is not None: + self._handler_manager.on_new_sastoken_required = value @property - def on_background_exception(self): + def on_background_exception(self) -> FunctionOrCoroutine[[Exception], None]: """The handler function or coroutine will be called when a background exception occurs. The function or coroutine definition should take one positional argument (the exception object)""" - return self._handler_manager.on_background_exception + if self._handler_manager is not None: + return self._handler_manager.on_background_exception @on_background_exception.setter - def on_background_exception(self, value): - self._handler_manager.on_background_exception = value + def on_background_exception(self, value: FunctionOrCoroutine[[Exception], None]) -> None: + if self._handler_manager is not None: + self._handler_manager.on_background_exception = value @abc.abstractproperty - def on_message_received(self): + def on_message_received(self) -> FunctionOrCoroutine[[Message], None]: # Defined below on AbstractIoTHubDeviceClient / AbstractIoTHubModuleClient pass @property - def on_method_request_received(self): + def on_method_request_received(self) -> FunctionOrCoroutine[[MethodRequest], None]: """The handler function or coroutine that will be called when a method request is received. Remember to acknowledge the method request in your function or coroutine via use of the @@ -498,25 +522,29 @@ def on_method_request_received(self): The function or coroutine definition should take one positional argument (the :class:`azure.iot.device.MethodRequest` object)""" - return self._handler_manager.on_method_request_received + if self._handler_manager is not None: + return self._handler_manager.on_method_request_received @on_method_request_received.setter - def on_method_request_received(self, value): + def on_method_request_received(self, value: FunctionOrCoroutine[[MethodRequest], None]) -> None: self._generic_receive_handler_setter( "on_method_request_received", pipeline_constant.METHODS, value ) @property - def on_twin_desired_properties_patch_received(self): + def on_twin_desired_properties_patch_received(self) -> FunctionOrCoroutine[[TwinPatch], None]: """The handler function or coroutine that will be called when a twin desired properties patch is received. The function or coroutine definition should take one positional argument (the twin patch in the form of a JSON dictionary object)""" - return self._handler_manager.on_twin_desired_properties_patch_received + if self._handler_manager is not None: + return self._handler_manager.on_twin_desired_properties_patch_received @on_twin_desired_properties_patch_received.setter - def on_twin_desired_properties_patch_received(self, value): + def on_twin_desired_properties_patch_received( + self, value: FunctionOrCoroutine[[TwinPatch], None] + ): self._generic_receive_handler_setter( "on_twin_desired_properties_patch_received", pipeline_constant.TWIN_PATCHES, value ) @@ -524,7 +552,9 @@ def on_twin_desired_properties_patch_received(self, value): class AbstractIoTHubDeviceClient(AbstractIoTHubClient): @classmethod - def create_from_x509_certificate(cls, x509, hostname, device_id, **kwargs): + def create_from_x509_certificate( + cls, x509: X509, hostname: str, device_id: str, **kwargs + ) -> Self: """ Instantiate a client using X509 certificate authentication. @@ -586,7 +616,9 @@ def create_from_x509_certificate(cls, x509, hostname, device_id, **kwargs): return cls(mqtt_pipeline, http_pipeline) @classmethod - def create_from_symmetric_key(cls, symmetric_key, hostname, device_id, **kwargs): + def create_from_symmetric_key( + cls, symmetric_key: str, hostname: str, device_id: str, **kwargs + ) -> Self: """ Instantiate a client using symmetric key authentication. @@ -657,29 +689,30 @@ def create_from_symmetric_key(cls, symmetric_key, hostname, device_id, **kwargs) return cls(mqtt_pipeline, http_pipeline) @abc.abstractmethod - def receive_message(self): + def receive_message(self) -> Message: pass @abc.abstractmethod - def get_storage_info_for_blob(self, blob_name): + def get_storage_info_for_blob(self, blob_name: str) -> Dict[str, Any]: pass @abc.abstractmethod def notify_blob_upload_status( - self, correlation_id, is_success, status_code, status_description - ): + self, correlation_id: str, is_success: bool, status_code: int, status_description: str + ) -> None: pass @property - def on_message_received(self): + def on_message_received(self) -> FunctionOrCoroutine[[Message], None]: """The handler function or coroutine that will be called when a message is received. The function or coroutine definition should take one positional argument (the :class:`azure.iot.device.Message` object)""" - return self._handler_manager.on_message_received + if self._handler_manager is not None: + return self._handler_manager.on_message_received @on_message_received.setter - def on_message_received(self, value): + def on_message_received(self, value: FunctionOrCoroutine[[Message], None]): self._generic_receive_handler_setter( "on_message_received", pipeline_constant.C2D_MSG, value ) @@ -687,7 +720,7 @@ def on_message_received(self, value): class AbstractIoTHubModuleClient(AbstractIoTHubClient): @classmethod - def create_from_edge_environment(cls, **kwargs): + def create_from_edge_environment(cls, **kwargs) -> Self: """ Instantiate the client from the IoT Edge environment. @@ -794,11 +827,11 @@ def create_from_edge_environment(cls, **kwargs): try: sastoken = st.RenewableSasToken(uri, signing_mechanism, ttl=token_ttl) except st.SasTokenError as e: - new_err = ValueError( + new_val_err = ValueError( "Could not create a SasToken using the values provided, or in the Edge environment" ) - new_err.__cause__ = e - raise new_err + new_val_err.__cause__ = e + raise new_val_err # Pipeline Config setup config_kwargs = _get_config_kwargs(**kwargs) @@ -809,7 +842,7 @@ def create_from_edge_environment(cls, **kwargs): gateway_hostname=gateway_hostname, sastoken=sastoken, server_verification_cert=server_verification_cert, - **config_kwargs + **config_kwargs, ) pipeline_configuration.ensure_desired_properties = True @@ -824,7 +857,9 @@ def create_from_edge_environment(cls, **kwargs): return cls(mqtt_pipeline, http_pipeline) @classmethod - def create_from_x509_certificate(cls, x509, hostname, device_id, module_id, **kwargs): + def create_from_x509_certificate( + cls, x509: X509, hostname: str, device_id: str, module_id: str, **kwargs + ) -> Self: """ Instantiate a client using X509 certificate authentication. @@ -885,27 +920,30 @@ def create_from_x509_certificate(cls, x509, hostname, device_id, module_id, **kw return cls(mqtt_pipeline, http_pipeline) @abc.abstractmethod - def send_message_to_output(self, message, output_name): + def send_message_to_output(self, message: Union[Message, str], output_name: str) -> None: pass @abc.abstractmethod - def receive_message_on_input(self, input_name): + def receive_message_on_input(self, input_name: str) -> Message: pass @abc.abstractmethod - def invoke_method(self, method_params, device_id, module_id=None): + def invoke_method( + self, method_params: dict, device_id: str, module_id: Optional[str] = None + ) -> None: pass @property - def on_message_received(self): + def on_message_received(self) -> FunctionOrCoroutine[[Message], Any]: """The handler function or coroutine that will be called when an input message is received. The function definition or coroutine should take one positional argument (the :class:`azure.iot.device.Message` object)""" - return self._handler_manager.on_message_received + if self._handler_manager is not None: + return self._handler_manager.on_message_received @on_message_received.setter - def on_message_received(self, value): + def on_message_received(self, value: FunctionOrCoroutine[[Message], Any]) -> None: self._generic_receive_handler_setter( "on_message_received", pipeline_constant.INPUT_MSG, value ) diff --git a/azure-iot-device/azure/iot/device/iothub/aio/async_clients.py b/azure-iot-device/azure/iot/device/iothub/aio/async_clients.py index c57f3ec14..6eaca9c1a 100644 --- a/azure-iot-device/azure/iot/device/iothub/aio/async_clients.py +++ b/azure-iot-device/azure/iot/device/iothub/aio/async_clients.py @@ -6,7 +6,7 @@ """This module contains user-facing asynchronous clients for the Azure IoTHub Device SDK for Python. """ - +from __future__ import annotations # Needed for annotation bug < 3.10 import logging import asyncio import deprecation @@ -16,7 +16,7 @@ AbstractIoTHubDeviceClient, AbstractIoTHubModuleClient, ) -from azure.iot.device.iothub.models import Message +from azure.iot.device.iothub.models import Message, MethodRequest, MethodResponse from azure.iot.device.iothub.pipeline import constant from azure.iot.device.iothub.pipeline import exceptions as pipeline_exceptions from azure.iot.device import exceptions @@ -24,11 +24,14 @@ from .async_inbox import AsyncClientInbox from . import async_handler_manager, loop_management from azure.iot.device import constant as device_constant +from azure.iot.device.iothub.pipeline import MQTTPipeline, HTTPPipeline +from azure.iot.device.custom_typing import FunctionOrCoroutine, StorageInfo, Twin, TwinPatch +from typing import Any, Optional, Union logger = logging.getLogger(__name__) -async def handle_result(callback): +async def handle_result(callback: FunctionOrCoroutine[[Any], None]): try: return await callback.completion() except pipeline_exceptions.ConnectionDroppedError as e: @@ -91,7 +94,7 @@ def __init__(self, **kwargs): self._mqtt_pipeline.on_method_request_received = self._inbox_manager.route_method_request self._mqtt_pipeline.on_twin_patch_received = self._inbox_manager.route_twin_patch - async def _enable_feature(self, feature_name): + async def _enable_feature(self, feature_name: str) -> None: """Enable an Azure IoT Hub feature :param feature_name: The name of the feature to enable. @@ -111,7 +114,7 @@ async def _enable_feature(self, feature_name): # This branch shouldn't be reached, but in case it is, log it logger.info("Feature ({}) already enabled - skipping".format(feature_name)) - async def _disable_feature(self, feature_name): + async def _disable_feature(self, feature_name: str) -> None: """Disable an Azure IoT Hub feature :param feature_name: The name of the feature to enable. @@ -131,7 +134,9 @@ async def _disable_feature(self, feature_name): # This branch shouldn't be reached, but in case it is, log it logger.info("Feature ({}) already disabled - skipping".format(feature_name)) - def _generic_receive_handler_setter(self, handler_name, feature_name, new_handler): + def _generic_receive_handler_setter( + self, handler_name: str, feature_name: str, new_handler: FunctionOrCoroutine[[None], None] + ) -> None: """Set a receive handler on the handler manager and enable the corresponding feature. This is a synchronous call (yes, even though this is the async client), meaning that this @@ -163,7 +168,7 @@ def _generic_receive_handler_setter(self, handler_name, feature_name, new_handle fut = asyncio.run_coroutine_threadsafe(self._disable_feature(feature_name), loop=loop) fut.result() - async def shutdown(self): + async def shutdown(self) -> None: """Shut down the client for graceful exit. Once this method is called, any attempts at further client calls will result in a @@ -207,7 +212,7 @@ async def shutdown(self): # capability for HTTP pipeline. logger.info("Client shutdown complete") - async def connect(self): + async def connect(self) -> None: """Connects the client to an Azure IoT Hub or Azure IoT Edge Hub instance. The destination is chosen based on the credentials passed via the auth_provider parameter @@ -232,7 +237,7 @@ async def connect(self): logger.info("Successfully connected to Hub") - async def disconnect(self): + async def disconnect(self) -> None: """Disconnect the client from the Azure IoT Hub or Azure IoT Edge Hub instance. It is recommended that you make sure to call this coroutine when you are completely done @@ -277,7 +282,7 @@ async def disconnect(self): logger.info("Successfully disconnected from Hub") - async def update_sastoken(self, sastoken): + async def update_sastoken(self, sastoken: str) -> None: """ Update the client's SAS Token used for authentication, then reauthorizes the connection. @@ -316,7 +321,7 @@ async def update_sastoken(self, sastoken): logger.info("Successfully reauthorized connection to Hub") - async def send_message(self, message): + async def send_message(self, message: Union[Message, str]) -> None: """Sends a message to the default events endpoint on the Azure IoT Hub or Azure IoT Edge Hub instance. If the connection to the service has not previously been opened by a call to connect, this @@ -360,7 +365,7 @@ async def send_message(self, message): current_version=device_constant.VERSION, details="We recommend that you use the .on_method_request_received property to set a handler instead", ) - async def receive_method_request(self, method_name=None): + async def receive_method_request(self, method_name: Optional[str] = None) -> MethodRequest: """Receive a method request via the Azure IoT Hub or Azure IoT Edge Hub. If no method request is yet available, will wait until it is available. @@ -384,7 +389,7 @@ async def receive_method_request(self, method_name=None): logger.info("Received method request") return method_request - async def send_method_response(self, method_response): + async def send_method_response(self, method_response: MethodResponse) -> None: """Send a response to a method request via the Azure IoT Hub or Azure IoT Edge Hub. If the connection to the service has not previously been opened by a call to connect, this @@ -419,7 +424,7 @@ async def send_method_response(self, method_response): logger.info("Successfully sent method response to Hub") - async def get_twin(self): + async def get_twin(self) -> Twin: """ Gets the device or module twin from the Azure IoT Hub or Azure IoT Edge Hub service. @@ -452,7 +457,7 @@ async def get_twin(self): logger.info("Successfully retrieved twin") return twin - async def patch_twin_reported_properties(self, reported_properties_patch): + async def patch_twin_reported_properties(self, reported_properties_patch: TwinPatch) -> None: """ Update reported properties with the Azure IoT Hub or Azure IoT Edge Hub service. @@ -495,7 +500,7 @@ async def patch_twin_reported_properties(self, reported_properties_patch): current_version=device_constant.VERSION, details="We recommend that you use the .on_twin_desired_properties_patch_received property to set a handler instead", ) - async def receive_twin_desired_properties_patch(self): + async def receive_twin_desired_properties_patch(self) -> TwinPatch: """ Receive a desired property patch via the Azure IoT Hub or Azure IoT Edge Hub. @@ -519,7 +524,7 @@ async def receive_twin_desired_properties_patch(self): class IoTHubDeviceClient(GenericIoTHubClient, AbstractIoTHubDeviceClient): """An asynchronous device client that connects to an Azure IoT Hub instance.""" - def __init__(self, mqtt_pipeline, http_pipeline): + def __init__(self, mqtt_pipeline: MQTTPipeline, http_pipeline: HTTPPipeline): """Initializer for a IoTHubDeviceClient. This initializer should not be called directly. @@ -536,7 +541,7 @@ def __init__(self, mqtt_pipeline, http_pipeline): current_version=device_constant.VERSION, details="We recommend that you use the .on_message_received property to set a handler instead", ) - async def receive_message(self): + async def receive_message(self) -> Message: """Receive a message that has been sent from the Azure IoT Hub. If no message is yet available, will wait until an item is available. @@ -555,7 +560,7 @@ async def receive_message(self): logger.info("Message received") return message - async def get_storage_info_for_blob(self, blob_name): + async def get_storage_info_for_blob(self, blob_name: str) -> StorageInfo: """Sends a POST request over HTTP to an IoTHub endpoint that will return information for uploading via the Azure Storage Account linked to the IoTHub your device is connected to. :param str blob_name: The name in string format of the blob that will be uploaded using the storage API. This name will be used to generate the proper credentials for Storage, and needs to match what will be used with the Azure Storage SDK to perform the blob upload. @@ -573,8 +578,8 @@ async def get_storage_info_for_blob(self, blob_name): return storage_info async def notify_blob_upload_status( - self, correlation_id, is_success, status_code, status_description - ): + self, correlation_id: str, is_success: bool, status_code: int, status_description: str + ) -> None: """When the upload is complete, the device sends a POST request to the IoT Hub endpoint with information on the status of an upload to blob attempt. This is used by IoT Hub to notify listening clients. :param str correlation_id: Provided by IoT Hub on get_storage_info_for_blob request. @@ -601,7 +606,7 @@ async def notify_blob_upload_status( class IoTHubModuleClient(GenericIoTHubClient, AbstractIoTHubModuleClient): """An asynchronous module client that connects to an Azure IoT Hub or Azure IoT Edge instance.""" - def __init__(self, mqtt_pipeline, http_pipeline): + def __init__(self, mqtt_pipeline: MQTTPipeline, http_pipeline: HTTPPipeline): """Initializer for a IoTHubModuleClient. This initializer should not be called directly. @@ -613,7 +618,7 @@ def __init__(self, mqtt_pipeline, http_pipeline): super().__init__(mqtt_pipeline=mqtt_pipeline, http_pipeline=http_pipeline) self._mqtt_pipeline.on_input_message_received = self._inbox_manager.route_input_message - async def send_message_to_output(self, message, output_name): + async def send_message_to_output(self, message: Union[Message, str], output_name: str) -> None: """Sends an event/message to the given module output. These are outgoing events and are meant to be "output events" @@ -664,7 +669,7 @@ async def send_message_to_output(self, message, output_name): current_version=device_constant.VERSION, details="We recommend that you use the .on_message_received property to set a handler instead", ) - async def receive_message_on_input(self, input_name): + async def receive_message_on_input(self, input_name: str) -> Message: """Receive an input message that has been sent from another Module to a specific input. If no message is yet available, will wait until an item is available. @@ -685,7 +690,9 @@ async def receive_message_on_input(self, input_name): logger.info("Input message received on: " + input_name) return message - async def invoke_method(self, method_params, device_id, module_id=None): + async def invoke_method( + self, method_params, device_id, module_id: Optional[str] = None + ) -> MethodResponse: """Invoke a method from your client onto a device or module client, and receive the response to the method call. :param dict method_params: Should contain a methodName (str), payload (str), diff --git a/azure-iot-device/azure/iot/device/iothub/edge_hsm.py b/azure-iot-device/azure/iot/device/iothub/edge_hsm.py index a443994f7..4c23beabb 100644 --- a/azure-iot-device/azure/iot/device/iothub/edge_hsm.py +++ b/azure-iot-device/azure/iot/device/iothub/edge_hsm.py @@ -7,7 +7,7 @@ import logging import json import base64 -import requests +import requests # type: ignore import requests_unixsocket import urllib from azure.iot.device.common.auth.signing_mechanism import SigningMechanism diff --git a/azure-iot-device/azure/iot/device/iothub/models/message.py b/azure-iot-device/azure/iot/device/iothub/models/message.py index 599c27502..8332ec090 100644 --- a/azure-iot-device/azure/iot/device/iothub/models/message.py +++ b/azure-iot-device/azure/iot/device/iothub/models/message.py @@ -50,7 +50,7 @@ def __init__( self._iothub_interface_id = None @property - def iothub_interface_id(self): + def iothub_interface_id(self) -> str: return self._iothub_interface_id def set_as_security_message(self): @@ -64,7 +64,7 @@ def set_as_security_message(self): def __str__(self): return str(self.data) - def get_size(self): + def get_size(self) -> int: total = 0 total = total + sum( sys.getsizeof(v) diff --git a/azure-iot-device/azure/iot/device/iothub/models/methods.py b/azure-iot-device/azure/iot/device/iothub/models/methods.py index 16d60afaa..53fea1475 100644 --- a/azure-iot-device/azure/iot/device/iothub/models/methods.py +++ b/azure-iot-device/azure/iot/device/iothub/models/methods.py @@ -5,6 +5,9 @@ # -------------------------------------------------------------------------- """This module contains classes related to direct method invocations. """ +from typing import Optional +from typing_extensions import Self +from azure.iot.device.custom_typing import JSONSerializable class MethodRequest(object): @@ -15,7 +18,7 @@ class MethodRequest(object): :ivar dict payload: The JSON payload being sent with the request. """ - def __init__(self, request_id, name, payload): + def __init__(self, request_id: str, name: str, payload: JSONSerializable): """Initializer for a MethodRequest. :param str request_id: The request id. @@ -27,15 +30,15 @@ def __init__(self, request_id, name, payload): self._payload = payload @property - def request_id(self): + def request_id(self) -> str: return self._request_id @property - def name(self): + def name(self) -> str: return self._name @property - def payload(self): + def payload(self) -> JSONSerializable: return self._payload @@ -48,7 +51,7 @@ class MethodResponse(object): :type payload: dict, str, int, float, bool, or None (JSON compatible values) """ - def __init__(self, request_id, status, payload=None): + def __init__(self, request_id: str, status: int, payload: Optional[JSONSerializable] = None): """Initializer for MethodResponse. :param str request_id: The request id of the MethodRequest being responded to. @@ -61,7 +64,7 @@ def __init__(self, request_id, status, payload=None): self.payload = payload @classmethod - def create_from_method_request(cls, method_request, status, payload=None): + def create_from_method_request(cls, method_request: MethodRequest, status: int, payload: Optional[JSONSerializable] = None) -> Self: """Factory method for creating a MethodResponse from a MethodRequest. :param method_request: The MethodRequest object to respond to. diff --git a/azure-iot-device/azure/iot/device/iothub/sync_clients.py b/azure-iot-device/azure/iot/device/iothub/sync_clients.py index a90cc04a0..4088e0b6f 100644 --- a/azure-iot-device/azure/iot/device/iothub/sync_clients.py +++ b/azure-iot-device/azure/iot/device/iothub/sync_clients.py @@ -6,15 +6,16 @@ """This module contains user-facing synchronous clients for the Azure IoTHub Device SDK for Python. """ - +from __future__ import annotations # Needed for annotation bug < 3.10 import logging +from queue import Queue import deprecation from .abstract_clients import ( AbstractIoTHubClient, AbstractIoTHubDeviceClient, AbstractIoTHubModuleClient, ) -from .models import Message +from .models import Message, MethodResponse, MethodRequest from .inbox_manager import InboxManager from .sync_inbox import SyncClientInbox, InboxEmpty from . import sync_handler_manager @@ -23,7 +24,9 @@ from azure.iot.device import exceptions from azure.iot.device.common.evented_callback import EventedCallback from azure.iot.device import constant as device_constant - +from .pipeline import MQTTPipeline, HTTPPipeline +from azure.iot.device.custom_typing import FunctionOrCoroutine, StorageInfo, Twin, TwinPatch +from typing import Optional, Union logger = logging.getLogger(__name__) @@ -91,7 +94,7 @@ def __init__(self, **kwargs): self._mqtt_pipeline.on_method_request_received = self._inbox_manager.route_method_request self._mqtt_pipeline.on_twin_patch_received = self._inbox_manager.route_twin_patch - def _enable_feature(self, feature_name): + def _enable_feature(self, feature_name: str) -> None: """Enable an Azure IoT Hub feature. This is a synchronous call, meaning that this function will not return until the feature @@ -111,7 +114,7 @@ def _enable_feature(self, feature_name): # This branch shouldn't be reached, but in case it is, log it logger.info("Feature ({}) already disabled - skipping".format(feature_name)) - def _disable_feature(self, feature_name): + def _disable_feature(self, feature_name: str) -> None: """Disable an Azure IoT Hub feature This is a synchronous call, meaning that this function will not return until the feature @@ -132,7 +135,9 @@ def _disable_feature(self, feature_name): # This branch shouldn't be reached, but in case it is, log it logger.info("Feature ({}) already disabled - skipping".format(feature_name)) - def _generic_receive_handler_setter(self, handler_name, feature_name, new_handler): + def _generic_receive_handler_setter( + self, handler_name: str, feature_name: str, new_handler: FunctionOrCoroutine[[None], None] + ) -> None: """Set a receive handler on the handler manager and enable the corresponding feature. This is a synchronous call, meaning that this function will not return until the feature @@ -154,7 +159,7 @@ def _generic_receive_handler_setter(self, handler_name, feature_name, new_handle elif new_handler is None and self._mqtt_pipeline.feature_enabled[feature_name]: self._disable_feature(feature_name) - def shutdown(self): + def shutdown(self) -> None: """Shut down the client for graceful exit. Once this method is called, any attempts at further client calls will result in a @@ -180,7 +185,8 @@ def shutdown(self): logger.debug("Completed pipeline shutdown operation") # Stop the Client Event handlers now that everything else is completed - self._handler_manager.stop(receiver_handlers_only=False) + if self._handler_manager is not None: + self._handler_manager.stop(receiver_handlers_only=False) # Yes, that means the pipeline is disconnected twice (well, actually three times if you # consider that the client-level disconnect causes two pipeline-level disconnects for @@ -197,7 +203,7 @@ def shutdown(self): # capability for HTTP pipeline. logger.info("Client shutdown complete") - def connect(self): + def connect(self) -> None: """Connects the client to an Azure IoT Hub or Azure IoT Edge Hub instance. The destination is chosen based on the credentials passed via the auth_provider parameter @@ -224,7 +230,7 @@ def connect(self): logger.info("Successfully connected to Hub") - def disconnect(self): + def disconnect(self) -> None: """Disconnect the client from the Azure IoT Hub or Azure IoT Edge Hub instance. It is recommended that you make sure to call this function when you are completely done @@ -247,7 +253,8 @@ def disconnect(self): # Note that in the process of stopping the handlers and resolving pending calls # a user-supplied handler may cause a reconnection to occur logger.debug("Stopping handlers...") - self._handler_manager.stop(receiver_handlers_only=True) + if self._handler_manager is not None: + self._handler_manager.stop(receiver_handlers_only=True) logger.debug("Successfully stopped handlers") # Disconnect again to ensure disconnection has occurred due to the issue mentioned above @@ -270,7 +277,7 @@ def disconnect(self): logger.info("Successfully disconnected from Hub") - def update_sastoken(self, sastoken): + def update_sastoken(self, sastoken: str) -> None: """ Update the client's SAS Token used for authentication, then reauthorizes the connection. @@ -306,7 +313,7 @@ def update_sastoken(self, sastoken): logger.info("Successfully reauthorized connection to Hub") - def send_message(self, message): + def send_message(self, message: Union[Message, str]) -> None: """Sends a message to the default events endpoint on the Azure IoT Hub or Azure IoT Edge Hub instance. This is a synchronous event, meaning that this function will not return until the event @@ -352,7 +359,9 @@ def send_message(self, message): current_version=device_constant.VERSION, details="We recommend that you use the .on_method_request_received property to set a handler instead", ) - def receive_method_request(self, method_name=None, block=True, timeout=None): + def receive_method_request( + self, method_name: Optional[str] = None, block: bool = True, timeout: Optional[int] = None + ) -> Optional[MethodRequest]: """Receive a method request via the Azure IoT Hub or Azure IoT Edge Hub. :param str method_name: Optionally provide the name of the method to receive requests for. @@ -369,7 +378,8 @@ def receive_method_request(self, method_name=None, block=True, timeout=None): if not self._mqtt_pipeline.feature_enabled[pipeline_constant.METHODS]: self._enable_feature(pipeline_constant.METHODS) - method_inbox = self._inbox_manager.get_method_request_inbox(method_name) + if self._inbox_manager is not None: + method_inbox : Queue[MethodRequest] = self._inbox_manager.get_method_request_inbox(method_name) logger.info("Waiting for method request...") try: @@ -380,7 +390,7 @@ def receive_method_request(self, method_name=None, block=True, timeout=None): logger.info("Did not receive method request") return method_request - def send_method_response(self, method_response): + def send_method_response(self, method_response: MethodResponse) -> None: """Send a response to a method request via the Azure IoT Hub or Azure IoT Edge Hub. This is a synchronous event, meaning that this function will not return until the event @@ -413,7 +423,7 @@ def send_method_response(self, method_response): logger.info("Successfully sent method response to Hub") - def get_twin(self): + def get_twin(self) -> Twin: """ Gets the device or module twin from the Azure IoT Hub or Azure IoT Edge Hub service. @@ -446,7 +456,7 @@ def get_twin(self): logger.info("Successfully retrieved twin") return twin - def patch_twin_reported_properties(self, reported_properties_patch): + def patch_twin_reported_properties(self, reported_properties_patch: TwinPatch) -> None: """ Update reported properties with the Azure IoT Hub or Azure IoT Edge Hub service. @@ -488,7 +498,7 @@ def patch_twin_reported_properties(self, reported_properties_patch): current_version=device_constant.VERSION, details="We recommend that you use the .on_twin_desired_properties_patch_received property to set a handler instead", ) - def receive_twin_desired_properties_patch(self, block=True, timeout=None): + def receive_twin_desired_properties_patch(self, block=True, timeout=None) -> TwinPatch: """ Receive a desired property patch via the Azure IoT Hub or Azure IoT Edge Hub. @@ -513,7 +523,8 @@ def receive_twin_desired_properties_patch(self, block=True, timeout=None): if not self._mqtt_pipeline.feature_enabled[pipeline_constant.TWIN_PATCHES]: self._enable_feature(pipeline_constant.TWIN_PATCHES) - twin_patch_inbox = self._inbox_manager.get_twin_patch_inbox() + if self._inbox_manager is not None: + twin_patch_inbox : Queue[TwinPatch] = self._inbox_manager.get_twin_patch_inbox() logger.info("Waiting for twin patches...") try: @@ -528,7 +539,7 @@ def receive_twin_desired_properties_patch(self, block=True, timeout=None): class IoTHubDeviceClient(GenericIoTHubClient, AbstractIoTHubDeviceClient): """A synchronous device client that connects to an Azure IoT Hub instance.""" - def __init__(self, mqtt_pipeline, http_pipeline): + def __init__(self, mqtt_pipeline: MQTTPipeline, http_pipeline: HTTPPipeline): """Initializer for a IoTHubDeviceClient. This initializer should not be called directly. @@ -538,14 +549,15 @@ def __init__(self, mqtt_pipeline, http_pipeline): :type mqtt_pipeline: :class:`azure.iot.device.iothub.pipeline.MQTTPipeline` """ super().__init__(mqtt_pipeline=mqtt_pipeline, http_pipeline=http_pipeline) - self._mqtt_pipeline.on_c2d_message_received = self._inbox_manager.route_c2d_message + if self._inbox_manager is not None: + self._mqtt_pipeline.on_c2d_message_received = self._inbox_manager.route_c2d_message @deprecation.deprecated( deprecated_in="2.3.0", current_version=device_constant.VERSION, details="We recommend that you use the .on_message_received property to set a handler instead", ) - def receive_message(self, block=True, timeout=None): + def receive_message(self, block=True, timeout=None) -> Optional[Message]: """Receive a message that has been sent from the Azure IoT Hub. :param bool block: Indicates if the operation should block until a message is received. @@ -559,7 +571,8 @@ def receive_message(self, block=True, timeout=None): if not self._mqtt_pipeline.feature_enabled[pipeline_constant.C2D_MSG]: self._enable_feature(pipeline_constant.C2D_MSG) - c2d_inbox = self._inbox_manager.get_c2d_message_inbox() + if self._inbox_manager is not None: + c2d_inbox : Queue[Message] = self._inbox_manager.get_c2d_message_inbox() logger.info("Waiting for message from Hub...") try: @@ -570,7 +583,7 @@ def receive_message(self, block=True, timeout=None): logger.info("No message received.") return message - def get_storage_info_for_blob(self, blob_name): + def get_storage_info_for_blob(self, blob_name: str) -> StorageInfo: """Sends a POST request over HTTP to an IoTHub endpoint that will return information for uploading via the Azure Storage Account linked to the IoTHub your device is connected to. :param str blob_name: The name in string format of the blob that will be uploaded using the storage API. This name will be used to generate the proper credentials for Storage, and needs to match what will be used with the Azure Storage SDK to perform the blob upload. @@ -584,8 +597,8 @@ def get_storage_info_for_blob(self, blob_name): return storage_info def notify_blob_upload_status( - self, correlation_id, is_success, status_code, status_description - ): + self, correlation_id: str, is_success: bool, status_code: int, status_description: str + ) -> None: """When the upload is complete, the device sends a POST request to the IoT Hub endpoint with information on the status of an upload to blob attempt. This is used by IoT Hub to notify listening clients. :param str correlation_id: Provided by IoT Hub on get_storage_info_for_blob request. @@ -608,7 +621,7 @@ def notify_blob_upload_status( class IoTHubModuleClient(GenericIoTHubClient, AbstractIoTHubModuleClient): """A synchronous module client that connects to an Azure IoT Hub or Azure IoT Edge instance.""" - def __init__(self, mqtt_pipeline, http_pipeline): + def __init__(self, mqtt_pipeline: MQTTPipeline, http_pipeline: HTTPPipeline): """Initializer for a IoTHubModuleClient. This initializer should not be called directly. @@ -620,9 +633,10 @@ def __init__(self, mqtt_pipeline, http_pipeline): :type http_pipeline: :class:`azure.iot.device.iothub.pipeline.HTTPPipeline` """ super().__init__(mqtt_pipeline=mqtt_pipeline, http_pipeline=http_pipeline) - self._mqtt_pipeline.on_input_message_received = self._inbox_manager.route_input_message + if self._inbox_manager is not None: + self._mqtt_pipeline.on_input_message_received = self._inbox_manager.route_input_message - def send_message_to_output(self, message, output_name): + def send_message_to_output(self, message: Union[Message, str], output_name: str) -> None: """Sends an event/message to the given module output. These are outgoing events and are meant to be "output events". @@ -673,7 +687,9 @@ def send_message_to_output(self, message, output_name): current_version=device_constant.VERSION, details="We recommend that you use the .on_message_received property to set a handler instead", ) - def receive_message_on_input(self, input_name, block=True, timeout=None): + def receive_message_on_input( + self, input_name: str, block: bool = True, timeout: Optional[int] = None + ) -> Optional[Message]: """Receive an input message that has been sent from another Module to a specific input. :param str input_name: The input name to receive a message on. @@ -687,7 +703,8 @@ def receive_message_on_input(self, input_name, block=True, timeout=None): if not self._mqtt_pipeline.feature_enabled[pipeline_constant.INPUT_MSG]: self._enable_feature(pipeline_constant.INPUT_MSG) - input_inbox = self._inbox_manager.get_input_message_inbox(input_name) + if self._inbox_manager is not None: + input_inbox : Queue[Message] = self._inbox_manager.get_input_message_inbox(input_name) logger.info("Waiting for input message on: " + input_name + "...") try: @@ -698,7 +715,7 @@ def receive_message_on_input(self, input_name, block=True, timeout=None): logger.info("No input message received on: " + input_name) return message - def invoke_method(self, method_params, device_id, module_id=None): + def invoke_method(self, method_params: dict, device_id: str, module_id=None): """Invoke a method from your client onto a device or module client, and receive the response to the method call. :param dict method_params: Should contain a methodName (str), payload (str), diff --git a/azure-iot-device/azure/iot/device/patch.py b/azure-iot-device/azure/iot/device/patch.py index 5d4bec270..3b76a4097 100644 --- a/azure-iot-device/azure/iot/device/patch.py +++ b/azure-iot-device/azure/iot/device/patch.py @@ -7,12 +7,13 @@ import inspect import logging +from typing import Dict logger = logging.getLogger(__name__) # This dict will be used as a scope for imports and defs in add_shims_for_inherited_methods # in order to keep them out of the global scope of this module. -shim_scope = {} +shim_scope : Dict[str, str] = {} def add_shims_for_inherited_methods(target_class): diff --git a/azure-iot-device/azure/iot/device/provisioning/abstract_provisioning_device_client.py b/azure-iot-device/azure/iot/device/provisioning/abstract_provisioning_device_client.py index 152dea1bd..6ce83d23e 100644 --- a/azure-iot-device/azure/iot/device/provisioning/abstract_provisioning_device_client.py +++ b/azure-iot-device/azure/iot/device/provisioning/abstract_provisioning_device_client.py @@ -10,15 +10,21 @@ import abc import logging +from typing_extensions import Self +from typing import Any, Dict, List, Optional from azure.iot.device.provisioning import pipeline from azure.iot.device.common.auth import sastoken as st from azure.iot.device.common import auth, handle_exceptions +from .pipeline import MQTTPipeline +from azure.iot.device.common.models import X509 +from azure.iot.device.custom_typing import ProvisioningPayload +from azure.iot.device.provisioning.models import RegistrationResult logger = logging.getLogger(__name__) -def _validate_kwargs(exclude=[], **kwargs): +def _validate_kwargs(exclude: Optional[List[str]] = [], **kwargs): """Helper function to validate user provided kwargs. Raises TypeError if an invalid option has been provided""" @@ -33,16 +39,16 @@ def _validate_kwargs(exclude=[], **kwargs): ] for kwarg in kwargs: - if (kwarg not in valid_kwargs) or (kwarg in exclude): + if (kwarg not in valid_kwargs) or (exclude is not None and kwarg in exclude): raise TypeError("Unsupported keyword argument '{}'".format(kwarg)) -def validate_registration_id(reg_id): +def validate_registration_id(reg_id: str) -> None: if not (reg_id and reg_id.strip()): raise ValueError("Registration Id can not be none, empty or blank.") -def _get_config_kwargs(**kwargs): +def _get_config_kwargs(**kwargs) -> Dict[str, Any]: """Get the subset of kwargs which pertain the config object""" valid_config_kwargs = [ "server_verification_cert", @@ -60,7 +66,7 @@ def _get_config_kwargs(**kwargs): return config_kwargs -def _form_sas_uri(id_scope, registration_id): +def _form_sas_uri(id_scope: str, registration_id: str) -> str: return "{id_scope}/registrations/{registration_id}".format( id_scope=id_scope, registration_id=registration_id ) @@ -71,7 +77,7 @@ class AbstractProvisioningDeviceClient(abc.ABC): Super class for any client that can be used to register devices to Device Provisioning Service. """ - def __init__(self, pipeline): + def __init__(self, pipeline: MQTTPipeline): """ Initializes the provisioning client. @@ -89,8 +95,8 @@ def __init__(self, pipeline): @classmethod def create_from_symmetric_key( - cls, provisioning_host, registration_id, id_scope, symmetric_key, **kwargs - ): + cls, provisioning_host: str, registration_id: str, id_scope: str, symmetric_key: str, **kwargs + ) -> Self: """ Create a client which can be used to run the registration of a device with provisioning service using Symmetric Key authentication. @@ -163,8 +169,8 @@ def create_from_symmetric_key( @classmethod def create_from_x509_certificate( - cls, provisioning_host, registration_id, id_scope, x509, **kwargs - ): + cls, provisioning_host: str, registration_id: str, id_scope: str, x509: X509, **kwargs + ) -> Self: """ Create a client which can be used to run the registration of a device with provisioning service using X509 certificate authentication. @@ -224,18 +230,18 @@ def create_from_x509_certificate( return cls(mqtt_provisioning_pipeline) @abc.abstractmethod - def register(self): + def register(self) -> RegistrationResult: """ Register the device with the Device Provisioning Service. """ pass @property - def provisioning_payload(self): + def provisioning_payload(self) -> ProvisioningPayload: return self._provisioning_payload @provisioning_payload.setter - def provisioning_payload(self, provisioning_payload): + def provisioning_payload(self, provisioning_payload: ProvisioningPayload): """ Set the payload that will form the request payload in a registration request. @@ -245,7 +251,7 @@ def provisioning_payload(self, provisioning_payload): self._provisioning_payload = provisioning_payload -def log_on_register_complete(result=None): +def log_on_register_complete(result: Optional[RegistrationResult] = None) -> None: # This could be a failed/successful registration result from DPS # or a error from polling machine. Response should be given appropriately if result is not None: diff --git a/azure-iot-device/azure/iot/device/provisioning/aio/async_provisioning_device_client.py b/azure-iot-device/azure/iot/device/provisioning/aio/async_provisioning_device_client.py index acd79c533..33533a7a8 100644 --- a/azure-iot-device/azure/iot/device/provisioning/aio/async_provisioning_device_client.py +++ b/azure-iot-device/azure/iot/device/provisioning/aio/async_provisioning_device_client.py @@ -8,9 +8,11 @@ Device SDK. This client uses Symmetric Key and X509 authentication to register devices with an IoT Hub via the Device Provisioning Service. """ - +from __future__ import annotations # Needed for annotation bug < 3.10 import logging +from typing import Any from azure.iot.device.common import async_adapter +from azure.iot.device.custom_typing import FunctionOrCoroutine from azure.iot.device.provisioning.abstract_provisioning_device_client import ( AbstractProvisioningDeviceClient, ) @@ -20,11 +22,12 @@ from azure.iot.device.provisioning.pipeline import exceptions as pipeline_exceptions from azure.iot.device import exceptions from azure.iot.device.provisioning.pipeline import constant as dps_constant +from azure.iot.device.provisioning.models import RegistrationResult logger = logging.getLogger(__name__) -async def handle_result(callback): +async def handle_result(callback: FunctionOrCoroutine[[Any], Any]) -> Any: try: return await callback.completion() except pipeline_exceptions.ConnectionDroppedError as e: @@ -49,7 +52,7 @@ class ProvisioningDeviceClient(AbstractProvisioningDeviceClient): using Symmetric Key or X509 authentication. """ - async def register(self): + async def register(self) -> RegistrationResult: """ Register the device with the provisioning service. @@ -94,7 +97,7 @@ async def register(self): return result - async def _enable_responses(self): + async def _enable_responses(self) -> None: """Enable to receive responses from Device Provisioning Service.""" logger.info("Enabling reception of response from Device Provisioning Service...") subscribe_async = async_adapter.emulate_async(self._pipeline.enable_responses) diff --git a/azure-iot-device/azure/iot/device/provisioning/models/registration_result.py b/azure-iot-device/azure/iot/device/provisioning/models/registration_result.py index 560d76720..3d614c045 100644 --- a/azure-iot-device/azure/iot/device/provisioning/models/registration_result.py +++ b/azure-iot-device/azure/iot/device/provisioning/models/registration_result.py @@ -4,45 +4,8 @@ # license information. # -------------------------------------------------------------------------- import json - - -class RegistrationResult(object): - """ - The final result of a completed or failed registration attempt - :ivar:request_id: The request id to which the response is being obtained - :ivar:operation_id: The id of the operation as returned by the registration request. - :ivar status: The status of the registration process as returned by the provisioning service. - Values can be "unassigned", "assigning", "assigned", "failed", "disabled" - :ivar registration_state : Details like device id, assigned hub , date times etc returned - from the provisioning service. - """ - - def __init__(self, operation_id, status, registration_state=None): - """ - :param operation_id: The id of the operation as returned by the initial registration request. - :param status: The status of the registration process. - Values can be "unassigned", "assigning", "assigned", "failed", "disabled" - :param registration_state : Details like device id, assigned hub , date times etc returned - from the provisioning service. - """ - self._operation_id = operation_id - self._status = status - self._registration_state = registration_state - - @property - def operation_id(self): - return self._operation_id - - @property - def status(self): - return self._status - - @property - def registration_state(self): - return self._registration_state - - def __str__(self): - return "\n".join([str(self.registration_state), self.status]) +from typing import Optional +from azure.iot.device.custom_typing import JSONSerializable class RegistrationState(object): @@ -86,34 +49,75 @@ def __init__( self._response_payload = payload @property - def device_id(self): + def device_id(self) -> str: return self._device_id @property - def assigned_hub(self): + def assigned_hub(self) -> str: return self._assigned_hub @property - def sub_status(self): + def sub_status(self) -> str: return self._sub_status @property - def created_date_time(self): + def created_date_time(self) -> str: return self._created_date_time @property - def last_update_date_time(self): + def last_update_date_time(self) -> str: return self._last_update_date_time @property - def etag(self): + def etag(self) -> str: return self._etag @property - def response_payload(self): + def response_payload(self) -> JSONSerializable: return json.dumps(self._response_payload, default=lambda o: o.__dict__, sort_keys=True) def __str__(self): return "\n".join( [self.device_id, self.assigned_hub, self.sub_status, self.response_payload] ) + + +class RegistrationResult(object): + """ + The final result of a completed or failed registration attempt + :ivar:request_id: The request id to which the response is being obtained + :ivar:operation_id: The id of the operation as returned by the registration request. + :ivar status: The status of the registration process as returned by the provisioning service. + Values can be "unassigned", "assigning", "assigned", "failed", "disabled" + :ivar registration_state : Details like device id, assigned hub , date times etc returned + from the provisioning service. + """ + + def __init__( + self, operation_id: str, status: str, registration_state: Optional[RegistrationState] = None + ): + """ + :param operation_id: The id of the operation as returned by the initial registration request. + :param status: The status of the registration process. + Values can be "unassigned", "assigning", "assigned", "failed", "disabled" + :param registration_state : Details like device id, assigned hub , date times etc returned + from the provisioning service. + """ + self._operation_id = operation_id + self._status = status + self._registration_state = registration_state + + @property + def operation_id(self) -> str: + return self._operation_id + + @property + def status(self) -> str: + return self._status + + @property + def registration_state(self) -> Optional[RegistrationState]: + return self._registration_state + + def __str__(self): + return "\n".join([str(self.registration_state), self.status]) diff --git a/azure-iot-device/azure/iot/device/provisioning/pipeline/exceptions.py b/azure-iot-device/azure/iot/device/provisioning/pipeline/exceptions.py index c99b4f9b4..52cc5e9db 100644 --- a/azure-iot-device/azure/iot/device/provisioning/pipeline/exceptions.py +++ b/azure-iot-device/azure/iot/device/provisioning/pipeline/exceptions.py @@ -8,7 +8,14 @@ # For now, present relevant transport errors as part of the Pipeline API surface # so that they do not have to be duplicated at this layer. # OK TODO This mimics the IotHub Case. Both IotHub and Provisioning needs to change -from azure.iot.device.common.pipeline.pipeline_exceptions import * # noqa: F401, F403 +from azure.iot.device.common.pipeline.pipeline_exceptions import ( # noqa: F401, F403 + PipelineException, + OperationCancelled, + OperationTimeout, + OperationError, + PipelineNotRunning, + PipelineRuntimeError +) from azure.iot.device.common.transport_exceptions import ( # noqa: F401 ConnectionFailedError, ConnectionDroppedError, diff --git a/azure-iot-device/azure/iot/device/provisioning/provisioning_device_client.py b/azure-iot-device/azure/iot/device/provisioning/provisioning_device_client.py index 479f9e9ec..7ba535233 100644 --- a/azure-iot-device/azure/iot/device/provisioning/provisioning_device_client.py +++ b/azure-iot-device/azure/iot/device/provisioning/provisioning_device_client.py @@ -8,19 +8,23 @@ Device SDK. This client uses Symmetric Key and X509 authentication to register devices with an IoT Hub via the Device Provisioning Service. """ +from __future__ import annotations # Needed for annotation bug < 3.10 import logging +from typing import Any from azure.iot.device.common.evented_callback import EventedCallback +from azure.iot.device.custom_typing import FunctionOrCoroutine from .abstract_provisioning_device_client import AbstractProvisioningDeviceClient from .abstract_provisioning_device_client import log_on_register_complete from azure.iot.device.provisioning.pipeline import constant as dps_constant from .pipeline import exceptions as pipeline_exceptions from azure.iot.device import exceptions +from azure.iot.device.provisioning.models import RegistrationResult logger = logging.getLogger(__name__) -def handle_result(callback): +def handle_result(callback: FunctionOrCoroutine[[Any], None]) -> RegistrationResult: try: return callback.wait_for_completion() except pipeline_exceptions.ConnectionDroppedError as e: @@ -47,7 +51,7 @@ class ProvisioningDeviceClient(AbstractProvisioningDeviceClient): using Symmetric Key or X509 authentication. """ - def register(self): + def register(self) -> RegistrationResult: """ Register the device with the provisioning service @@ -94,7 +98,7 @@ def register(self): return result - def _enable_responses(self): + def _enable_responses(self) -> None: """Enable to receive responses from Device Provisioning Service. This is a synchronous call, meaning that this function will not return until the feature diff --git a/azure-iot-device/azure/iot/device/py.typed b/azure-iot-device/azure/iot/device/py.typed new file mode 100644 index 000000000..e69de29bb diff --git a/samples/async-edge-scenarios/send_message_downstream.py b/samples/async-edge-scenarios/send_message_downstream.py index 66a0c26dc..3620e170d 100644 --- a/samples/async-edge-scenarios/send_message_downstream.py +++ b/samples/async-edge-scenarios/send_message_downstream.py @@ -27,7 +27,7 @@ async def main(): # The client object is used to interact with your Azure IoT Edge device. device_client = IoTHubDeviceClient.create_from_connection_string( - connection_string=conn_str, server_verification_cert=root_ca_cert + connection_string_dict=conn_str, server_verification_cert=root_ca_cert ) # Connect the client. diff --git a/scripts/configure-virtual-environments.sh b/scripts/configure-virtual-environments.sh index 54c510269..c484563c9 100755 --- a/scripts/configure-virtual-environments.sh +++ b/scripts/configure-virtual-environments.sh @@ -6,7 +6,7 @@ script_dir=$(cd "$(dirname "$0")" && pwd) -export RUNTIMES_TO_INSTALL="3.6.6 3.7.1 3.8.10 3.9.9 3.10.2" +export RUNTIMES_TO_INSTALL="3.7.1 3.8.10 3.9.9 3.10.2" echo "This script will do the following:" echo "1. Use apt to install pre-requisites for pyenv" diff --git a/setup.py b/setup.py index 92f0bb326..c90d5e336 100644 --- a/setup.py +++ b/setup.py @@ -65,7 +65,6 @@ "License :: OSI Approved :: MIT License", "Programming Language :: Python", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.6", "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", @@ -85,9 +84,11 @@ "requests-unixsocket>=0.1.5,<1.0.0", "janus", "PySocks", + "typing_extensions", ], - python_requires=">=3.6, <4", + python_requires=">=3.7, <4", packages=find_namespace_packages(where="azure-iot-device"), + package_data={"azure.iot.device": ["py.typed"]}, package_dir={"": "azure-iot-device"}, zip_safe=False, ) diff --git a/tests/unit/iothub/shared_client_tests.py b/tests/unit/iothub/shared_client_tests.py index 488722656..1e8f893ba 100644 --- a/tests/unit/iothub/shared_client_tests.py +++ b/tests/unit/iothub/shared_client_tests.py @@ -1032,7 +1032,7 @@ def test_pipeline_config( # Verify the IoTHubPipelineConfig is constructed as expected config = mock_mqtt_pipeline_init.call_args[0][0] assert config.device_id == expected_device_id - assert config.module_id is None + assert config.module_id == "" assert config.hostname == expected_hostname assert config.gateway_hostname is None assert config.sastoken is sastoken_mock.return_value diff --git a/vsts/build.yaml b/vsts/build.yaml index 816d454ec..b9917a1d1 100644 --- a/vsts/build.yaml +++ b/vsts/build.yaml @@ -29,8 +29,6 @@ jobs: vmImage: 'Ubuntu 20.04' strategy: matrix: - Python36: - python.version: '3.6' Python37: python.version: '3.7' Python38: diff --git a/vsts/python-canary.yaml b/vsts/python-canary.yaml index 8378006e8..10d9bdf53 100644 --- a/vsts/python-canary.yaml +++ b/vsts/python-canary.yaml @@ -16,11 +16,6 @@ jobs: transport: 'mqttws' imageName: 'windows-latest' consumerGroup: 'cg2' - py36_linux_mqtt: - pv: '3.6' - transport: 'mqtt' - imageName: 'Ubuntu 20.04' - consumerGroup: 'cg3' py37_linux_mqttws: pv: '3.7' transport: 'mqttws' diff --git a/vsts/python-e2e.yaml b/vsts/python-e2e.yaml index 4af63f148..85e56d32e 100644 --- a/vsts/python-e2e.yaml +++ b/vsts/python-e2e.yaml @@ -40,7 +40,7 @@ jobs: strategy: matrix: py310_mqtt: { pv: '3.10', transport: 'mqtt', consumer_group: 'e2e-consumer-group-3' } - py36_mqtt_ws: { pv: '3.6', transport: 'mqttws', consumer_group: 'e2e-consumer-group-4' } + py37_mqtt_ws: { pv: '3.7', transport: 'mqttws', consumer_group: 'e2e-consumer-group-4' } steps: - task: UsePythonVersion@0 diff --git a/vsts/python-nightly.yaml b/vsts/python-nightly.yaml index 86f7467ad..cabc9d6f4 100644 --- a/vsts/python-nightly.yaml +++ b/vsts/python-nightly.yaml @@ -17,11 +17,6 @@ jobs: imageName: 'windows-latest' consumerGroup: 'cg2' - py36_linux_mqtt: - pv: '3.6' - transport: 'mqtt' - imageName: 'Ubuntu 20.04' - consumerGroup: 'cg3' py37_linux_mqttws: pv: '3.7' transport: 'mqttws'