From a6e1278dd0437b6fd613f724a1604e92fc9d2265 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Thu, 11 Apr 2024 16:06:18 +0200 Subject: [PATCH] Add query and reply channels --- examples/CMakeLists.txt | 2 + examples/unix/c11/z_get_channel.c | 129 ++++++++++++++++++++++ examples/unix/c11/z_pull.c | 2 +- examples/unix/c11/z_queryable_channel.c | 130 +++++++++++++++++++++++ include/zenoh-pico/api/handlers.h | 49 +++++++-- include/zenoh-pico/api/macros.h | 36 +++++-- include/zenoh-pico/api/primitives.h | 28 ++++- include/zenoh-pico/api/types.h | 25 ++++- include/zenoh-pico/collections/element.h | 2 +- include/zenoh-pico/session/session.h | 2 + src/api/api.c | 21 ++++ src/api/handlers.c | 39 ++++++- src/net/memory.c | 5 + src/session/query.c | 5 + tests/z_channels_test.c | 2 + 15 files changed, 454 insertions(+), 23 deletions(-) create mode 100644 examples/unix/c11/z_get_channel.c create mode 100644 examples/unix/c11/z_queryable_channel.c diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 1924dad9e..7650b6cfa 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -41,7 +41,9 @@ if(UNIX) add_example(z_sub_st unix/c11/z_sub_st.c) add_example(z_pull unix/c11/z_pull.c) add_example(z_get unix/c11/z_get.c) + add_example(z_get_channel unix/c11/z_get_channel.c) add_example(z_queryable unix/c11/z_queryable.c) + add_example(z_queryable_channel unix/c11/z_queryable_channel.c) add_example(z_info unix/c11/z_info.c) add_example(z_scout unix/c11/z_scout.c) add_example(z_ping unix/c11/z_ping.c) diff --git a/examples/unix/c11/z_get_channel.c b/examples/unix/c11/z_get_channel.c new file mode 100644 index 000000000..d40ca4379 --- /dev/null +++ b/examples/unix/c11/z_get_channel.c @@ -0,0 +1,129 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include +#include +#include +#include +#include +#include + +#if Z_FEATURE_QUERY == 1 && Z_FEATURE_MULTI_THREAD == 1 + +int main(int argc, char **argv) { + const char *keyexpr = "demo/example/**"; + const char *mode = "client"; + const char *clocator = NULL; + const char *llocator = NULL; + const char *value = NULL; + + int opt; + while ((opt = getopt(argc, argv, "k:e:m:v:l:")) != -1) { + switch (opt) { + case 'k': + keyexpr = optarg; + break; + case 'e': + clocator = optarg; + break; + case 'm': + mode = optarg; + break; + case 'l': + llocator = optarg; + break; + case 'v': + value = optarg; + break; + case '?': + if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'v' || optopt == 'l') { + fprintf(stderr, "Option -%c requires an argument.\n", optopt); + } else { + fprintf(stderr, "Unknown option `-%c'.\n", optopt); + } + return 1; + default: + return -1; + } + } + + z_owned_config_t config = z_config_default(); + zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode)); + if (clocator != NULL) { + zp_config_insert(z_loan(config), Z_CONFIG_CONNECT_KEY, z_string_make(clocator)); + } + if (llocator != NULL) { + zp_config_insert(z_loan(config), Z_CONFIG_LISTEN_KEY, z_string_make(llocator)); + } + + printf("Opening session...\n"); + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + return -1; + } + + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) { + printf("Unable to start read and lease tasks\n"); + z_close(z_session_move(&s)); + return -1; + } + + z_keyexpr_t ke = z_keyexpr(keyexpr); + if (!z_check(ke)) { + printf("%s is not a valid key expression", keyexpr); + return -1; + } + + printf("Sending Query '%s'...\n", keyexpr); + z_get_options_t opts = z_get_options_default(); + if (value != NULL) { + opts.value.payload = _z_bytes_wrap((const uint8_t *)value, strlen(value)); + } + z_owned_reply_ring_channel_t channel = z_reply_ring_channel_new(1); + if (z_get(z_loan(s), ke, "", z_move(channel.send), &opts) < 0) { + printf("Unable to send query.\n"); + return -1; + } + + z_owned_reply_t reply = z_reply_null(); + for (z_call(channel.recv, &reply); z_check(reply); z_call(channel.recv, &reply)) { + if (z_reply_is_ok(&reply)) { + z_sample_t sample = z_reply_ok(&reply); + z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); + printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start); + z_drop(z_move(keystr)); + } else { + printf(">> Received an error\n"); + } + } + + z_drop(z_move(channel)); + + // Stop read and lease tasks for zenoh-pico + zp_stop_read_task(z_loan(s)); + zp_stop_lease_task(z_loan(s)); + + z_close(z_move(s)); + + return 0; +} +#else +int main(void) { + printf( + "ERROR: Zenoh pico was compiled without Z_FEATURE_QUERY or Z_FEATURE_MULTI_THREAD but this example requires " + "them.\n"); + return -2; +} +#endif diff --git a/examples/unix/c11/z_pull.c b/examples/unix/c11/z_pull.c index c6bb58f20..6bc0c824d 100644 --- a/examples/unix/c11/z_pull.c +++ b/examples/unix/c11/z_pull.c @@ -82,7 +82,7 @@ int main(int argc, char **argv) { printf("Pulling data every %zu ms... Ring size: %zd\n", interval, size); z_owned_sample_t sample = z_sample_null(); while (true) { - for (z_call(channel.recv, &sample); z_check(sample); z_call(channel.recv, &sample)) { + for (z_call(channel.try_recv, &sample); z_check(sample); z_call(channel.try_recv, &sample)) { z_owned_str_t keystr = z_keyexpr_to_string(z_loan(sample).keyexpr); printf(">> [Subscriber] Pulled ('%s': '%.*s')\n", z_loan(keystr), (int)z_loan(sample).payload.len, z_loan(sample).payload.start); diff --git a/examples/unix/c11/z_queryable_channel.c b/examples/unix/c11/z_queryable_channel.c new file mode 100644 index 000000000..cc0637d70 --- /dev/null +++ b/examples/unix/c11/z_queryable_channel.c @@ -0,0 +1,130 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include +#include +#include +#include +#include +#include + +#if Z_FEATURE_QUERYABLE == 1 +const char *keyexpr = "demo/example/zenoh-pico-queryable"; +const char *value = "Queryable from Pico!"; + +int main(int argc, char **argv) { + const char *mode = "client"; + char *clocator = NULL; + char *llocator = NULL; + + int opt; + while ((opt = getopt(argc, argv, "k:e:m:v:l:")) != -1) { + switch (opt) { + case 'k': + keyexpr = optarg; + break; + case 'e': + clocator = optarg; + break; + case 'm': + mode = optarg; + break; + case 'l': + llocator = optarg; + break; + case 'v': + value = optarg; + break; + case '?': + if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'v' || optopt == 'l') { + fprintf(stderr, "Option -%c requires an argument.\n", optopt); + } else { + fprintf(stderr, "Unknown option `-%c'.\n", optopt); + } + return 1; + default: + return -1; + } + } + + z_owned_config_t config = z_config_default(); + zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode)); + if (clocator != NULL) { + zp_config_insert(z_loan(config), Z_CONFIG_CONNECT_KEY, z_string_make(clocator)); + } + if (llocator != NULL) { + zp_config_insert(z_loan(config), Z_CONFIG_LISTEN_KEY, z_string_make(llocator)); + } + + printf("Opening session...\n"); + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + return -1; + } + + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) { + printf("Unable to start read and lease tasks\n"); + z_close(z_session_move(&s)); + return -1; + } + + z_keyexpr_t ke = z_keyexpr(keyexpr); + if (!z_check(ke)) { + printf("%s is not a valid key expression", keyexpr); + return -1; + } + + printf("Creating Queryable on '%s'...\n", keyexpr); + z_owned_query_ring_channel_t channel = z_query_ring_channel_new(10); + z_owned_queryable_t qable = z_declare_queryable(z_loan(s), ke, z_move(channel.send), NULL); + if (!z_check(qable)) { + printf("Unable to create queryable.\n"); + return -1; + } + + z_owned_query_t query = z_query_null(); + for (z_call(channel.recv, &query); z_check(query); z_call(channel.recv, &query)) { + z_query_t q = z_loan(query); + z_owned_str_t keystr = z_keyexpr_to_string(z_query_keyexpr(&q)); + z_bytes_t pred = z_query_parameters(&q); + z_value_t payload_value = z_query_value(&q); + printf(" >> [Queryable handler] Received Query '%s?%.*s'\n", z_loan(keystr), (int)pred.len, pred.start); + if (payload_value.payload.len > 0) { + printf(" with value '%.*s'\n", (int)payload_value.payload.len, payload_value.payload.start); + } + z_query_reply_options_t options = z_query_reply_options_default(); + options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL); + z_query_reply(&q, z_keyexpr(keyexpr), (const unsigned char *)value, strlen(value), &options); + z_drop(z_move(keystr)); + z_drop(z_move(query)); + } + + z_drop(z_move(channel)); + z_undeclare_queryable(z_move(qable)); + + // Stop read and lease tasks for zenoh-pico + zp_stop_read_task(z_loan(s)); + zp_stop_lease_task(z_loan(s)); + + z_close(z_move(s)); + + return 0; +} +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_QUERYABLE but this example requires it.\n"); + return -2; +} +#endif diff --git a/include/zenoh-pico/api/handlers.h b/include/zenoh-pico/api/handlers.h index 604031d53..7d6d142fa 100644 --- a/include/zenoh-pico/api/handlers.h +++ b/include/zenoh-pico/api/handlers.h @@ -24,13 +24,21 @@ #include "zenoh-pico/utils/logging.h" // -- Samples handler -void _z_owned_sample_move(z_owned_sample_t *dst, const z_owned_sample_t *src); +void _z_owned_sample_move(z_owned_sample_t *dst, z_owned_sample_t *src); z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src); +// -- Queries handler +void _z_owned_query_move(z_owned_query_t *dst, z_owned_query_t *src); +z_owned_query_t *_z_query_to_owned_ptr(const z_query_t *src); + +// -- Reply handler +void _z_owned_reply_move(z_owned_reply_t *dst, z_owned_reply_t *src); +z_owned_reply_t *_z_reply_clone(const z_owned_reply_t *src); + // -- Channel #define _Z_CHANNEL_DEFINE(name, send_closure_name, recv_closure_name, send_type, recv_type, collection_type, \ collection_new_f, collection_free_f, collection_push_f, collection_pull_f, \ - collection_try_pull_f, elem_move_f, elem_convert_f, elem_free_f) \ + collection_try_pull_f, elem_move_f, elem_convert_f, elem_drop_f) \ typedef struct { \ z_owned_##send_closure_name##_t send; \ z_owned_##recv_closure_name##_t recv; \ @@ -39,13 +47,14 @@ z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src); } z_owned_##name##_t; \ \ static inline void _z_##name##_elem_free(void **elem) { \ - elem_free_f((recv_type *)*elem); \ + elem_drop_f((recv_type *)*elem); \ + zp_free(*elem); \ *elem = NULL; \ } \ - static inline void _z_##name##_elem_move(void *dst, const void *src) { \ - elem_move_f((recv_type *)dst, (const recv_type *)src); \ + static inline void _z_##name##_elem_move(void *dst, void *src) { \ + elem_move_f((recv_type *)dst, (recv_type *)src); \ } \ - static inline void _z_##name##_send(const send_type *elem, void *context) { \ + static inline void _z_##name##_send(send_type *elem, void *context) { \ void *internal_elem = elem_convert_f(elem); \ if (internal_elem == NULL) { \ return; \ @@ -84,13 +93,33 @@ z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src); } // z_owned_sample_ring_channel_t -_Z_CHANNEL_DEFINE(sample_ring_channel, closure_sample, closure_owned_sample, z_sample_t, z_owned_sample_t, _z_ring_mt_t, - _z_ring_mt_new, _z_ring_mt_free, _z_ring_mt_push, _z_ring_mt_pull, _z_ring_mt_try_pull, +_Z_CHANNEL_DEFINE(sample_ring_channel, closure_sample, closure_owned_sample, const z_sample_t, z_owned_sample_t, + _z_ring_mt_t, _z_ring_mt_new, _z_ring_mt_free, _z_ring_mt_push, _z_ring_mt_pull, _z_ring_mt_try_pull, _z_owned_sample_move, _z_sample_to_owned_ptr, z_sample_drop) // z_owned_sample_fifo_channel_t -_Z_CHANNEL_DEFINE(sample_fifo_channel, closure_sample, closure_owned_sample, z_sample_t, z_owned_sample_t, _z_fifo_mt_t, - _z_fifo_mt_new, _z_fifo_mt_free, _z_fifo_mt_push, _z_fifo_mt_pull, _z_fifo_mt_try_pull, +_Z_CHANNEL_DEFINE(sample_fifo_channel, closure_sample, closure_owned_sample, const z_sample_t, z_owned_sample_t, + _z_fifo_mt_t, _z_fifo_mt_new, _z_fifo_mt_free, _z_fifo_mt_push, _z_fifo_mt_pull, _z_fifo_mt_try_pull, _z_owned_sample_move, _z_sample_to_owned_ptr, z_sample_drop) +// z_owned_query_ring_channel_t +_Z_CHANNEL_DEFINE(query_ring_channel, closure_query, closure_owned_query, const z_query_t, z_owned_query_t, + _z_ring_mt_t, _z_ring_mt_new, _z_ring_mt_free, _z_ring_mt_push, _z_ring_mt_pull, _z_ring_mt_try_pull, + _z_owned_query_move, _z_query_to_owned_ptr, z_query_drop) + +// z_owned_query_fifo_channel_t +_Z_CHANNEL_DEFINE(query_fifo_channel, closure_query, closure_owned_query, const z_query_t, z_owned_query_t, + _z_fifo_mt_t, _z_fifo_mt_new, _z_fifo_mt_free, _z_fifo_mt_push, _z_fifo_mt_pull, _z_fifo_mt_try_pull, + _z_owned_query_move, _z_query_to_owned_ptr, z_query_drop) + +// z_owned_reply_ring_channel_t +_Z_CHANNEL_DEFINE(reply_ring_channel, closure_reply, closure_reply, z_owned_reply_t, z_owned_reply_t, _z_ring_mt_t, + _z_ring_mt_new, _z_ring_mt_free, _z_ring_mt_push, _z_ring_mt_pull, _z_ring_mt_try_pull, + _z_owned_reply_move, _z_reply_clone, z_reply_drop) + +// z_owned_reply_fifo_channel_t +_Z_CHANNEL_DEFINE(reply_fifo_channel, closure_reply, closure_reply, z_owned_reply_t, z_owned_reply_t, _z_fifo_mt_t, + _z_fifo_mt_new, _z_fifo_mt_free, _z_fifo_mt_push, _z_fifo_mt_pull, _z_fifo_mt_try_pull, + _z_owned_reply_move, _z_reply_clone, z_reply_drop) + #endif // INCLUDE_ZENOH_PICO_API_HANDLERS_H diff --git a/include/zenoh-pico/api/macros.h b/include/zenoh-pico/api/macros.h index e64f1f84a..5cbadcb54 100644 --- a/include/zenoh-pico/api/macros.h +++ b/include/zenoh-pico/api/macros.h @@ -43,7 +43,8 @@ z_owned_hello_t : z_hello_loan, \ z_owned_str_t : z_str_loan, \ z_owned_str_array_t : z_str_array_loan, \ - z_owned_sample_t : z_sample_loan \ + z_owned_sample_t : z_sample_loan, \ + z_owned_query_t : z_query_loan \ )(&x) /** * Defines a generic function for dropping any of the ``z_owned_X_t`` types. @@ -64,6 +65,7 @@ z_owned_str_t * : z_str_drop, \ z_owned_str_array_t * : z_str_array_drop, \ z_owned_sample_t * : z_sample_drop, \ + z_owned_query_t * : z_query_drop, \ z_owned_closure_sample_t * : z_closure_sample_drop, \ z_owned_closure_owned_sample_t * : z_closure_owned_sample_drop, \ z_owned_closure_query_t * : z_closure_query_drop, \ @@ -71,7 +73,11 @@ z_owned_closure_hello_t * : z_closure_hello_drop, \ z_owned_closure_zid_t * : z_closure_zid_drop, \ z_owned_sample_ring_channel_t * : z_sample_ring_channel_drop, \ - z_owned_sample_fifo_channel_t * : z_sample_fifo_channel_drop \ + z_owned_sample_fifo_channel_t * : z_sample_fifo_channel_drop, \ + z_owned_query_ring_channel_t * : z_query_ring_channel_drop, \ + z_owned_query_fifo_channel_t * : z_query_fifo_channel_drop, \ + z_owned_reply_ring_channel_t * : z_reply_ring_channel_drop, \ + z_owned_reply_fifo_channel_t * : z_reply_fifo_channel_drop \ )(x) /** @@ -99,7 +105,8 @@ z_owned_str_t : z_str_check, \ z_owned_str_array_t : z_str_array_check, \ z_bytes_t : z_bytes_check, \ - z_owned_sample_t : z_sample_check \ + z_owned_sample_t : z_sample_check, \ + z_owned_query_t : z_query_check \ )(&x) /** @@ -114,7 +121,8 @@ z_owned_closure_reply_t : z_closure_reply_call, \ z_owned_closure_hello_t : z_closure_hello_call, \ z_owned_closure_zid_t : z_closure_zid_call, \ - z_owned_closure_owned_sample_t : z_closure_owned_sample_call \ + z_owned_closure_owned_sample_t : z_closure_owned_sample_call, \ + z_owned_closure_owned_query_t : z_closure_owned_query_call \ ) (&x, __VA_ARGS__) /** @@ -141,12 +149,18 @@ z_owned_closure_sample_t : z_closure_sample_move, \ z_owned_closure_owned_sample_t : z_closure_owned_sample_move, \ z_owned_closure_query_t : z_closure_query_move, \ + z_owned_closure_owned_query_t : z_closure_owned_query_move, \ z_owned_closure_reply_t : z_closure_reply_move, \ z_owned_closure_hello_t : z_closure_hello_move, \ z_owned_closure_zid_t : z_closure_zid_move, \ z_owned_sample_t : z_sample_move, \ + z_owned_query_t : z_query_move, \ z_owned_sample_ring_channel_t : z_sample_ring_channel_move, \ - z_owned_sample_fifo_channel_t : z_sample_fifo_channel_move \ + z_owned_sample_fifo_channel_t : z_sample_fifo_channel_move, \ + z_owned_query_ring_channel_t : z_query_ring_channel_move, \ + z_owned_query_fifo_channel_t : z_query_ring_channel_move, \ + z_owned_reply_ring_channel_t : z_reply_ring_channel_move, \ + z_owned_reply_fifo_channel_t : z_reply_ring_channel_move \ )(&x) /** @@ -191,6 +205,7 @@ z_owned_closure_sample_t * : z_closure_sample_null, \ z_owned_closure_owned_sample_t * : z_closure_owned_sample_null, \ z_owned_closure_query_t * : z_closure_query_null, \ + z_owned_closure_owned_query_t * : z_closure_owned_query_null, \ z_owned_closure_reply_t * : z_closure_reply_null, \ z_owned_closure_hello_t * : z_closure_hello_null, \ z_owned_closure_zid_t * : z_closure_zid_null, \ @@ -275,6 +290,10 @@ template<> inline void z_drop(z_owned_closure_hello_t* v) { z_closure_hello_drop template<> inline void z_drop(z_owned_closure_zid_t* v) { z_closure_zid_drop(v); } template<> inline void z_drop(z_owned_sample_ring_channel_t* v) { z_owned_sample_ring_channel_drop(v); } template<> inline void z_drop(z_owned_sample_fifo_channel_t* v) { z_owned_sample_fifo_channel_drop(v); } +template<> inline void z_drop(z_owned_query_ring_channel_t* v) { z_owned_query_ring_channel_drop(v); } +template<> inline void z_drop(z_owned_query_fifo_channel_t* v) { z_owned_query_fifo_channel_drop(v); } +template<> inline void z_drop(z_owned_reply_ring_channel_t* v) { z_owned_reply_ring_channel_drop(v); } +template<> inline void z_drop(z_owned_reply_fifo_channel_t* v) { z_owned_reply_fifo_channel_drop(v); } inline void z_null(z_owned_session_t& v) { v = z_session_null(); } inline void z_null(z_owned_publisher_t& v) { v = z_publisher_null(); } @@ -289,6 +308,7 @@ inline void z_null(z_owned_str_t& v) { v = z_str_null(); } inline void z_null(z_owned_closure_sample_t& v) { v = z_closure_sample_null(); } inline void z_null(z_owned_clusure_owned_sample_t& v) { v = z_closure_owned_sample_null(); } inline void z_null(z_owned_closure_query_t& v) { v = z_closure_query_null(); } +inline void z_null(z_owned_clusure_owned_query_t& v) { v = z_closure_owned_query_null(); } inline void z_null(z_owned_closure_reply_t& v) { v = z_closure_reply_null(); } inline void z_null(z_owned_closure_hello_t& v) { v = z_closure_hello_null(); } inline void z_null(z_owned_closure_zid_t& v) { v = z_closure_zid_null(); } @@ -313,8 +333,10 @@ inline void z_call(const z_owned_closure_owned_sample_t &closure, const z_sample { z_closure_owned_sample_call(&closure, sample); } inline void z_call(const z_owned_closure_query_t &closure, const z_query_t *query) { z_closure_query_call(&closure, query); } -inline void z_call(const z_owned_closure_reply_t &closure, z_owned_reply_t *sample) - { z_closure_reply_call(&closure, sample); } +inline void z_call(const z_owned_closure_owned_query_t &closure, const z_query_t *query) + { z_closure_owned_query_call(&closure, query); } +inline void z_call(const z_owned_closure_reply_t &closure, z_owned_reply_t *reply) + { z_closure_reply_call(&closure, reply); } inline void z_call(const z_owned_closure_hello_t &closure, z_owned_hello_t *hello) { z_closure_hello_call(&closure, hello); } inline void z_call(const z_owned_closure_zid_t &closure, const z_id_t *zid) diff --git a/include/zenoh-pico/api/primitives.h b/include/zenoh-pico/api/primitives.h index 62aa8940f..b9ebafdcd 100644 --- a/include/zenoh-pico/api/primitives.h +++ b/include/zenoh-pico/api/primitives.h @@ -615,6 +615,30 @@ z_owned_closure_owned_sample_t z_closure_owned_sample(_z_owned_sample_handler_t */ z_owned_closure_query_t z_closure_query(_z_queryable_handler_t call, _z_dropper_handler_t drop, void *context); +/** + * Return a new query closure. + * It consists on a structure that contains all the elements for stateful, memory-leak-free callbacks. + * + * Like all ``z_owned_X_t``, an instance will be destroyed by any function which takes a mutable pointer to said + * instance, as this implies the instance's inners were moved. To make this fact more obvious when reading your code, + * consider using ``z_move(val)`` instead of ``&val`` as the argument. After a ``z_move``, ``val`` will still exist, but + * will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your ``val`` + * is valid. + * + * To check if ``val`` is still valid, you may use ``z_closure_owned_query_check(&val)`` or ``z_check(val)`` if your + * compiler supports ``_Generic``, which will return ``true`` if ``val`` is valid, or ``false`` otherwise. + * + * Parameters: + * call: the typical callback function. ``context`` will be passed as its last argument. + * drop: allows the callback's state to be freed. ``context`` will be passed as its last argument. + * context: a pointer to an arbitrary state. + * + * Returns: + * Returns a new query closure. + */ +z_owned_closure_owned_query_t z_closure_owned_query(_z_owned_query_handler_t call, _z_dropper_handler_t drop, + void *context); + /** * Return a new reply closure. * It consists on a structure that contains all the elements for stateful, memory-leak-free callbacks. @@ -717,6 +741,7 @@ _OWNED_FUNCTIONS(z_hello_t, z_owned_hello_t, hello) _OWNED_FUNCTIONS(z_reply_t, z_owned_reply_t, reply) _OWNED_FUNCTIONS(z_str_array_t, z_owned_str_array_t, str_array) _OWNED_FUNCTIONS(z_sample_t, z_owned_sample_t, sample) +_OWNED_FUNCTIONS(z_query_t, z_owned_query_t, query) #define _OWNED_FUNCTIONS_CLOSURE(ownedtype, name) \ _Bool z_##name##_check(const ownedtype *val); \ @@ -725,8 +750,9 @@ _OWNED_FUNCTIONS(z_sample_t, z_owned_sample_t, sample) ownedtype z_##name##_null(void); _OWNED_FUNCTIONS_CLOSURE(z_owned_closure_sample_t, closure_sample) -_OWNED_FUNCTIONS_CLOSURE(z_owned_closure_query_t, closure_query) _OWNED_FUNCTIONS_CLOSURE(z_owned_closure_owned_sample_t, closure_owned_sample) +_OWNED_FUNCTIONS_CLOSURE(z_owned_closure_query_t, closure_query) +_OWNED_FUNCTIONS_CLOSURE(z_owned_closure_owned_query_t, closure_owned_query) _OWNED_FUNCTIONS_CLOSURE(z_owned_closure_reply_t, closure_reply) _OWNED_FUNCTIONS_CLOSURE(z_owned_closure_hello_t, closure_hello) _OWNED_FUNCTIONS_CLOSURE(z_owned_closure_zid_t, closure_zid) diff --git a/include/zenoh-pico/api/types.h b/include/zenoh-pico/api/types.h index bf9bd7db2..d0d72f7f6 100644 --- a/include/zenoh-pico/api/types.h +++ b/include/zenoh-pico/api/types.h @@ -535,6 +535,27 @@ typedef struct { void z_closure_query_call(const z_owned_closure_query_t *closure, const z_query_t *query); +typedef void (*_z_owned_query_handler_t)(z_owned_query_t *query, void *arg); + +/** + * Represents the owned query closure. + * + * A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks. + * + * Members: + * _z_owned_query_handler_t call: `void *call(const struct z_owned_query_t*, const void *context)` is the callback + * function. + * _z_dropper_handler_t drop: `void *drop(void*)` allows the callback's state to be freed. void *context: a + * pointer to an arbitrary state. + */ +typedef struct { + void *context; + _z_owned_query_handler_t call; + _z_dropper_handler_t drop; +} z_owned_closure_owned_query_t; + +void z_closure_owned_query_call(const z_owned_closure_owned_query_t *closure, z_owned_query_t *query); + typedef void (*z_owned_reply_handler_t)(z_owned_reply_t *reply, void *arg); /** @@ -543,8 +564,8 @@ typedef void (*z_owned_reply_handler_t)(z_owned_reply_t *reply, void *arg); * A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks. * * Members: - * z_owned_reply_handler_t call: `void (*z_owned_reply_handler_t)(z_owned_reply_t reply, void *arg)` is the callback - * function. + * z_owned_reply_handler_t call: `void (*z_owned_reply_handler_t)(const z_owned_reply_t *reply, void *arg)` is the + * callback function. * _z_dropper_handler_t drop: `void *drop(void*)` allows the callback's state to be freed. * void *context: a pointer to an arbitrary state. */ diff --git a/include/zenoh-pico/collections/element.h b/include/zenoh-pico/collections/element.h index dc582016c..5f1bfd7b0 100644 --- a/include/zenoh-pico/collections/element.h +++ b/include/zenoh-pico/collections/element.h @@ -25,7 +25,7 @@ typedef size_t (*z_element_size_f)(void *e); typedef void (*z_element_clear_f)(void *e); typedef void (*z_element_free_f)(void **e); typedef void (*z_element_copy_f)(void *dst, const void *src); -typedef void (*z_element_move_f)(void *dst, const void *src); +typedef void (*z_element_move_f)(void *dst, void *src); typedef void *(*z_element_clone_f)(const void *e); typedef _Bool (*z_element_eq_f)(const void *left, const void *right); diff --git a/include/zenoh-pico/session/session.h b/include/zenoh-pico/session/session.h index 95809b8e1..94f26759b 100644 --- a/include/zenoh-pico/session/session.h +++ b/include/zenoh-pico/session/session.h @@ -48,6 +48,7 @@ typedef struct { } _z_reply_data_t; void _z_reply_data_clear(_z_reply_data_t *rd); +void _z_reply_data_copy(_z_reply_data_t *dst, _z_reply_data_t *src); _Z_ELEM_DEFINE(_z_reply_data, _z_reply_data_t, _z_noop_size, _z_reply_data_clear, _z_noop_copy) _Z_LIST_DEFINE(_z_reply_data, _z_reply_data_t) @@ -67,6 +68,7 @@ typedef struct { } _z_reply_t; void _z_reply_clear(_z_reply_t *src); void _z_reply_free(_z_reply_t **hello); +void _z_reply_copy(_z_reply_t *dst, _z_reply_t *src); typedef struct { _z_keyexpr_t _key; diff --git a/src/api/api.c b/src/api/api.c index d327e6e94..6101a94d1 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -328,6 +328,12 @@ void z_closure_query_call(const z_owned_closure_query_t *closure, const z_query_ } } +void z_closure_owned_query_call(const z_owned_closure_owned_query_t *closure, z_owned_query_t *query) { + if (closure->call != NULL) { + (closure->call)(query, closure->context); + } +} + void z_closure_reply_call(const z_owned_closure_reply_t *closure, z_owned_reply_t *reply) { if (closure->call != NULL) { (closure->call)(reply, closure->context); @@ -409,6 +415,14 @@ void z_closure_zid_call(const z_owned_closure_zid_t *closure, const z_id_t *id) } \ } +#define OWNED_FUNCTIONS_PTR_RC(type, ownedtype, name) \ + _Bool z_##name##_check(const ownedtype *val) { return val->_rc.in != NULL; } \ + type z_##name##_loan(const ownedtype *val) { return (type){._val = (ownedtype){._rc.in = val->_rc.in}}; } \ + ownedtype z_##name##_null(void) { return (ownedtype){._rc.in = NULL}; } \ + ownedtype *z_##name##_move(ownedtype *val) { return val; } \ + ownedtype z_##name##_clone(ownedtype *val) { return (ownedtype){._rc = _z_##name##_rc_clone(&val->_rc)}; } \ + void z_##name##_drop(ownedtype *val) { _z_##name##_rc_drop(&val->_rc); } + static inline void _z_owner_noop_copy(void *dst, const void *src) { (void)(dst); (void)(src); @@ -469,6 +483,11 @@ z_owned_closure_query_t z_closure_query(_z_queryable_handler_t call, _z_dropper_ return (z_owned_closure_query_t){.call = call, .drop = drop, .context = context}; } +z_owned_closure_owned_query_t z_closure_owned_query(_z_owned_query_handler_t call, _z_dropper_handler_t drop, + void *context) { + return (z_owned_closure_owned_query_t){.call = call, .drop = drop, .context = context}; +} + z_owned_closure_reply_t z_closure_reply(z_owned_reply_handler_t call, _z_dropper_handler_t drop, void *context) { return (z_owned_closure_reply_t){.call = call, .drop = drop, .context = context}; } @@ -484,6 +503,7 @@ z_owned_closure_zid_t z_closure_zid(z_id_handler_t call, _z_dropper_handler_t dr OWNED_FUNCTIONS_CLOSURE(z_owned_closure_sample_t, closure_sample) OWNED_FUNCTIONS_CLOSURE(z_owned_closure_owned_sample_t, closure_owned_sample) OWNED_FUNCTIONS_CLOSURE(z_owned_closure_query_t, closure_query) +OWNED_FUNCTIONS_CLOSURE(z_owned_closure_owned_query_t, closure_owned_query) OWNED_FUNCTIONS_CLOSURE(z_owned_closure_reply_t, closure_reply) OWNED_FUNCTIONS_CLOSURE(z_owned_closure_hello_t, closure_hello) OWNED_FUNCTIONS_CLOSURE(z_owned_closure_zid_t, closure_zid) @@ -891,6 +911,7 @@ z_value_t z_reply_err(const z_owned_reply_t *reply) { #endif #if Z_FEATURE_QUERYABLE == 1 +OWNED_FUNCTIONS_PTR_RC(z_query_t, z_owned_query_t, query) OWNED_FUNCTIONS_PTR_COMMON(z_queryable_t, z_owned_queryable_t, queryable) OWNED_FUNCTIONS_PTR_CLONE(z_queryable_t, z_owned_queryable_t, queryable, _z_owner_noop_copy) void z_queryable_drop(z_owned_queryable_t *val) { z_undeclare_queryable(val); } diff --git a/src/api/handlers.c b/src/api/handlers.c index e1f0594b9..0e9bea4bb 100644 --- a/src/api/handlers.c +++ b/src/api/handlers.c @@ -18,8 +18,9 @@ #include "zenoh-pico/system/platform.h" // -- Sample -void _z_owned_sample_move(z_owned_sample_t *dst, const z_owned_sample_t *src) { +void _z_owned_sample_move(z_owned_sample_t *dst, z_owned_sample_t *src) { memcpy(dst, src, sizeof(z_owned_sample_t)); + zp_free(src); } z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src) { @@ -35,3 +36,39 @@ z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src) { } return dst; } + +#if Z_FEATURE_QUERYABLE == 1 +// -- Query +void _z_owned_query_move(z_owned_query_t *dst, z_owned_query_t *src) { + memcpy(dst, src, sizeof(z_owned_query_t)); + zp_free(src); +} + +z_owned_query_t *_z_query_to_owned_ptr(const z_query_t *src) { + z_owned_query_t *dst = (z_owned_query_t *)zp_malloc(sizeof(z_owned_query_t)); + _z_query_rc_copy(&dst->_rc, &src->_val._rc); + return dst; +} +#endif // Z_FEATURE_QUERYABLE + +#if Z_FEATURE_QUERY == 1 +// -- Reply +void _z_owned_reply_move(z_owned_reply_t *dst, z_owned_reply_t *src) { + memcpy(dst, src, sizeof(z_owned_reply_t)); + zp_free(src); +} + +z_owned_reply_t *_z_reply_clone(const z_owned_reply_t *src) { + z_owned_reply_t *dst = (z_owned_reply_t *)zp_malloc(sizeof(z_owned_reply_t)); + if (dst == NULL) { + return NULL; + } + if (src != NULL && src->_value) { + dst->_value = (_z_reply_t *)zp_malloc(sizeof(_z_reply_t)); + _z_reply_copy(dst->_value, src->_value); + } else { + dst->_value = NULL; + } + return dst; +} +#endif // Z_FEATURE_QUERY diff --git a/src/net/memory.c b/src/net/memory.c index 44d5eb61a..6db3ef506 100644 --- a/src/net/memory.c +++ b/src/net/memory.c @@ -103,6 +103,11 @@ void _z_reply_data_free(_z_reply_data_t **reply_data) { } } +void _z_reply_data_copy(_z_reply_data_t *dst, _z_reply_data_t *src) { + _z_sample_copy(&dst->sample, &src->sample); + dst->replier_id = src->replier_id; +} + void _z_value_clear(_z_value_t *value) { _z_bytes_clear(&value->encoding.schema); _z_bytes_clear(&value->payload); diff --git a/src/session/query.c b/src/session/query.c index 16ab2b487..d4cef44b4 100644 --- a/src/session/query.c +++ b/src/session/query.c @@ -46,6 +46,11 @@ void _z_reply_free(_z_reply_t **reply) { } } +void _z_reply_copy(_z_reply_t *dst, _z_reply_t *src) { + _z_reply_data_copy(&dst->data, &src->data); + dst->_tag = src->_tag; +} + _Bool _z_pending_reply_eq(const _z_pending_reply_t *one, const _z_pending_reply_t *two) { return one->_tstamp.time == two->_tstamp.time; } diff --git a/tests/z_channels_test.c b/tests/z_channels_test.c index 900f6087d..e8307d7b7 100644 --- a/tests/z_channels_test.c +++ b/tests/z_channels_test.c @@ -27,6 +27,8 @@ sample.payload.start = (const uint8_t *)v; \ sample.payload.len = strlen(v); \ sample.keyexpr = _z_rname("key"); \ + sample.timestamp = _z_timestamp_null(); \ + sample.encoding = z_encoding_default(); \ z_call(channel.send, &sample); \ } while (0);