From c23b26cc40c954ffe8ebcb012d23281320515b56 Mon Sep 17 00:00:00 2001 From: Dennis Kaarsemaker Date: Thu, 7 Nov 2024 15:55:18 +0100 Subject: [PATCH] in_systemd: allow a parser to be specified as part of the systemd unit Similar to how we use kubernetes annotations to determine a parser, this uses custom fields in systemd units to configure a parser per systemd unit. In the unit file this is configured as: ``` [Service] ... LogExtraFields=FLUENT_BIT_PARSER=logfmt ``` Signed-off-by: Dennis Kaarsemaker --- plugins/in_systemd/systemd.c | 93 ++++++++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/plugins/in_systemd/systemd.c b/plugins/in_systemd/systemd.c index 85f36ee990a..c0ef0b62121 100644 --- a/plugins/in_systemd/systemd.c +++ b/plugins/in_systemd/systemd.c @@ -21,6 +21,8 @@ #include #include #include +#include +#include #include "systemd_config.h" #include "systemd_db.h" @@ -70,6 +72,59 @@ static int tag_compose(const char *tag, const char *unit_name, return 0; } +static int flb_systemd_repack_map(struct flb_log_event_encoder *encoder, + char *data, + size_t data_size) +{ + msgpack_unpacked source_map; + size_t offset; + int result; + size_t index; + msgpack_object value; + msgpack_object key; + + result = FLB_EVENT_ENCODER_SUCCESS; + + if (data_size > 0) { + msgpack_unpacked_init(&source_map); + + offset = 0; + result = msgpack_unpack_next(&source_map, + data, + data_size, + &offset); + + if (result == MSGPACK_UNPACK_SUCCESS) { + result = FLB_EVENT_ENCODER_SUCCESS; + } + else { + result = FLB_EVENT_DECODER_ERROR_DESERIALIZATION_FAILURE; + } + + for (index = 0; + index < source_map.data.via.map.size && + result == FLB_EVENT_ENCODER_SUCCESS; + index++) { + key = source_map.data.via.map.ptr[index].key; + value = source_map.data.via.map.ptr[index].val; + + result = flb_log_event_encoder_append_body_msgpack_object( + encoder, + &key); + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_append_body_msgpack_object( + encoder, + &value); + } + } + + msgpack_unpacked_destroy(&source_map); + } + + return result; +} + static int in_systemd_collect(struct flb_input_instance *ins, struct flb_config *config, void *in_context) { @@ -84,11 +139,14 @@ static int in_systemd_collect(struct flb_input_instance *ins, long nsec; uint64_t usec; size_t length; + size_t plength; size_t threshold; + char *name; const char *sep; const char *key; const char *val; char *buf = NULL; + void *pbuf = NULL; #ifdef FLB_HAVE_SQLDB char *cursor = NULL; #endif @@ -100,6 +158,7 @@ static int in_systemd_collect(struct flb_input_instance *ins, const void *data; struct flb_systemd_config *ctx = in_context; struct flb_time tm; + struct flb_parser *parser; /* Restricted by mem_buf_limit */ if (flb_input_buf_paused(ins) == FLB_TRUE) { @@ -134,6 +193,8 @@ static int in_systemd_collect(struct flb_input_instance *ins, } while ((ret_j = sd_journal_next(ctx->j)) > 0) { + /* Reset dynamic parser */ + parser = NULL; /* If the tag is composed dynamically, gather the Systemd Unit name */ if (ctx->dynamic_tag) { ret = sd_journal_get_data(ctx->j, "_SYSTEMD_UNIT", &data, &length); @@ -154,6 +215,17 @@ static int in_systemd_collect(struct flb_input_instance *ins, tag_len = ctx->ins->tag_len; } + /* Find the parser, if specified */ + ret = sd_journal_get_data(ctx->j, "FLUENT_BIT_PARSER", &data, &length); + if (ret == 0) { + name = flb_strndup((const char *)(data+18), length-18); + parser = flb_parser_get(name, config); + if (!parser) { + flb_plg_error(ctx->ins, "no such parser: '%s'", name); + } + free(name); + } + if (last_tag_len == 0) { strncpy(last_tag, tag, tag_len); last_tag_len = tag_len; @@ -219,6 +291,27 @@ static int in_systemd_collect(struct flb_input_instance *ins, len = (sep - key); + if (strncmp(key, "FLUENT_BIT_PARSER", len) == 0) { + continue; + } + + /* If this is the message, apply the parser if any is specified */ + if (parser && strncmp(key, "MESSAGE", len) == 0) { + val = sep + 1; + len = length - (sep - key) - 1; + ret = flb_parser_do(parser, val, len, &pbuf, &plength, &tm); + if (ret != -1) { + ret = flb_systemd_repack_map(ctx->log_encoder, pbuf, plength); + flb_free(pbuf); + continue; + } + /* + * If the parser failed, reset the return code + * to append the unparsed message as normal + */ + ret = FLB_EVENT_ENCODER_SUCCESS; + } + if (ret == FLB_EVENT_ENCODER_SUCCESS) { ret = flb_log_event_encoder_append_body_string_length( ctx->log_encoder, len);