[Question] Best practice to read/write points of multiple devices at the same IP address #497

Open
Kang-SungKu opened this issue Aug 10, 2023 · 1 comment


Kang-SungKu commented Aug 10, 2023

Hello, I am using bacpypes (version 0.18.6, Python 3.10.12) to expose readable/writable points of one or more devices at a specific IP address, so that the points can be accessed by a building automation system (I am using Niagara N4). I got this working with a single device, but could not make it work with multiple devices. I would like to understand the best practice for reading/writing points of multiple devices at the same IP address. For context, I describe (1) the desired configuration, (2) what I did for a single device, and (3) what I did for multiple devices.

Desired configuration

I have a building automation system and multiple simulation models running on a single machine or on different machines. bacpypes is used to expose 10 to 100+ readable/writable points from each simulation model as BACnet points, preferably bound to a specific IP address, so that the building automation system can read/write points inside the simulation models (over an Ethernet cable if the building automation system and the simulation models run on different machines).

What I did for a single device

A custom LocalDeviceObject is created and attached to a custom BIPSimpleApplication, and the application is then bound to an IP address (0.0.0.0) for testing. This implementation worked well, and I could read/write points as expected. The following is part of the script that implements it.

@bacpypes_debugging
class CustomBACnetApplication(BIPSimpleApplication,
                            ReadWritePropertyMultipleServices,
                            DeviceCommunicationControlServices,):
    pass

class CustomBACnetDevice(LocalDeviceObject):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._date_time: datetime = None

    def ReadProperty(self, propid, arrayIndex=None):
        # report the stored datetime, when set, as the local time and date
        if propid == "localTime" and self._date_time is not None:
            time = Time(str(self._date_time.time()))
            return time.value
        if propid == "localDate" and self._date_time is not None:
            date = Date(str(self._date_time.date()))
            return date.value
        return super().ReadProperty(propid, arrayIndex)

...

class BACnetBridge():

    def __init__(self, host, site_id: SiteID) -> None:
        self.device = CustomBACnetDevice(
          objectName="Proxy",
          objectIdentifier=int(599),
          maxApduLengthAccepted=int(1024),
          segmentationSupported="segmentedBoth",
          vendorIdentifier=555,
          vendorName=CharacterString("Vendor"), 
          modelName=CharacterString("BACnet Bridge"),
          systemStatus=DeviceStatus(1),
          description=CharacterString("BACpypes (Python) based tool for exposing points"),
          firmwareRevision="0.0.0",
          applicationSoftwareVersion="0.0.0",
          protocolVersion=1,
          protocolRevision=0)

        self.application = CustomBACnetApplication(self.device, "0.0.0.0")

        self.points = {}
...
        for point in self.points.values():
            self.application.add_object(point)
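
The `ReadProperty` override above reports the device's local date and time from a Python `datetime`. As a minimal stdlib-only sketch of the values involved, assuming the usual BACnet encoding of a date as (year - 1900, month, day, day of week with Monday = 1) and a time as (hour, minute, second, hundredths of a second). The helper names `to_bacnet_date` and `to_bacnet_time` are hypothetical and are not part of bacpypes:

```python
from datetime import datetime


def to_bacnet_date(dt: datetime) -> tuple:
    """Return (year - 1900, month, day, day_of_week), Monday = 1 ... Sunday = 7."""
    # isoweekday() already numbers Monday as 1 and Sunday as 7
    return (dt.year - 1900, dt.month, dt.day, dt.isoweekday())


def to_bacnet_time(dt: datetime) -> tuple:
    """Return (hour, minute, second, hundredths of a second)."""
    # microseconds are truncated, not rounded, to hundredths
    return (dt.hour, dt.minute, dt.second, dt.microsecond // 10_000)


if __name__ == "__main__":
    stamp = datetime(2023, 8, 10, 14, 30, 15, 250_000)  # a Thursday
    print(to_bacnet_date(stamp))  # (123, 8, 10, 4)
    print(to_bacnet_time(stamp))  # (14, 30, 15, 25)
```

This only illustrates the tuple layout; in the script above the conversion is done by the bacpypes `Date` and `Time` classes.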

What I did for multiple devices

Based on my search (#159), I think scaling up the above use case requires a virtual router bound to the same IP address, with multiple BACnet devices attached to the router. I modified my script based on the sample IP2VLANRouter.py (https://github.com/JoelBender/bacpypes/blob/master/samples/IP2VLANRouter.py) as follows (mostly borrowed from the sample, but I include it in case I missed something):

@bacpypes_debugging
class CustomBACnetApplication(Application,
                               ReadWritePropertyMultipleServices,
                               DeviceCommunicationControlServices,
                               WhoIsIAmServices,):
    def __init__(self, vlan_device, vlan_address, aseID=None):
        ...
        # normal initialization
        Application.__init__(self, vlan_device, aseID=aseID)
        
        # include an application decoder
        self.asap = ApplicationServiceAccessPoint()

        # pass the device object to the state machine access point so it can know if it should support segmentation
        self.smap = StateMachineAccessPoint(vlan_device)

        # the segmentation state machines need access to the same device information cache as the application
        self.smap.deviceInfoCache = self.deviceInfoCache

        # a network service access point will be needed
        self.nsap = NetworkServiceAccessPoint()

        # give the NSAP a generic network layer service element
        self.nse = NetworkServiceElement()
        bind(self.nse, self.nsap)

        # bind the top layers
        bind(self, self.asap, self.smap, self.nsap)

        # create a vlan node at the assigned address
        self.vlan_node = Node(vlan_address)

        # bind the stack to the node, no network number, no address
        self.nsap.bind(self.vlan_node)

    def request(self, apdu):
        if _debug: CustomBACnetApplication._debug("[%s]request %r", self.vlan_node.address, apdu)
        Application.request(self, apdu)

    def indication(self, apdu):
        if _debug: CustomBACnetApplication._debug("[%s]indication %r", self.vlan_node.address, apdu)
        Application.indication(self, apdu)

    def response(self, apdu):
        if _debug: CustomBACnetApplication._debug("[%s]response %r", self.vlan_node.address, apdu)
        Application.response(self, apdu)

    def confirmation(self, apdu):
        if _debug: CustomBACnetApplication._debug("[%s]confirmation %r", self.vlan_node.address, apdu)
        Application.confirmation(self, apdu)


class CustomBACnetDevice(LocalDeviceObject):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._date_time: datetime = None

    def ReadProperty(self, propid, arrayIndex=None):
        # report the stored datetime, when set, as the local time and date
        if propid == "localTime" and self._date_time is not None:
            time = Time(str(self._date_time.time()))
            return time.value
        if propid == "localDate" and self._date_time is not None:
            date = Date(str(self._date_time.date()))
            return date.value
        return super().ReadProperty(propid, arrayIndex)

@bacpypes_debugging
@register_object_type(vendor_id=555)
class LocalAnalogValueObject(AnalogValueCmdObject):
    def __init__(self, sim_value, **kwargs):
        super().__init__(**kwargs)
        self._sim_value = sim_value

    def ReadProperty(self, propid, arrayIndex=None):
        if propid == "presentValue":
            return self._sim_value
        return super().ReadProperty(propid, arrayIndex)

@bacpypes_debugging
class VLANRouter:
    def __init__(self, local_address, local_network):
        if _debug: VLANRouter._debug("__init__ %r %r", local_address, local_network)

        # a network service access point will be needed
        self.nsap = NetworkServiceAccessPoint()

        # give the NSAP a generic network layer service element
        self.nse = NetworkServiceElement()
        bind(self.nse, self.nsap)

        # create a BIPSimple, bound to the Annex J server on the UDP multiplexer
        #self.bip = BIPSimple(local_address)

        # create a BBMD, bound to the Annex J server on the UDP multiplexer
        self.bip = BIPBBMD(local_address)

        self.annexj = AnnexJCodec()
        self.mux = UDPMultiplexer(local_address)

        # bind the bottom layers
        bind(self.bip, self.annexj, self.mux.annexJ)

        # bind the BIP stack to the local network
        self.nsap.bind(self.bip, local_network, local_address)


class BACnetBridge:
    def __init__(self, host, alias_base: str, num_sites: int, verbose=True) -> None:
        ...

        ## Setup virtual router for BACnet devices
        # create the VLAN router, bind it to the local network
        addr1 = '0.0.0.0'
        net1, net2 = 5, 6
        self.router = VLANRouter(Address(addr1), net1)

        # create a VLAN
        self.vlan = Network(broadcast_address=LocalBroadcast())

        # create a node for the router, address 1 on the VLAN
        router_addr = Address(1)
        router_node = Node(router_addr)
        self.vlan.add_node(router_node)

        # bind the router stack to the vlan network through this node
        self.router.nsap.bind(router_node, net2, router_addr)

        # send network topology
        deferred(self.router.nse.i_am_router_to_network)


        ## Setup each BACnet device
        for alias_each in self.alias:
            ...

            self.device[alias_each] = CustomBACnetDevice(
                objectName=self.bacnet_device_name[alias_each],
                objectIdentifier=('device', self.device_instance[alias_each]),
                maxApduLengthAccepted=int(1024),
                segmentationSupported="noSegmentation",
                vendorIdentifier=555,
                vendorName=CharacterString("Vendor"), 
                modelName=CharacterString("BACnet Bridge"),
                systemStatus=DeviceStatus(1),
                description=CharacterString("BACpypes (Python) based tool for exposing points"),
                firmwareRevision="0.0.0",
                applicationSoftwareVersion="0.0.0",
                protocolVersion=1,
                protocolRevision=0)

            self.vlan_address[alias_each] = Address(10 + self.index_site[alias_each])
            self.application[alias_each] = CustomBACnetApplication(vlan_device=self.device[alias_each],
                                                                    vlan_address=self.vlan_address[alias_each])
            self.vlan.add_node(self.application[alias_each].vlan_node)

            self.points[alias_each] = {}
            self.points_prev[alias_each] = {}
            self.points_new[alias_each] = {}

            for point in self.points[alias_each].values():
                self.application[alias_each].add_object(point)
...            
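
The addressing in the script above (router node at VLAN address 1, each device at 10 + its site index, one device instance per site) can be checked before any BACnet objects are created. The following is a minimal stdlib-only sketch under stated assumptions: VLAN addresses are one-octet values (0 to 255), device instances must be unique and at most 4194302, and sites are numbered in dictionary order. The function name `plan_vlan` and the sample aliases and instance numbers are hypothetical:

```python
from typing import Dict, Tuple

ROUTER_VLAN_ADDRESS = 1        # router node address on the VLAN, as in the script
FIRST_DEVICE_ADDRESS = 10      # devices use 10 + site index, as in the script
MAX_ONE_OCTET_ADDRESS = 255    # assumption: one-octet VLAN addresses
MAX_DEVICE_INSTANCE = 4194302  # 22-bit instance field, 4194303 is reserved


def plan_vlan(device_instances: Dict[str, int]) -> Dict[str, Tuple[int, int]]:
    """Map each site alias to (device_instance, vlan_address), rejecting clashes."""
    if len(set(device_instances.values())) != len(device_instances):
        raise ValueError("device instance numbers must be unique")

    plan = {}
    for index, (alias, instance) in enumerate(device_instances.items()):
        if not 0 <= instance <= MAX_DEVICE_INSTANCE:
            raise ValueError(f"device instance out of range: {instance}")
        vlan_address = FIRST_DEVICE_ADDRESS + index
        if vlan_address > MAX_ONE_OCTET_ADDRESS:
            raise ValueError("too many devices for one-octet VLAN addresses")
        plan[alias] = (instance, vlan_address)
    return plan


if __name__ == "__main__":
    print(plan_vlan({"site_a": 600, "site_b": 601}))
    # {'site_a': (600, 10), 'site_b': (601, 11)}
```

Under these assumptions a single VLAN has room for 246 devices (addresses 10 through 255), none of which can collide with the router at address 1.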

The problems I encountered

First, I got a warning saying "path error (1)" for each device I created (for example, 5 warnings when I created 5 devices). I am not sure whether this warning is the root cause of the following issue.

Second, I could discover the devices from my building automation system (Niagara N4), but could not add them to it. I also cannot see the list of points or read/write the points. I could not read any information about the devices except for the network number (6), the MAC addresses (10 and 11), and the object identifiers (shown as device names). In particular, I could not read other information such as the vendor, model, firmware/application version, and device name.

I tried changing the IP address from 0.0.0.0 to 0.0.0.0/24 or 0.0.0.0:47808, but that did not work. I am new to BACnet and building automation systems, so I may have missed something simple in my implementation. I appreciate any comments and/or suggestions, and please let me know if there is a better approach or example I can refer to for what I need.
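
For reference, the address forms tried above can be inspected with Python's standard `ipaddress` module. This is a minimal sketch that shows what a /24 prefix implies for the unspecified address 0.0.0.0 compared with a concrete interface address. The address 192.168.1.10 and the helper name `describe_binding` are hypothetical, and whether the bind address is behind the problem described above is not established here:

```python
import ipaddress

BACNET_DEFAULT_PORT = 0xBAC0  # 47808, the standard BACnet/IP UDP port


def describe_binding(cidr: str) -> dict:
    """Summarize an 'address/prefix' string: host, subnet broadcast, wildcard flag."""
    iface = ipaddress.ip_interface(cidr)
    return {
        "address": str(iface.ip),
        "broadcast": str(iface.network.broadcast_address),
        "unspecified": iface.ip.is_unspecified,
    }


if __name__ == "__main__":
    print(BACNET_DEFAULT_PORT)                   # 47808
    print(describe_binding("0.0.0.0/24"))        # broadcast is 0.0.0.255
    print(describe_binding("192.168.1.10/24"))   # broadcast is 192.168.1.255
```

With 0.0.0.0/24 the derived broadcast address is 0.0.0.255, which does not correspond to a real subnet, whereas a concrete interface address with its prefix yields the subnet's actual broadcast address.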

@Kang-SungKu

So my issue is similar to #460: I could discover the devices, but their MAC addresses are shown as 1, 2, 3, or 4 instead of an actual IP address. I could achieve my goal with the second solution proposed there: creating a single BACnet device/application and adding all the points from multiple building simulations to that application (in other words, the same implementation I used for a single device in the initial post).

I think this will work well for 10+ or 100+ building simulations in my case (each building simulation has 100 to 200 points), but I would like to know how this approach differs from the one using a virtual router. For example, what is the advantage of using a virtual router to manage multiple devices over adding all the points to a single BACnet device/application, assuming 100 to 200 points per device (for instance, can a virtual router handle a large number of devices more smoothly)?

If using a virtual router has a significant scalability advantage, I might add it as a long-term TODO in case I need to run a large number of building simulations at the same time.
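
When all points from many simulations are added to one BACnet device, every object needs an instance number and a name that are unique within that device. One possible numbering scheme is sketched below with the standard library only. It assumes each simulation gets a fixed block of instance numbers (`stride`, here 1000) and that names are prefixed with the simulation alias. The helper names and the stride are hypothetical and are not taken from bacpypes or from the script above:

```python
MAX_OBJECT_INSTANCE = 4194302  # 22-bit instance field, 4194303 is reserved


def point_instance(site_index: int, point_index: int, stride: int = 1000) -> int:
    """Return a per-device-unique instance number for a point of a given site."""
    if site_index < 0:
        raise ValueError("site_index must be non-negative")
    if not 0 <= point_index < stride:
        raise ValueError("point_index must fit inside the site's block")
    instance = site_index * stride + point_index
    if instance > MAX_OBJECT_INSTANCE:
        raise ValueError("instance number exceeds the BACnet limit")
    return instance


def point_name(alias: str, name: str) -> str:
    """Prefix the point name with the site alias so names stay unique."""
    return f"{alias}.{name}"


if __name__ == "__main__":
    print(point_instance(3, 42))              # 3042
    print(point_name("site_a", "zone_temp"))  # site_a.zone_temp
```

With a stride of 1000 and 100 to 200 points per simulation, this leaves room for a little over 4000 simulations in a single device before the instance range is exhausted.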
