Skip to content

Commit

Permalink
feat: implement exploreDirectory method
Browse files Browse the repository at this point in the history
  • Loading branch information
JKRhb committed Dec 23, 2023
1 parent 836ab1c commit 3a9ef63
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 1 deletion.
69 changes: 69 additions & 0 deletions lib/src/core/thing_discovery.dart
Original file line number Diff line number Diff line change
Expand Up @@ -375,3 +375,72 @@ extension _FlatStreamExtension<T> on Stream<Stream<T>> {
}
}
}

/// Implemention of the [scripting_api.ThingDiscoveryProcess] interface.
class ThingDiscoveryProcess extends Stream<ThingDescription>
implements scripting_api.ThingDiscoveryProcess {
/// Constructs a new [ThingDiscoveryProcess].
///
/// Accepts a [_thingDescriptionStream], which is filtered by an optional
/// [thingFilter].
ThingDiscoveryProcess(
this._thingDescriptionStream,
this.thingFilter,
);

StreamSubscription<ThingDescription>? _streamSubscription;

final Stream<ThingDescription> _thingDescriptionStream;

var _done = false;

@override
bool get done => _done;

Exception? _error;

@override
Exception? get error => _error;

@override
final scripting_api.ThingFilter? thingFilter;

@override
StreamSubscription<ThingDescription> listen(
void Function(ThingDescription event)? onData, {
Function? onError,
void Function()? onDone,
bool? cancelOnError,
}) {
final streamSubscription = _thingDescriptionStream.listen(
onData,
onError: (error, stackTrace) {
if (error is Exception) {
_error = error;
// ignore: avoid_dynamic_calls
onError?.call(error, stackTrace);
}
},
onDone: () {
_done = true;
onDone?.call();
},
cancelOnError: cancelOnError,
);

_streamSubscription = streamSubscription;

return streamSubscription;
}

@override
Future<void> stop() async {
if (done) {
return;
}

await _streamSubscription?.cancel();

_done = true;
}
}
94 changes: 93 additions & 1 deletion lib/src/core/wot.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@
//
// SPDX-License-Identifier: BSD-3-Clause

import 'dart:async';

import '../../scripting_api.dart' as scripting_api;
import '../definitions/thing_description.dart';
import '../scripting_api/discovery/discovery_method.dart';
import 'consumed_thing.dart';
import 'exposed_thing.dart';
import 'servient.dart';
import 'thing_discovery.dart' show ThingDiscovery;
import 'thing_discovery.dart'
show DiscoveryException, ThingDiscovery, ThingDiscoveryProcess;

/// This [Exception] is thrown if an error during the consumption of a
/// [ThingDescription] occurs.
Expand Down Expand Up @@ -95,4 +98,93 @@ class WoT implements scripting_api.WoT {
Future<ThingDescription> requestThingDescription(Uri url) {
return _servient.requestThingDescription(url);
}

@override
Future<scripting_api.ThingDiscoveryProcess> exploreDirectory(
Uri url, [
scripting_api.ThingFilter? filter,
]) async {
final thingDescription = await requestThingDescription(url);

if (!thingDescription.isValidDirectoryThingDescription) {
throw DiscoveryException(
'Encountered an invalid Directory Thing Description',
);
}

final consumedDirectoryThing = await consume(thingDescription);

final interactionOutput =
await consumedDirectoryThing.readProperty('things');
final rawThingDescriptions = await interactionOutput.value();

if (rawThingDescriptions is! List<dynamic>) {
throw DiscoveryException(
'Expected an array of Thing Descriptions but received an '
'invalid output instead.',
);
}

final thingDescriptionStream = Stream.fromIterable(
rawThingDescriptions.whereType<Map<String, dynamic>>(),
).toThingDescriptionStream();

return ThingDiscoveryProcess(thingDescriptionStream, filter);
}
}

extension _DirectoryValidationExtension on ThingDescription {
bool get isValidDirectoryThingDescription {
final atTypes = atType;

if (atTypes == null) {
return false;
}

const discoveryContextUri = 'https://www.w3.org/2022/wot/discovery';
const type = 'ThingDirectory';
const fullIri = '$discoveryContextUri#$type';

if (atTypes.contains(fullIri)) {
return true;
}

return context.contains((value: discoveryContextUri, key: null)) &&
atTypes.contains(type);
}
}

extension _DirectoryTdDeserializationExtension on Stream<Map<String, dynamic>> {
Stream<ThingDescription> toThingDescriptionStream() {
const streamTransformer = StreamTransformer(_transformerMethod);

return transform(streamTransformer);
}

static StreamSubscription<ThingDescription> _transformerMethod(
Stream<Map<String, dynamic>> rawThingDescriptionStream,
bool cancelOnError,
) {
final streamController = StreamController<ThingDescription>();

final streamSubscription = rawThingDescriptionStream.listen(
(rawThingDescription) {
try {
streamController.add(ThingDescription.fromJson(rawThingDescription));
} on Exception catch (exception) {
streamController.addError(exception);
}
},
onDone: streamController.close,
onError: streamController.addError,
cancelOnError: cancelOnError,
);

streamController
..onPause = streamSubscription.pause
..onResume = streamSubscription.resume
..onCancel = streamSubscription.cancel;

return streamController.stream.listen(null);
}
}
20 changes: 20 additions & 0 deletions lib/src/scripting_api/discovery/thing_discovery.dart
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,23 @@ abstract interface class ThingDiscovery implements Stream<ThingDescription> {
/// Stops the discovery process.
void stop();
}

/// Provides the properties and methods controlling the discovery process, and
/// returning the results.
abstract interface class ThingDiscoveryProcess
implements Stream<ThingDescription> {
/// Optional filter that can applied during the discovery process.
ThingFilter? get thingFilter;

/// `true` if the discovery has been stopped or completed with no more results
/// to report.
bool get done;

/// Represents the last error that occurred during the discovery process.
///
/// Typically used for critical errors that stop discovery.
Exception? get error;

/// Stops the discovery process.
Future<void> stop();
}
8 changes: 8 additions & 0 deletions lib/src/scripting_api/wot.dart
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ abstract interface class WoT {
/// Requests a [ThingDescription] from the given [url].
Future<ThingDescription> requestThingDescription(Uri url);

/// Starts the discovery process that given a TD Directory [url], will provide
/// [ThingDescription] objects for Thing Descriptions that match an optional
/// [filter] argument of type [ThingFilter].
Future<ThingDiscoveryProcess> exploreDirectory(
Uri url, [
ThingFilter? filter,
]);

/// Discovers [ThingDescription]s from a given [url] using the specified
/// [method].
///
Expand Down

0 comments on commit 3a9ef63

Please sign in to comment.