diff --git a/dockerfiles/Dockerfile b/dockerfiles/Dockerfile index 7cdcfe6bc88..8e95e63d8cc 100644 --- a/dockerfiles/Dockerfile +++ b/dockerfiles/Dockerfile @@ -75,6 +75,7 @@ RUN cmake -DFLB_RELEASE=On \ -DFLB_IN_SYSTEMD=On \ -DFLB_OUT_KAFKA=On \ -DFLB_OUT_PGSQL=On \ + -DFLB_OUT_PARSEABLE=On \ -DFLB_NIGHTLY_BUILD="$FLB_NIGHTLY_BUILD" \ -DFLB_LOG_NO_CONTROL_CHARS=On \ -DFLB_CHUNK_TRACE="$FLB_CHUNK_TRACE" \ diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index deb24958038..0bfe78d524f 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -316,6 +316,7 @@ REGISTER_OUT_PLUGIN("out_nrlogs") REGISTER_OUT_PLUGIN("out_null") REGISTER_OUT_PLUGIN("out_opensearch") REGISTER_OUT_PLUGIN("out_oracle_log_analytics") +REGISTER_OUT_PLUGIN("out_parseable") if (NOT CMAKE_SYSTEM_NAME MATCHES "Windows") REGISTER_OUT_PLUGIN("out_plot") diff --git a/plugins/out_parseable/CMakeLists.txt b/plugins/out_parseable/CMakeLists.txt new file mode 100644 index 00000000000..fa191223a2e --- /dev/null +++ b/plugins/out_parseable/CMakeLists.txt @@ -0,0 +1,4 @@ +set(src + parseable.c) + +FLB_PLUGIN(out_parseable "${src}" "") diff --git a/plugins/out_parseable/parseable.c b/plugins/out_parseable/parseable.c new file mode 100644 index 00000000000..16bd56f6bfb --- /dev/null +++ b/plugins/out_parseable/parseable.c @@ -0,0 +1,350 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +#include "parseable.h" + +static int cb_parseable_init(struct flb_output_instance *ins, + struct flb_config *config, void *data) +{ + int ret; + struct flb_out_parseable *ctx = NULL; + (void) ins; + (void) config; + (void) data; + + ctx = flb_calloc(1, sizeof(struct flb_out_parseable)); + if (!ctx) { + flb_errno(); + return -1; + } + ctx->ins = ins; + + /* Read in config values */ + ret = flb_output_config_map_set(ins, (void *) ctx); + if (ret == -1) { + flb_free(ctx); + return -1; + } + + flb_plg_info(ctx->ins, "Configured port: %d", ctx->server_port); + + ctx->upstream = flb_upstream_create(config, + ctx->server_host, + ctx->server_port, + FLB_IO_TCP, + NULL); + + if (!ctx->upstream) { + flb_free(ctx); + return -1; + } + + /* Export context */ + flb_output_set_context(ins, ctx); + + return 0; +} + +/* Main flush callback */ +static void cb_parseable_flush(struct flb_event_chunk *event_chunk, + struct flb_output_flush *out_flush, + struct flb_input_instance *i_ins, + void *out_context, + struct flb_config *config) +{ + struct flb_out_parseable *ctx = out_context; + struct flb_log_event_decoder log_decoder; + struct flb_log_event log_event; + struct flb_record_accessor *ra = NULL; + struct flb_record_accessor *ns_ra = NULL; // For checking namespace + (void) config; + struct flb_http_client *client; + struct flb_connection *u_conn; + flb_sds_t body; + flb_sds_t x_p_stream_value = NULL; + int ret; + int i; + size_t b_sent; + msgpack_sbuffer sbuf; + msgpack_packer pk; + + /* Initialize event decoder */ + flb_plg_info(ctx->ins, "Initializing event decoder..."); + ret = flb_log_event_decoder_init(&log_decoder, (char *) event_chunk->data, event_chunk->size); + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_plg_error(ctx->ins, "Failed to initialize event decoder"); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + + /* Create record accessor if stream is set to $NAMESPACE */ + if (ctx->stream && strcmp(ctx->stream, "$NAMESPACE") == 0) { + ra = flb_ra_create("$kubernetes['namespace_name']", FLB_TRUE); + if (!ra) { + flb_plg_error(ctx->ins, "Failed to create record accessor"); + flb_log_event_decoder_destroy(&log_decoder); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + } + + /* Create record accessor for namespace exclusion check */ + if (ctx->exclude_namespaces) { + ns_ra = flb_ra_create("$kubernetes['namespace_name']", FLB_TRUE); + if (!ns_ra) { + flb_plg_error(ctx->ins, "Failed to create namespace record accessor"); + if (ra) { + flb_ra_destroy(ra); + } + flb_log_event_decoder_destroy(&log_decoder); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + } + + /* Process each event */ + flb_plg_info(ctx->ins, "Processing events..."); + while (flb_log_event_decoder_next(&log_decoder, &log_event) == FLB_EVENT_DECODER_SUCCESS) { + /* Check if namespace is in exclusion list */ + if (ns_ra && ctx->exclude_namespaces) { + flb_sds_t current_ns = flb_ra_translate(ns_ra, NULL, -1, *log_event.body, NULL); + if (current_ns) { + struct cfl_list *head; + struct flb_slist_entry *entry; + int skip = 0; + + cfl_list_foreach(head, ctx->exclude_namespaces) { + entry = cfl_list_entry(head, struct flb_slist_entry, _head); + if (strcmp(current_ns, entry->str) == 0) { + flb_plg_debug(ctx->ins, "Skipping excluded namespace: %s", current_ns); + skip = 1; + break; + } + } + + flb_sds_destroy(current_ns); + if (skip) { + continue; + } + } + } + + /* Initialize the packer and buffer */ + msgpack_sbuffer_init(&sbuf); + msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write); + + /* Pack the map with one additional field */ + msgpack_pack_map(&pk, log_event.body->via.map.size + 1); + + /* Pack original map content */ + for (i = 0; i < log_event.body->via.map.size; i++) { + msgpack_pack_object(&pk, log_event.body->via.map.ptr[i].key); + msgpack_pack_object(&pk, log_event.body->via.map.ptr[i].val); + } + + /* Add source field */ + msgpack_pack_str_with_body(&pk, "source", 6); + msgpack_pack_str_with_body(&pk, "fluent bit parseable plugin", 25); + + /* Convert to JSON */ + body = flb_msgpack_raw_to_json_sds(sbuf.data, sbuf.size); + if (!body) { + flb_plg_error(ctx->ins, "Failed to convert msgpack to JSON"); + msgpack_sbuffer_destroy(&sbuf); + if (ra) { + flb_ra_destroy(ra); + } + if (ns_ra) { + flb_ra_destroy(ns_ra); + } + flb_log_event_decoder_destroy(&log_decoder); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + + /* Determine X-P-Stream value */ + if (ra) { + /* Use record accessor to get namespace_name */ + flb_sds_t ns = flb_ra_translate(ra, NULL, -1, *log_event.body, NULL); + if (!ns) { + flb_plg_error(ctx->ins, "Failed to extract namespace_name using record accessor"); + flb_sds_destroy(body); + msgpack_sbuffer_destroy(&sbuf); + flb_ra_destroy(ra); + if (ns_ra) { + flb_ra_destroy(ns_ra); + } + flb_log_event_decoder_destroy(&log_decoder); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + x_p_stream_value = ns; + } + else if (ctx->stream) { + x_p_stream_value = flb_sds_create(ctx->stream); + if (!x_p_stream_value) { + flb_plg_error(ctx->ins, "Failed to set X-P-Stream header"); + flb_sds_destroy(body); + msgpack_sbuffer_destroy(&sbuf); + if (ra) { + flb_ra_destroy(ra); + } + if (ns_ra) { + flb_ra_destroy(ns_ra); + } + flb_log_event_decoder_destroy(&log_decoder); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + } + else { + flb_plg_error(ctx->ins, "Stream is not set"); + flb_sds_destroy(body); + msgpack_sbuffer_destroy(&sbuf); + if (ra) { + flb_ra_destroy(ra); + } + if (ns_ra) { + flb_ra_destroy(ns_ra); + } + flb_log_event_decoder_destroy(&log_decoder); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + + /* Get upstream connection */ + u_conn = flb_upstream_conn_get(ctx->upstream); + if (!u_conn) { + flb_plg_error(ctx->ins, "Connection initialization error"); + flb_sds_destroy(body); + flb_sds_destroy(x_p_stream_value); + msgpack_sbuffer_destroy(&sbuf); + if (ra) { + flb_ra_destroy(ra); + } + if (ns_ra) { + flb_ra_destroy(ns_ra); + } + flb_log_event_decoder_destroy(&log_decoder); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + + /* Create HTTP client */ + client = flb_http_client(u_conn, + FLB_HTTP_POST, "/api/v1/ingest", + body, flb_sds_len(body), + ctx->server_host, ctx->server_port, + NULL, 0); + if (!client) { + flb_plg_error(ctx->ins, "Could not create HTTP client"); + flb_sds_destroy(body); + flb_sds_destroy(x_p_stream_value); + msgpack_sbuffer_destroy(&sbuf); + flb_upstream_conn_release(u_conn); + if (ra) { + flb_ra_destroy(ra); + } + if (ns_ra) { + flb_ra_destroy(ns_ra); + } + flb_log_event_decoder_destroy(&log_decoder); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + + /* Set headers */ + flb_http_add_header(client, "Content-Type", 12, "application/json", 16); + flb_http_add_header(client, "X-P-Stream", 10, x_p_stream_value, flb_sds_len(x_p_stream_value)); + flb_http_basic_auth(client, ctx->username, ctx->password); + + /* Perform request */ + ret = flb_http_do(client, &b_sent); + flb_plg_info(ctx->ins, "HTTP request sent. Status=%i", client->resp.status); + + /* Clean up resources for this iteration */ + flb_sds_destroy(body); + flb_sds_destroy(x_p_stream_value); + flb_http_client_destroy(client); + flb_upstream_conn_release(u_conn); + msgpack_sbuffer_destroy(&sbuf); + } + + /* Final cleanup */ + if (ra) { + flb_ra_destroy(ra); + } + if (ns_ra) { + flb_ra_destroy(ns_ra); + } + flb_log_event_decoder_destroy(&log_decoder); + FLB_OUTPUT_RETURN(FLB_OK); +} + +static int cb_parseable_exit(void *data, struct flb_config *config) +{ + struct flb_out_parseable *ctx = data; + + if (!ctx) { + return 0; + } + + if (ctx->exclude_namespaces) { + flb_slist_destroy((struct mk_list *)ctx->exclude_namespaces); + } + + /* Free up resources */ + if (ctx->upstream) { + flb_upstream_destroy(ctx->upstream); + } + flb_free(ctx); + return 0; +} + +/* Configuration properties map */ +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_STR, "server_host", NULL, + 0, FLB_TRUE, offsetof(struct flb_out_parseable, server_host), + "The host of the server to send logs to." + }, + { + FLB_CONFIG_MAP_STR, "username", NULL, + 0, FLB_TRUE, offsetof(struct flb_out_parseable, username), + "The parseable server username." + }, + { + FLB_CONFIG_MAP_STR, "password", NULL, + 0, FLB_TRUE, offsetof(struct flb_out_parseable, password), + "The parseable server password." + }, + { + FLB_CONFIG_MAP_STR, "stream", NULL, + 0, FLB_TRUE, offsetof(struct flb_out_parseable, stream), + "The stream name to send logs to. Using $NAMESPACE will dynamically create a namespace." + }, + { + FLB_CONFIG_MAP_INT, "server_port", NULL, + 0, FLB_TRUE, offsetof(struct flb_out_parseable, server_port), + "The port on the host to send logs to." + }, + { + FLB_CONFIG_MAP_CLIST, "Exclude_Namespaces", NULL, + 0, FLB_TRUE, offsetof(struct flb_out_parseable, exclude_namespaces), + "A space-separated list of Kubernetes namespaces to exclude from log forwarding." + }, + /* EOF */ + {0} +}; + +/* Plugin registration */ +struct flb_output_plugin out_parseable_plugin = { + .name = "parseable", + .description = "Sends events to a HTTP server", + .cb_init = cb_parseable_init, + .cb_flush = cb_parseable_flush, + .cb_exit = cb_parseable_exit, + .flags = 0, + .event_type = FLB_OUTPUT_LOGS, + .config_map = config_map +}; diff --git a/plugins/out_parseable/parseable.h b/plugins/out_parseable/parseable.h new file mode 100644 index 00000000000..6b440d7553c --- /dev/null +++ b/plugins/out_parseable/parseable.h @@ -0,0 +1,18 @@ +#ifndef FLB_OUT_PARSEABLE_H +#define FLB_OUT_PARSEABLE_H + +#include +#include + +struct flb_out_parseable { + flb_sds_t server_host; + int server_port; + flb_sds_t username; + flb_sds_t password; + flb_sds_t stream; + struct cfl_list *exclude_namespaces; // Use cfl_list for namespace exclusion + struct flb_upstream *upstream; + struct flb_output_instance *ins; +}; + +#endif diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index 49ed0e0cf5d..9bbbc753569 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -203,6 +203,7 @@ if(FLB_IN_LIB) FLB_RT_TEST(FLB_OUT_SKYWALKING "out_skywalking.c") FLB_RT_TEST(FLB_OUT_ES "out_elasticsearch.c") FLB_RT_TEST(FLB_OUT_OPENSEARCH "out_opensearch.c") + FLB_RT_TEST(FLB_OUT_PARSEABLE "out_parseable.c") FLB_RT_TEST(FLB_OUT_EXIT "out_exit.c") FLB_RT_TEST(FLB_OUT_FLOWCOUNTER "out_flowcounter.c") FLB_RT_TEST(FLB_OUT_FORWARD "out_forward.c") diff --git a/tests/runtime/out_parseable.c b/tests/runtime/out_parseable.c new file mode 100644 index 00000000000..ac47b1027a3 --- /dev/null +++ b/tests/runtime/out_parseable.c @@ -0,0 +1,192 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include +#include "flb_tests_runtime.h" + +/* Test data */ +#define JSON_BASIC "[1448403340, {\"message\":\"test message\", \"kubernetes\":{\"namespace_name\":\"test-ns\"}}]" +#define JSON_EXCLUDE "[1448403340, {\"message\":\"excluded message\", \"kubernetes\":{\"namespace_name\":\"kube-system\"}}]" + +/* Test callbacks */ +static void cb_check_basic_config(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + char *p; + char *out_js = res_data; + + /* Check that source field was added */ + p = strstr(out_js, "\"source\":\"fluent bit parseable plugin\""); + TEST_CHECK(p != NULL); + + /* Check X-P-Stream header value */ + p = strstr(out_js, "\"X-P-Stream\":\"test-stream\""); + TEST_CHECK(p != NULL); + + flb_sds_destroy(res_data); +} + +static void cb_check_namespace_stream(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + char *p; + char *out_js = res_data; + + /* Check that namespace from kubernetes metadata is used as stream */ + p = strstr(out_js, "\"X-P-Stream\":\"test-ns\""); + TEST_CHECK(p != NULL); + + flb_sds_destroy(res_data); +} + +static void cb_check_exclude_namespace(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + /* This callback should not be called if namespace exclusion works */ + TEST_CHECK(false); + flb_sds_destroy(res_data); +} + +/* Test functions */ +void flb_test_basic_config() +{ + int ret; + int size = sizeof(JSON_BASIC) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Parseable output */ + out_ffd = flb_output(ctx, (char *) "parseable", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + "server_host", "localhost", + "server_port", "8000", + "username", "test-user", + "password", "test-pass", + "stream", "test-stream", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_basic_config, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) JSON_BASIC, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_namespace_stream() +{ + int ret; + int size = sizeof(JSON_BASIC) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Parseable output with $NAMESPACE stream */ + out_ffd = flb_output(ctx, (char *) "parseable", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + "server_host", "localhost", + "server_port", "8000", + "username", "test-user", + "password", "test-pass", + "stream", "$NAMESPACE", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_namespace_stream, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) JSON_BASIC, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_exclude_namespace() +{ + int ret; + int size = sizeof(JSON_EXCLUDE) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Parseable output with excluded namespace */ + out_ffd = flb_output(ctx, (char *) "parseable", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + "server_host", "localhost", + "server_port", "8000", + "username", "test-user", + "password", "test-pass", + "stream", "test-stream", + "exclude_namespaces", "kube-system", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_exclude_namespace, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) JSON_EXCLUDE, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +/* Test list */ +TEST_LIST = { + {"basic_config", flb_test_basic_config}, + {"namespace_stream", flb_test_namespace_stream}, + {"exclude_namespace", flb_test_exclude_namespace}, + {NULL, NULL} +};