Skip to content

Commit

Permalink
Add querier matching support
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Jan 29, 2025
1 parent acaed6e commit 293d58b
Show file tree
Hide file tree
Showing 10 changed files with 323 additions and 50 deletions.
3 changes: 3 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1242,6 +1242,9 @@ Functions
.. autocfunction:: primitives.h::z_undeclare_querier
.. autocfunction:: primitives.h::z_querier_get
.. autocfunction:: primitives.h::z_querier_keyexpr
.. autocfunction:: primitives.h::z_querier_get_matching_status
.. autocfunction:: primitives.h::z_querier_declare_matching_listener
.. autocfunction:: primitives.h::z_querier_declare_background_matching_listener
.. autocfunction:: primitives.h::z_querier_options_default
.. autocfunction:: primitives.h::z_querier_get_options_default
Expand Down
28 changes: 27 additions & 1 deletion examples/unix/c11/z_querier.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,17 @@

#if Z_FEATURE_QUERY == 1 && Z_FEATURE_MULTI_THREAD == 1 && defined Z_FEATURE_UNSTABLE_API

#if Z_FEATURE_MATCHING == 1
void matching_status_handler(const z_matching_status_t *matching_status, void *arg) {
(void)arg;
if (matching_status->matching) {
printf("Querier has matching queryable.\n");
} else {
printf("Querier has NO MORE matching queryables.\n");
}
}
#endif

int main(int argc, char **argv) {
const char *selector = "demo/example/**";
const char *mode = "client";
Expand All @@ -27,9 +38,10 @@ int main(int argc, char **argv) {
const char *value = NULL;
int n = INT_MAX;
int timeout_ms = 0;
bool add_matching_listener = false;

int opt;
while ((opt = getopt(argc, argv, "s:e:m:v:l:n:t:")) != -1) {
while ((opt = getopt(argc, argv, "s:e:m:v:l:n:t:a")) != -1) {
switch (opt) {
case 's':
selector = optarg;
Expand All @@ -52,6 +64,9 @@ int main(int argc, char **argv) {
case 't':
timeout_ms = atoi(optarg);
break;
case 'a':
add_matching_listener = true;
break;
case '?':
if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'v' || optopt == 'l' ||
optopt == 'n' || optopt == 't') {
Expand Down Expand Up @@ -115,6 +130,17 @@ int main(int argc, char **argv) {
exit(-1);
}

if (add_matching_listener) {
#if Z_FEATURE_MATCHING == 1
z_owned_closure_matching_status_t callback;
z_closure(&callback, matching_status_handler, NULL, NULL);
z_querier_declare_background_matching_listener(z_loan(querier), z_move(callback));
#else
printf("ERROR: Zenoh pico was compiled without Z_FEATURE_MATCHING but this example requires it.\n");
return -2;
#endif
}

printf("Press CTRL-C to quit...\n");
char buf[256];
for (int idx = 0; idx != n; ++idx) {
Expand Down
49 changes: 49 additions & 0 deletions include/zenoh-pico/api/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -1843,6 +1843,55 @@ z_result_t z_querier_get(const z_loaned_querier_t *querier, const char *paramete
* .. warning:: This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
*/
const z_loaned_keyexpr_t *z_querier_keyexpr(const z_loaned_querier_t *querier);

#if Z_FEATURE_MATCHING == 1
/**
* Declares a matching listener, registering a callback for notifying queryables matching the given querier key
* expression and target. The callback will be run in the background until the corresponding querier is dropped.
*
* Parameters:
* querier: A querier to associate with matching listener.
* callback: A closure that will be called every time the matching status of the querier changes (If last
* queryable disconnects or when the first queryable connects).
*
* Return:
* ``0`` if put operation is successful, ``negative value`` otherwise.
*
* .. warning:: This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
*/
z_result_t z_querier_declare_background_matching_listener(const z_loaned_querier_t *querier,
z_moved_closure_matching_status_t *callback);
/**
* Constructs matching listener, registering a callback for notifying queryables matching with a given querier's
* key expression and target.
*
* Parameters:
* querier: A querier to associate with matching listener.
* matching_listener: An uninitialized memory location where matching listener will be constructed. The matching
* listener's callback will be automatically dropped when the querier is dropped.
* callback: A closure that will be called every time the matching status of the querier changes (If last
* queryable disconnects or when the first queryable connects).
*
* Return:
* ``0`` if put operation is successful, ``negative value`` otherwise.
*
* .. warning:: This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
*/
z_result_t z_querier_declare_matching_listener(const z_loaned_querier_t *querier,
z_owned_matching_listener_t *matching_listener,
z_moved_closure_matching_status_t *callback);
/**
* Gets querier matching status - i.e. if there are any queryables matching its key expression and target.
*
* Return:
* ``0`` if put operation is successful, ``negative value`` otherwise.
*
* .. warning:: This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
*/
z_result_t z_querier_get_matching_status(const z_loaned_querier_t *querier, z_matching_status_t *matching_status);

#endif // Z_FEATURE_MATCHING == 1

#endif // Z_FEATURE_UNSTABLE_API

/**
Expand Down
6 changes: 3 additions & 3 deletions include/zenoh-pico/net/matching.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@ typedef struct _z_matching_listener_t {

#if Z_FEATURE_MATCHING == 1
_z_matching_listener_t _z_matching_listener_declare(_z_session_rc_t *zn, const _z_keyexpr_t *key, _z_zint_t entity_id,
_z_closure_matching_status_t callback);
uint8_t interest_type_flag, _z_closure_matching_status_t callback);
z_result_t _z_matching_listener_entity_undeclare(_z_session_t *zn, _z_zint_t entity_id);
z_result_t _z_matching_listener_undeclare(_z_matching_listener_t *listener);
// Warning: None of the sub-types require a non-0 initialization. Add a init function if it changes.
static inline _z_matching_listener_t _z_matching_listener_null(void) { return (_z_matching_listener_t){0}; }
static inline bool _z_matching_listener_check(const _z_matching_listener_t *matching_listener) {
return !_Z_RC_IS_NULL(&matching_listener->_zn);
}
void _z_matching_listener_clear(_z_matching_listener_t *pub);
void _z_matching_listener_free(_z_matching_listener_t **pub);
void _z_matching_listener_clear(_z_matching_listener_t *listener);
void _z_matching_listener_free(_z_matching_listener_t **listener);
#endif // Z_FEATURE_MATCHING == 1

#ifdef __cplusplus
Expand Down
3 changes: 3 additions & 0 deletions include/zenoh-pico/session/matching.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ typedef struct {
} _z_closure_matching_status_t;

#if Z_FEATURE_MATCHING == 1

#define _Z_MATCHING_LISTENER_CTX_NULL_ID 0xFFFFFFFF

typedef struct _z_matching_listener_ctx_t {
uint32_t decl_id;
_z_closure_matching_status_t callback;
Expand Down
37 changes: 33 additions & 4 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1122,8 +1122,8 @@ z_result_t z_publisher_declare_matching_listener(const z_loaned_publisher_t *pub
z_owned_matching_listener_t *matching_listener,
z_moved_closure_matching_status_t *callback) {
_z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&publisher->_zn);
_z_matching_listener_t listener =
_z_matching_listener_declare(&sess_rc, &publisher->_key, publisher->_id, callback->_this._val);
_z_matching_listener_t listener = _z_matching_listener_declare(&sess_rc, &publisher->_key, publisher->_id,
_Z_INTEREST_FLAG_SUBSCRIBERS, callback->_this._val);
_z_session_rc_drop(&sess_rc);

z_internal_closure_matching_status_null(&callback->_this);
Expand All @@ -1135,8 +1135,6 @@ z_result_t z_publisher_declare_matching_listener(const z_loaned_publisher_t *pub

z_result_t z_publisher_get_matching_status(const z_loaned_publisher_t *publisher,
z_matching_status_t *matching_status) {
// Ideally this should be implemented as a real request to the router, but this works much faster.
// And it works as long as filtering is enabled along with interest
matching_status->matching = publisher->_filter.ctx->state != WRITE_FILTER_ACTIVE;
return _Z_RES_OK;
}
Expand Down Expand Up @@ -1353,6 +1351,37 @@ z_result_t z_querier_get(const z_loaned_querier_t *querier, const char *paramete
const z_loaned_keyexpr_t *z_querier_keyexpr(const z_loaned_querier_t *querier) {
return (const z_loaned_keyexpr_t *)&querier->_key;
}

#if Z_FEATURE_MATCHING == 1
z_result_t z_querier_declare_background_matching_listener(const z_loaned_querier_t *querier,
z_moved_closure_matching_status_t *callback) {
z_owned_matching_listener_t listener;
_Z_RETURN_IF_ERR(z_querier_declare_matching_listener(querier, &listener, callback));
_z_matching_listener_clear(&listener._val);
return _Z_RES_OK;
}
z_result_t z_querier_declare_matching_listener(const z_loaned_querier_t *querier,
z_owned_matching_listener_t *matching_listener,
z_moved_closure_matching_status_t *callback) {
_z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&querier->_zn);
_z_matching_listener_t listener = _z_matching_listener_declare(&sess_rc, &querier->_key, querier->_id,
_Z_INTEREST_FLAG_QUERYABLES, callback->_this._val);
_z_session_rc_drop(&sess_rc);

z_internal_closure_matching_status_null(&callback->_this);

matching_listener->_val = listener;

return _z_matching_listener_check(&listener) ? _Z_RES_OK : _Z_ERR_GENERIC;
}

z_result_t z_querier_get_matching_status(const z_loaned_querier_t *querier, z_matching_status_t *matching_status) {
matching_status->matching = querier->_filter.ctx->state != WRITE_FILTER_ACTIVE;
return _Z_RES_OK;
}

#endif // Z_FEATURE_MATCHING == 1

#endif // Z_FEATURE_UNSTABLE_API

bool z_reply_is_ok(const z_loaned_reply_t *reply) { return reply->data._tag != _Z_REPLY_TAG_ERROR; }
Expand Down
24 changes: 14 additions & 10 deletions src/net/matching.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "zenoh-pico/api/types.h"
#include "zenoh-pico/net/primitives.h"
#include "zenoh-pico/net/session.h"
#include "zenoh-pico/protocol/definitions/interest.h"
#include "zenoh-pico/session/matching.h"
#include "zenoh-pico/session/resource.h"
#include "zenoh-pico/utils/logging.h"
Expand All @@ -28,18 +29,21 @@
#if Z_FEATURE_MATCHING == 1
static void _z_matching_listener_callback(const _z_interest_msg_t *msg, void *arg) {
_z_matching_listener_ctx_t *ctx = (_z_matching_listener_ctx_t *)arg;

switch (msg->type) {
case _Z_INTEREST_MSG_TYPE_DECL_SUBSCRIBER: {
ctx->decl_id = msg->id;
z_matching_status_t status = {.matching = true};
z_closure_matching_status_call(&ctx->callback, &status);
case _Z_INTEREST_MSG_TYPE_DECL_SUBSCRIBER:
case _Z_INTEREST_MSG_TYPE_DECL_QUERYABLE: {
if (ctx->decl_id == _Z_MATCHING_LISTENER_CTX_NULL_ID) {
ctx->decl_id = msg->id;
z_matching_status_t status = {.matching = true};
z_closure_matching_status_call(&ctx->callback, &status);
}
break;
}

case _Z_INTEREST_MSG_TYPE_UNDECL_SUBSCRIBER: {
case _Z_INTEREST_MSG_TYPE_UNDECL_SUBSCRIBER:
case _Z_INTEREST_MSG_TYPE_UNDECL_QUERYABLE: {
if (ctx->decl_id == msg->id) {
ctx->decl_id = 0;
ctx->decl_id = _Z_MATCHING_LISTENER_CTX_NULL_ID;
z_matching_status_t status = {.matching = false};
z_closure_matching_status_call(&ctx->callback, &status);
}
Expand All @@ -52,9 +56,9 @@ static void _z_matching_listener_callback(const _z_interest_msg_t *msg, void *ar
}

_z_matching_listener_t _z_matching_listener_declare(_z_session_rc_t *zn, const _z_keyexpr_t *key, _z_zint_t entity_id,
_z_closure_matching_status_t callback) {
uint8_t flags = _Z_INTEREST_FLAG_SUBSCRIBERS | _Z_INTEREST_FLAG_RESTRICTED | _Z_INTEREST_FLAG_FUTURE |
_Z_INTEREST_FLAG_AGGREGATE;
uint8_t interest_type_flag, _z_closure_matching_status_t callback) {
uint8_t flags = interest_type_flag | _Z_INTEREST_FLAG_RESTRICTED | _Z_INTEREST_FLAG_FUTURE |
_Z_INTEREST_FLAG_AGGREGATE | _Z_INTEREST_FLAG_CURRENT;
_z_matching_listener_t ret = _z_matching_listener_null();

_z_matching_listener_ctx_t *ctx = _z_matching_listener_ctx_new(callback);
Expand Down
3 changes: 3 additions & 0 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,9 @@ z_result_t _z_undeclare_querier(_z_querier_t *querier) {
if (querier == NULL || _Z_RC_IS_NULL(&querier->_zn)) {
return _Z_ERR_ENTITY_UNKNOWN;
}
#if Z_FEATURE_MATCHING == 1
_z_matching_listener_entity_undeclare(_Z_RC_IN_VAL(&querier->_zn), querier->_id);
#endif
_z_write_filter_destroy(_Z_RC_IN_VAL(&querier->_zn), &querier->_filter);
_z_undeclare_resource(_Z_RC_IN_VAL(&querier->_zn), querier->_key._id);
return _Z_RES_OK;
Expand Down
2 changes: 1 addition & 1 deletion src/session/matching.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
_z_matching_listener_ctx_t *_z_matching_listener_ctx_new(_z_closure_matching_status_t callback) {
_z_matching_listener_ctx_t *ctx = z_malloc(sizeof(_z_matching_listener_ctx_t));

ctx->decl_id = 0;
ctx->decl_id = _Z_MATCHING_LISTENER_CTX_NULL_ID;
ctx->callback = callback;

return ctx;
Expand Down
Loading

0 comments on commit 293d58b

Please sign in to comment.