diff --git a/lib/src/core/thing_discovery.dart b/lib/src/core/thing_discovery.dart index 28223b44..e0d74dfb 100644 --- a/lib/src/core/thing_discovery.dart +++ b/lib/src/core/thing_discovery.dart @@ -375,3 +375,72 @@ extension _FlatStreamExtension on Stream> { } } } + +/// Implemention of the [scripting_api.ThingDiscoveryProcess] interface. +class ThingDiscoveryProcess extends Stream + implements scripting_api.ThingDiscoveryProcess { + /// Constructs a new [ThingDiscoveryProcess]. + /// + /// Accepts a [_thingDescriptionStream], which is filtered by an optional + /// [thingFilter]. + ThingDiscoveryProcess( + this._thingDescriptionStream, + this.thingFilter, + ); + + StreamSubscription? _streamSubscription; + + final Stream _thingDescriptionStream; + + var _done = false; + + @override + bool get done => _done; + + Exception? _error; + + @override + Exception? get error => _error; + + @override + final scripting_api.ThingFilter? thingFilter; + + @override + StreamSubscription listen( + void Function(ThingDescription event)? onData, { + Function? onError, + void Function()? onDone, + bool? cancelOnError, + }) { + final streamSubscription = _thingDescriptionStream.listen( + onData, + onError: (error, stackTrace) { + if (error is Exception) { + _error = error; + // ignore: avoid_dynamic_calls + onError?.call(error, stackTrace); + } + }, + onDone: () { + _done = true; + onDone?.call(); + }, + cancelOnError: cancelOnError, + ); + + _streamSubscription = streamSubscription; + + return streamSubscription; + } + + @override + Future stop() async { + if (done) { + return; + } + + await _streamSubscription?.cancel(); + + _done = true; + } +} diff --git a/lib/src/core/wot.dart b/lib/src/core/wot.dart index 1bdf8fc0..967caaa0 100644 --- a/lib/src/core/wot.dart +++ b/lib/src/core/wot.dart @@ -4,13 +4,16 @@ // // SPDX-License-Identifier: BSD-3-Clause +import 'dart:async'; + import '../../scripting_api.dart' as scripting_api; import '../definitions/thing_description.dart'; import '../scripting_api/discovery/discovery_method.dart'; import 'consumed_thing.dart'; import 'exposed_thing.dart'; import 'servient.dart'; -import 'thing_discovery.dart' show ThingDiscovery; +import 'thing_discovery.dart' + show DiscoveryException, ThingDiscovery, ThingDiscoveryProcess; /// This [Exception] is thrown if an error during the consumption of a /// [ThingDescription] occurs. @@ -95,4 +98,93 @@ class WoT implements scripting_api.WoT { Future requestThingDescription(Uri url) { return _servient.requestThingDescription(url); } + + @override + Future exploreDirectory( + Uri url, [ + scripting_api.ThingFilter? filter, + ]) async { + final thingDescription = await requestThingDescription(url); + + if (!thingDescription.isValidDirectoryThingDescription) { + throw DiscoveryException( + 'Encountered an invalid Directory Thing Description', + ); + } + + final consumedDirectoryThing = await consume(thingDescription); + + final interactionOutput = + await consumedDirectoryThing.readProperty('things'); + final rawThingDescriptions = await interactionOutput.value(); + + if (rawThingDescriptions is! List) { + throw DiscoveryException( + 'Expected an array of Thing Descriptions but received an ' + 'invalid output instead.', + ); + } + + final thingDescriptionStream = Stream.fromIterable( + rawThingDescriptions.whereType>(), + ).toThingDescriptionStream(); + + return ThingDiscoveryProcess(thingDescriptionStream, filter); + } +} + +extension _DirectoryValidationExtension on ThingDescription { + bool get isValidDirectoryThingDescription { + final atTypes = atType; + + if (atTypes == null) { + return false; + } + + const discoveryContextUri = 'https://www.w3.org/2022/wot/discovery'; + const type = 'ThingDirectory'; + const fullIri = '$discoveryContextUri#$type'; + + if (atTypes.contains(fullIri)) { + return true; + } + + return context.contains((value: discoveryContextUri, key: null)) && + atTypes.contains(type); + } +} + +extension _DirectoryTdDeserializationExtension on Stream> { + Stream toThingDescriptionStream() { + const streamTransformer = StreamTransformer(_transformerMethod); + + return transform(streamTransformer); + } + + static StreamSubscription _transformerMethod( + Stream> rawThingDescriptionStream, + bool cancelOnError, + ) { + final streamController = StreamController(); + + final streamSubscription = rawThingDescriptionStream.listen( + (rawThingDescription) { + try { + streamController.add(ThingDescription.fromJson(rawThingDescription)); + } on Exception catch (exception) { + streamController.addError(exception); + } + }, + onDone: streamController.close, + onError: streamController.addError, + cancelOnError: cancelOnError, + ); + + streamController + ..onPause = streamSubscription.pause + ..onResume = streamSubscription.resume + ..onCancel = streamSubscription.cancel; + + return streamController.stream.listen(null); + } } diff --git a/lib/src/scripting_api/discovery/thing_discovery.dart b/lib/src/scripting_api/discovery/thing_discovery.dart index a50443d9..fd6ef55a 100644 --- a/lib/src/scripting_api/discovery/thing_discovery.dart +++ b/lib/src/scripting_api/discovery/thing_discovery.dart @@ -22,3 +22,23 @@ abstract interface class ThingDiscovery implements Stream { /// Stops the discovery process. void stop(); } + +/// Provides the properties and methods controlling the discovery process, and +/// returning the results. +abstract interface class ThingDiscoveryProcess + implements Stream { + /// Optional filter that can applied during the discovery process. + ThingFilter? get thingFilter; + + /// `true` if the discovery has been stopped or completed with no more results + /// to report. + bool get done; + + /// Represents the last error that occurred during the discovery process. + /// + /// Typically used for critical errors that stop discovery. + Exception? get error; + + /// Stops the discovery process. + Future stop(); +} diff --git a/lib/src/scripting_api/wot.dart b/lib/src/scripting_api/wot.dart index eb8b4c71..7ed03a01 100644 --- a/lib/src/scripting_api/wot.dart +++ b/lib/src/scripting_api/wot.dart @@ -34,6 +34,14 @@ abstract interface class WoT { /// Requests a [ThingDescription] from the given [url]. Future requestThingDescription(Uri url); + /// Starts the discovery process that given a TD Directory [url], will provide + /// [ThingDescription] objects for Thing Descriptions that match an optional + /// [filter] argument of type [ThingFilter]. + Future exploreDirectory( + Uri url, [ + ThingFilter? filter, + ]); + /// Discovers [ThingDescription]s from a given [url] using the specified /// [method]. ///