diff --git a/include/zenoh-pico/net/filtering.h b/include/zenoh-pico/net/filtering.h index 14461a322..928bd47ea 100644 --- a/include/zenoh-pico/net/filtering.h +++ b/include/zenoh-pico/net/filtering.h @@ -43,11 +43,10 @@ typedef struct _z_write_filter_t { _z_writer_filter_ctx_t *ctx; } _z_write_filter_t; -typedef struct _z_publisher_t _z_publisher_t; - -z_result_t _z_write_filter_create(_z_publisher_t *pub); -z_result_t _z_write_filter_destroy(_z_publisher_t *pub); -bool _z_write_filter_active(const _z_publisher_t *pub); +z_result_t _z_write_filter_create(_z_session_t *zn, _z_write_filter_t *filter, _z_keyexpr_t keyexpr, + uint8_t interest_flag); +z_result_t _z_write_filter_destroy(_z_session_t *zn, _z_write_filter_t *filter); +bool _z_write_filter_active(const _z_write_filter_t *filter); #ifdef __cplusplus } diff --git a/include/zenoh-pico/net/query.h b/include/zenoh-pico/net/query.h index 489dd2411..9c6844b1f 100644 --- a/include/zenoh-pico/net/query.h +++ b/include/zenoh-pico/net/query.h @@ -18,6 +18,7 @@ #include "zenoh-pico/api/constants.h" #include "zenoh-pico/collections/bytes.h" +#include "zenoh-pico/net/filtering.h" #include "zenoh-pico/net/session.h" #include "zenoh-pico/protocol/core.h" #include "zenoh-pico/protocol/keyexpr.h" @@ -66,6 +67,9 @@ typedef struct _z_querier_t { z_reliability_t reliability; bool _is_express; uint64_t _timeout_ms; +#if Z_FEATURE_INTEREST == 1 + _z_write_filter_t _filter; +#endif } _z_querier_t; #if Z_FEATURE_QUERY == 1 diff --git a/src/api/api.c b/src/api/api.c index 3c9bacefe..3a9083c3e 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -33,6 +33,7 @@ #include "zenoh-pico/net/sample.h" #include "zenoh-pico/net/session.h" #include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/protocol/definitions/interest.h" #include "zenoh-pico/protocol/keyexpr.h" #include "zenoh-pico/session/queryable.h" #include "zenoh-pico/session/resource.h" @@ -971,7 +972,8 @@ z_result_t z_declare_publisher(const z_loaned_session_t *zs, z_owned_publisher_t _z_publisher_t int_pub = _z_declare_publisher(zs, key, opt.encoding == NULL ? NULL : &opt.encoding->_this._val, opt.congestion_control, opt.priority, opt.is_express, reliability); // Create write filter - z_result_t res = _z_write_filter_create(&int_pub); + z_result_t res = + _z_write_filter_create(_Z_RC_IN_VAL(zs), &int_pub._filter, keyexpr_aliased, _Z_INTEREST_FLAG_SUBSCRIBERS); if (res != _Z_RES_OK) { if (key._id != Z_RESOURCE_ID_NONE) { _z_undeclare_resource(_Z_RC_IN_VAL(zs), key._id); @@ -1037,7 +1039,7 @@ z_result_t z_publisher_put(const z_loaned_publisher_t *pub, z_moved_bytes_t *pay _z_bytes_t attachment_bytes = _z_bytes_from_owned_bytes(&opt.attachment->_this); // Check if write filter is active before writing - if (!_z_write_filter_active(pub)) { + if (!_z_write_filter_active(&pub->_filter)) { // Write value ret = _z_write(session, pub_keyexpr, payload_bytes, &encoding, Z_SAMPLE_KIND_PUT, pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp, attachment_bytes, reliability); @@ -1255,6 +1257,15 @@ z_result_t z_declare_querier(const z_loaned_session_t *zs, z_owned_querier_t *qu _z_querier_t int_querier = _z_declare_querier(zs, key, opt.consolidation.mode, opt.congestion_control, opt.target, opt.priority, opt.is_express, opt.timeout_ms); + // Create write filter + z_result_t res = + _z_write_filter_create(_Z_RC_IN_VAL(zs), &int_querier._filter, keyexpr_aliased, _Z_INTEREST_FLAG_QUERYABLES); + if (res != _Z_RES_OK) { + if (key._id != Z_RESOURCE_ID_NONE) { + _z_undeclare_resource(_Z_RC_IN_VAL(zs), key._id); + } + return res; + } querier->_val = int_querier; return _Z_RES_OK; } @@ -1311,13 +1322,17 @@ z_result_t z_querier_get(const z_loaned_querier_t *querier, const char *paramete } if (session != NULL) { - _z_value_t value = {.payload = _z_bytes_from_owned_bytes(&opt.payload->_this), - .encoding = _z_encoding_from_owned(&opt.encoding->_this)}; + if (_z_write_filter_active(&querier->_filter)) { + callback->_this._val.drop(ctx); + } else { + _z_value_t value = {.payload = _z_bytes_from_owned_bytes(&opt.payload->_this), + .encoding = _z_encoding_from_owned(&opt.encoding->_this)}; - ret = _z_query(session, querier_keyexpr, parameters, querier->_target, consolidation_mode, value, - callback->_this._val.call, callback->_this._val.drop, ctx, querier->_timeout_ms, - _z_bytes_from_owned_bytes(&opt.attachment->_this), querier->_congestion_control, - querier->_priority, querier->_is_express); + ret = _z_query(session, querier_keyexpr, parameters, querier->_target, consolidation_mode, value, + callback->_this._val.call, callback->_this._val.drop, ctx, querier->_timeout_ms, + _z_bytes_from_owned_bytes(&opt.attachment->_this), querier->_congestion_control, + querier->_priority, querier->_is_express); + } } else { ret = _Z_ERR_SESSION_CLOSED; } diff --git a/src/net/filtering.c b/src/net/filtering.c index 022c6c798..42a3c039f 100644 --- a/src/net/filtering.c +++ b/src/net/filtering.c @@ -18,18 +18,9 @@ #include #include -#include "zenoh-pico/api/types.h" #include "zenoh-pico/config.h" #include "zenoh-pico/net/primitives.h" -#include "zenoh-pico/net/query.h" -#include "zenoh-pico/protocol/codec/core.h" -#include "zenoh-pico/protocol/core.h" -#include "zenoh-pico/protocol/definitions/network.h" -#include "zenoh-pico/protocol/keyexpr.h" -#include "zenoh-pico/session/queryable.h" -#include "zenoh-pico/session/resource.h" -#include "zenoh-pico/session/utils.h" -#include "zenoh-pico/utils/logging.h" +#include "zenoh-pico/session/session.h" #if Z_FEATURE_INTEREST == 1 static void _z_write_filter_callback(const _z_interest_msg_t *msg, void *arg) { @@ -44,6 +35,7 @@ static void _z_write_filter_callback(const _z_interest_msg_t *msg, void *arg) { break; case _Z_INTEREST_MSG_TYPE_DECL_SUBSCRIBER: + case _Z_INTEREST_MSG_TYPE_DECL_QUERYABLE: ctx->state = WRITE_FILTER_OFF; ctx->decl_id = msg->id; break; @@ -54,14 +46,16 @@ static void _z_write_filter_callback(const _z_interest_msg_t *msg, void *arg) { break; // Remove filter if we receive a subscribe case WRITE_FILTER_ACTIVE: - if (msg->type == _Z_INTEREST_MSG_TYPE_DECL_SUBSCRIBER) { + if (msg->type == _Z_INTEREST_MSG_TYPE_DECL_SUBSCRIBER || msg->type == _Z_INTEREST_MSG_TYPE_DECL_QUERYABLE) { ctx->state = WRITE_FILTER_OFF; ctx->decl_id = msg->id; } break; // Activate filter if subscribe is removed case WRITE_FILTER_OFF: - if ((msg->type == _Z_INTEREST_MSG_TYPE_UNDECL_SUBSCRIBER) && (ctx->decl_id == msg->id)) { + if ((msg->type == _Z_INTEREST_MSG_TYPE_UNDECL_SUBSCRIBER || + msg->type == _Z_INTEREST_MSG_TYPE_UNDECL_QUERYABLE) && + (ctx->decl_id == msg->id)) { ctx->state = WRITE_FILTER_ACTIVE; ctx->decl_id = 0; } @@ -72,9 +66,10 @@ static void _z_write_filter_callback(const _z_interest_msg_t *msg, void *arg) { } } -z_result_t _z_write_filter_create(_z_publisher_t *pub) { - uint8_t flags = _Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_SUBSCRIBERS | _Z_INTEREST_FLAG_RESTRICTED | - _Z_INTEREST_FLAG_CURRENT | _Z_INTEREST_FLAG_FUTURE | _Z_INTEREST_FLAG_AGGREGATE; +z_result_t _z_write_filter_create(_z_session_t *zn, _z_write_filter_t *filter, _z_keyexpr_t keyexpr, + uint8_t interest_flag) { + uint8_t flags = interest_flag | _Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_RESTRICTED | _Z_INTEREST_FLAG_CURRENT | + _Z_INTEREST_FLAG_FUTURE | _Z_INTEREST_FLAG_AGGREGATE; _z_writer_filter_ctx_t *ctx = (_z_writer_filter_ctx_t *)z_malloc(sizeof(_z_writer_filter_ctx_t)); if (ctx == NULL) { @@ -83,44 +78,47 @@ z_result_t _z_write_filter_create(_z_publisher_t *pub) { ctx->state = WRITE_FILTER_INIT; ctx->decl_id = 0; - pub->_filter.ctx = ctx; - pub->_filter._interest_id = - _z_add_interest(_Z_RC_IN_VAL(&pub->_zn), _z_keyexpr_alias_from_user_defined(pub->_key, true), - _z_write_filter_callback, flags, (void *)ctx); - if (pub->_filter._interest_id == 0) { + filter->ctx = ctx; + filter->_interest_id = _z_add_interest(zn, keyexpr, _z_write_filter_callback, flags, (void *)ctx); + if (filter->_interest_id == 0) { z_free(ctx); return _Z_ERR_GENERIC; } return _Z_RES_OK; } -z_result_t _z_write_filter_destroy(_z_publisher_t *pub) { - if (pub->_filter.ctx != NULL) { - z_result_t res = _z_remove_interest(_Z_RC_IN_VAL(&pub->_zn), pub->_filter._interest_id); - z_free(pub->_filter.ctx); - pub->_filter.ctx = NULL; +z_result_t _z_write_filter_destroy(_z_session_t *zn, _z_write_filter_t *filter) { + if (filter->ctx != NULL) { + z_result_t res = _z_remove_interest(zn, filter->_interest_id); + z_free(filter->ctx); + filter->ctx = NULL; return res; } return _Z_RES_OK; } -bool _z_write_filter_active(const _z_publisher_t *pub) { - return pub->_filter.ctx != NULL && pub->_filter.ctx->state == WRITE_FILTER_ACTIVE; +bool _z_write_filter_active(const _z_write_filter_t *filter) { + return filter->ctx != NULL && filter->ctx->state == WRITE_FILTER_ACTIVE; } #else -z_result_t _z_write_filter_create(_z_publisher_t *pub) { - _ZP_UNUSED(pub); +z_result_t _z_write_filter_create(_z_session_t *zn, _z_write_filter_t *filter, _z_keyexpr_t keyexpr, + uint8_t interest_flag) { + _ZP_UNUSED(zn); + _ZP_UNUSED(keyexpr); + _ZP_UNUSED(filter); + _ZP_UNUSED(interest_flag); return _Z_RES_OK; } -z_result_t _z_write_filter_destroy(_z_publisher_t *pub) { - _ZP_UNUSED(pub); +z_result_t _z_write_filter_destroy(_z_session_t *zn, _z_write_filter_t *filter) { + _ZP_UNUSED(zn); + _ZP_UNUSED(filter); return _Z_RES_OK; } -bool _z_write_filter_active(const _z_publisher_t *pub) { - _ZP_UNUSED(pub); +bool _z_write_filter_active(const _z_write_filter_t *filter) { + _ZP_UNUSED(filter); return false; } diff --git a/src/net/primitives.c b/src/net/primitives.c index a93915189..191999ed4 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -185,7 +185,7 @@ z_result_t _z_undeclare_publisher(_z_publisher_t *pub) { _z_matching_listener_entity_undeclare(_Z_RC_IN_VAL(&pub->_zn), pub->_id); #endif // Clear publisher - _z_write_filter_destroy(pub); + _z_write_filter_destroy(_Z_RC_IN_VAL(&pub->_zn), &pub->_filter); _z_undeclare_resource(_Z_RC_IN_VAL(&pub->_zn), pub->_key._id); return _Z_RES_OK; } @@ -541,6 +541,7 @@ z_result_t _z_undeclare_querier(_z_querier_t *querier) { if (querier == NULL || _Z_RC_IS_NULL(&querier->_zn)) { return _Z_ERR_ENTITY_UNKNOWN; } + _z_write_filter_destroy(_Z_RC_IN_VAL(&querier->_zn), &querier->_filter); _z_undeclare_resource(_Z_RC_IN_VAL(&querier->_zn), querier->_key._id); return _Z_RES_OK; }