Skip to content

Commit

Permalink
Add query and reply channels
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Apr 11, 2024
1 parent aa883f4 commit a6e1278
Show file tree
Hide file tree
Showing 15 changed files with 454 additions and 23 deletions.
2 changes: 2 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ if(UNIX)
add_example(z_sub_st unix/c11/z_sub_st.c)
add_example(z_pull unix/c11/z_pull.c)
add_example(z_get unix/c11/z_get.c)
add_example(z_get_channel unix/c11/z_get_channel.c)
add_example(z_queryable unix/c11/z_queryable.c)
add_example(z_queryable_channel unix/c11/z_queryable_channel.c)
add_example(z_info unix/c11/z_info.c)
add_example(z_scout unix/c11/z_scout.c)
add_example(z_ping unix/c11/z_ping.c)
Expand Down
129 changes: 129 additions & 0 deletions examples/unix/c11/z_get_channel.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>

#include <ctype.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <zenoh-pico.h>

#if Z_FEATURE_QUERY == 1 && Z_FEATURE_MULTI_THREAD == 1

int main(int argc, char **argv) {
const char *keyexpr = "demo/example/**";
const char *mode = "client";
const char *clocator = NULL;
const char *llocator = NULL;
const char *value = NULL;

int opt;
while ((opt = getopt(argc, argv, "k:e:m:v:l:")) != -1) {
switch (opt) {
case 'k':
keyexpr = optarg;
break;
case 'e':
clocator = optarg;
break;
case 'm':
mode = optarg;
break;
case 'l':
llocator = optarg;
break;
case 'v':
value = optarg;
break;
case '?':
if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'v' || optopt == 'l') {
fprintf(stderr, "Option -%c requires an argument.\n", optopt);
} else {
fprintf(stderr, "Unknown option `-%c'.\n", optopt);
}
return 1;
default:
return -1;
}
}

z_owned_config_t config = z_config_default();
zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode));
if (clocator != NULL) {
zp_config_insert(z_loan(config), Z_CONFIG_CONNECT_KEY, z_string_make(clocator));
}
if (llocator != NULL) {
zp_config_insert(z_loan(config), Z_CONFIG_LISTEN_KEY, z_string_make(llocator));
}

printf("Opening session...\n");
z_owned_session_t s = z_open(z_move(config));
if (!z_check(s)) {
printf("Unable to open session!\n");
return -1;
}

// Start read and lease tasks for zenoh-pico
if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) {
printf("Unable to start read and lease tasks\n");
z_close(z_session_move(&s));
return -1;
}

z_keyexpr_t ke = z_keyexpr(keyexpr);
if (!z_check(ke)) {
printf("%s is not a valid key expression", keyexpr);
return -1;
}

printf("Sending Query '%s'...\n", keyexpr);
z_get_options_t opts = z_get_options_default();
if (value != NULL) {
opts.value.payload = _z_bytes_wrap((const uint8_t *)value, strlen(value));
}
z_owned_reply_ring_channel_t channel = z_reply_ring_channel_new(1);
if (z_get(z_loan(s), ke, "", z_move(channel.send), &opts) < 0) {
printf("Unable to send query.\n");
return -1;
}

z_owned_reply_t reply = z_reply_null();
for (z_call(channel.recv, &reply); z_check(reply); z_call(channel.recv, &reply)) {
if (z_reply_is_ok(&reply)) {
z_sample_t sample = z_reply_ok(&reply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start);
z_drop(z_move(keystr));
} else {
printf(">> Received an error\n");
}
}

z_drop(z_move(channel));

// Stop read and lease tasks for zenoh-pico
zp_stop_read_task(z_loan(s));
zp_stop_lease_task(z_loan(s));

z_close(z_move(s));

return 0;
}
#else
int main(void) {
printf(
"ERROR: Zenoh pico was compiled without Z_FEATURE_QUERY or Z_FEATURE_MULTI_THREAD but this example requires "
"them.\n");
return -2;
}
#endif
2 changes: 1 addition & 1 deletion examples/unix/c11/z_pull.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ int main(int argc, char **argv) {
printf("Pulling data every %zu ms... Ring size: %zd\n", interval, size);
z_owned_sample_t sample = z_sample_null();
while (true) {
for (z_call(channel.recv, &sample); z_check(sample); z_call(channel.recv, &sample)) {
for (z_call(channel.try_recv, &sample); z_check(sample); z_call(channel.try_recv, &sample)) {
z_owned_str_t keystr = z_keyexpr_to_string(z_loan(sample).keyexpr);
printf(">> [Subscriber] Pulled ('%s': '%.*s')\n", z_loan(keystr), (int)z_loan(sample).payload.len,
z_loan(sample).payload.start);
Expand Down
130 changes: 130 additions & 0 deletions examples/unix/c11/z_queryable_channel.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>

#include <ctype.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <zenoh-pico.h>

#if Z_FEATURE_QUERYABLE == 1
const char *keyexpr = "demo/example/zenoh-pico-queryable";
const char *value = "Queryable from Pico!";

int main(int argc, char **argv) {
const char *mode = "client";
char *clocator = NULL;
char *llocator = NULL;

int opt;
while ((opt = getopt(argc, argv, "k:e:m:v:l:")) != -1) {
switch (opt) {
case 'k':
keyexpr = optarg;
break;
case 'e':
clocator = optarg;
break;
case 'm':
mode = optarg;
break;
case 'l':
llocator = optarg;
break;
case 'v':
value = optarg;
break;
case '?':
if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'v' || optopt == 'l') {
fprintf(stderr, "Option -%c requires an argument.\n", optopt);
} else {
fprintf(stderr, "Unknown option `-%c'.\n", optopt);
}
return 1;
default:
return -1;
}
}

z_owned_config_t config = z_config_default();
zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode));
if (clocator != NULL) {
zp_config_insert(z_loan(config), Z_CONFIG_CONNECT_KEY, z_string_make(clocator));
}
if (llocator != NULL) {
zp_config_insert(z_loan(config), Z_CONFIG_LISTEN_KEY, z_string_make(llocator));
}

printf("Opening session...\n");
z_owned_session_t s = z_open(z_move(config));
if (!z_check(s)) {
printf("Unable to open session!\n");
return -1;
}

// Start read and lease tasks for zenoh-pico
if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) {
printf("Unable to start read and lease tasks\n");
z_close(z_session_move(&s));
return -1;
}

z_keyexpr_t ke = z_keyexpr(keyexpr);
if (!z_check(ke)) {
printf("%s is not a valid key expression", keyexpr);
return -1;
}

printf("Creating Queryable on '%s'...\n", keyexpr);
z_owned_query_ring_channel_t channel = z_query_ring_channel_new(10);
z_owned_queryable_t qable = z_declare_queryable(z_loan(s), ke, z_move(channel.send), NULL);
if (!z_check(qable)) {
printf("Unable to create queryable.\n");
return -1;
}

z_owned_query_t query = z_query_null();
for (z_call(channel.recv, &query); z_check(query); z_call(channel.recv, &query)) {
z_query_t q = z_loan(query);
z_owned_str_t keystr = z_keyexpr_to_string(z_query_keyexpr(&q));
z_bytes_t pred = z_query_parameters(&q);
z_value_t payload_value = z_query_value(&q);
printf(" >> [Queryable handler] Received Query '%s?%.*s'\n", z_loan(keystr), (int)pred.len, pred.start);
if (payload_value.payload.len > 0) {
printf(" with value '%.*s'\n", (int)payload_value.payload.len, payload_value.payload.start);
}
z_query_reply_options_t options = z_query_reply_options_default();
options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL);
z_query_reply(&q, z_keyexpr(keyexpr), (const unsigned char *)value, strlen(value), &options);
z_drop(z_move(keystr));
z_drop(z_move(query));
}

z_drop(z_move(channel));
z_undeclare_queryable(z_move(qable));

// Stop read and lease tasks for zenoh-pico
zp_stop_read_task(z_loan(s));
zp_stop_lease_task(z_loan(s));

z_close(z_move(s));

return 0;
}
#else
int main(void) {
printf("ERROR: Zenoh pico was compiled without Z_FEATURE_QUERYABLE but this example requires it.\n");
return -2;
}
#endif
49 changes: 39 additions & 10 deletions include/zenoh-pico/api/handlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,21 @@
#include "zenoh-pico/utils/logging.h"

// -- Samples handler
void _z_owned_sample_move(z_owned_sample_t *dst, const z_owned_sample_t *src);
void _z_owned_sample_move(z_owned_sample_t *dst, z_owned_sample_t *src);
z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src);

// -- Queries handler
void _z_owned_query_move(z_owned_query_t *dst, z_owned_query_t *src);
z_owned_query_t *_z_query_to_owned_ptr(const z_query_t *src);

// -- Reply handler
void _z_owned_reply_move(z_owned_reply_t *dst, z_owned_reply_t *src);
z_owned_reply_t *_z_reply_clone(const z_owned_reply_t *src);

// -- Channel
#define _Z_CHANNEL_DEFINE(name, send_closure_name, recv_closure_name, send_type, recv_type, collection_type, \
collection_new_f, collection_free_f, collection_push_f, collection_pull_f, \
collection_try_pull_f, elem_move_f, elem_convert_f, elem_free_f) \
collection_try_pull_f, elem_move_f, elem_convert_f, elem_drop_f) \
typedef struct { \
z_owned_##send_closure_name##_t send; \
z_owned_##recv_closure_name##_t recv; \
Expand All @@ -39,13 +47,14 @@ z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src);
} z_owned_##name##_t; \
\
static inline void _z_##name##_elem_free(void **elem) { \
elem_free_f((recv_type *)*elem); \
elem_drop_f((recv_type *)*elem); \
zp_free(*elem); \
*elem = NULL; \
} \
static inline void _z_##name##_elem_move(void *dst, const void *src) { \
elem_move_f((recv_type *)dst, (const recv_type *)src); \
static inline void _z_##name##_elem_move(void *dst, void *src) { \
elem_move_f((recv_type *)dst, (recv_type *)src); \
} \
static inline void _z_##name##_send(const send_type *elem, void *context) { \
static inline void _z_##name##_send(send_type *elem, void *context) { \
void *internal_elem = elem_convert_f(elem); \
if (internal_elem == NULL) { \
return; \
Expand Down Expand Up @@ -84,13 +93,33 @@ z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src);
}

// z_owned_sample_ring_channel_t
_Z_CHANNEL_DEFINE(sample_ring_channel, closure_sample, closure_owned_sample, z_sample_t, z_owned_sample_t, _z_ring_mt_t,
_z_ring_mt_new, _z_ring_mt_free, _z_ring_mt_push, _z_ring_mt_pull, _z_ring_mt_try_pull,
_Z_CHANNEL_DEFINE(sample_ring_channel, closure_sample, closure_owned_sample, const z_sample_t, z_owned_sample_t,
_z_ring_mt_t, _z_ring_mt_new, _z_ring_mt_free, _z_ring_mt_push, _z_ring_mt_pull, _z_ring_mt_try_pull,
_z_owned_sample_move, _z_sample_to_owned_ptr, z_sample_drop)

// z_owned_sample_fifo_channel_t
_Z_CHANNEL_DEFINE(sample_fifo_channel, closure_sample, closure_owned_sample, z_sample_t, z_owned_sample_t, _z_fifo_mt_t,
_z_fifo_mt_new, _z_fifo_mt_free, _z_fifo_mt_push, _z_fifo_mt_pull, _z_fifo_mt_try_pull,
_Z_CHANNEL_DEFINE(sample_fifo_channel, closure_sample, closure_owned_sample, const z_sample_t, z_owned_sample_t,
_z_fifo_mt_t, _z_fifo_mt_new, _z_fifo_mt_free, _z_fifo_mt_push, _z_fifo_mt_pull, _z_fifo_mt_try_pull,
_z_owned_sample_move, _z_sample_to_owned_ptr, z_sample_drop)

// z_owned_query_ring_channel_t
_Z_CHANNEL_DEFINE(query_ring_channel, closure_query, closure_owned_query, const z_query_t, z_owned_query_t,
_z_ring_mt_t, _z_ring_mt_new, _z_ring_mt_free, _z_ring_mt_push, _z_ring_mt_pull, _z_ring_mt_try_pull,
_z_owned_query_move, _z_query_to_owned_ptr, z_query_drop)

// z_owned_query_fifo_channel_t
_Z_CHANNEL_DEFINE(query_fifo_channel, closure_query, closure_owned_query, const z_query_t, z_owned_query_t,
_z_fifo_mt_t, _z_fifo_mt_new, _z_fifo_mt_free, _z_fifo_mt_push, _z_fifo_mt_pull, _z_fifo_mt_try_pull,
_z_owned_query_move, _z_query_to_owned_ptr, z_query_drop)

// z_owned_reply_ring_channel_t
_Z_CHANNEL_DEFINE(reply_ring_channel, closure_reply, closure_reply, z_owned_reply_t, z_owned_reply_t, _z_ring_mt_t,
_z_ring_mt_new, _z_ring_mt_free, _z_ring_mt_push, _z_ring_mt_pull, _z_ring_mt_try_pull,
_z_owned_reply_move, _z_reply_clone, z_reply_drop)

// z_owned_reply_fifo_channel_t
_Z_CHANNEL_DEFINE(reply_fifo_channel, closure_reply, closure_reply, z_owned_reply_t, z_owned_reply_t, _z_fifo_mt_t,
_z_fifo_mt_new, _z_fifo_mt_free, _z_fifo_mt_push, _z_fifo_mt_pull, _z_fifo_mt_try_pull,
_z_owned_reply_move, _z_reply_clone, z_reply_drop)

#endif // INCLUDE_ZENOH_PICO_API_HANDLERS_H
Loading

0 comments on commit a6e1278

Please sign in to comment.