diff --git a/plugins/out_opentelemetry/opentelemetry.c b/plugins/out_opentelemetry/opentelemetry.c index 91a4c6f513a..98a09eadadc 100644 --- a/plugins/out_opentelemetry/opentelemetry.c +++ b/plugins/out_opentelemetry/opentelemetry.c @@ -228,7 +228,7 @@ int opentelemetry_post(struct opentelemetry_context *ctx, const char *http_uri, const char *grpc_uri) { - const char *compression_algorithm; + char *compression_algorithm; uint32_t wire_message_length; size_t grpc_body_length; cfl_sds_t sds_result; @@ -237,6 +237,10 @@ int opentelemetry_post(struct opentelemetry_context *ctx, struct flb_http_request *request; int out_ret; int result; + const char *grpc_status; + const char *grpc_message; + size_t final_body_len; + void *final_body; if (!ctx->enable_http2_flag) { return opentelemetry_legacy_post(ctx, @@ -246,6 +250,9 @@ int opentelemetry_post(struct opentelemetry_context *ctx, } compression_algorithm = NULL; + if (ctx->compress_gzip == FLB_TRUE) { + compression_algorithm = "gzip"; + } request = flb_http_client_request_builder( &ctx->http_client, @@ -264,7 +271,20 @@ int opentelemetry_post(struct opentelemetry_context *ctx, if (request->protocol_version == HTTP_PROTOCOL_VERSION_20 && ctx->enable_grpc_flag) { - grpc_body = cfl_sds_create_size(body_len + 5); + if (ctx->compress_gzip) { + result = flb_gzip_compress((void *) body, body_len, + &final_body, &final_body_len); + if (result != 0) { + compression_algorithm = NULL; + flb_plg_error(ctx->ins, "cannot gzip payload, disabling compression"); + } + } + else { + final_body = (void *) body; + final_body_len = body_len; + } + + grpc_body = cfl_sds_create_size(final_body_len + 5); if (grpc_body == NULL) { flb_http_client_request_destroy(request, FLB_TRUE); @@ -272,8 +292,7 @@ int opentelemetry_post(struct opentelemetry_context *ctx, return FLB_RETRY; } - wire_message_length = (uint32_t) body_len; - + wire_message_length = (uint32_t) final_body_len; sds_result = cfl_sds_cat(grpc_body, "\x00----", 5); if (sds_result == NULL) { @@ -286,12 +305,15 @@ int opentelemetry_post(struct opentelemetry_context *ctx, grpc_body = sds_result; + if(compression_algorithm != NULL){ + ((uint8_t *) grpc_body)[0] = 0x01; + } ((uint8_t *) grpc_body)[1] = (wire_message_length & 0xFF000000) >> 24; ((uint8_t *) grpc_body)[2] = (wire_message_length & 0x00FF0000) >> 16; ((uint8_t *) grpc_body)[3] = (wire_message_length & 0x0000FF00) >> 8; ((uint8_t *) grpc_body)[4] = (wire_message_length & 0x000000FF) >> 0; - sds_result = cfl_sds_cat(grpc_body, body, body_len); + sds_result = cfl_sds_cat(grpc_body, final_body, final_body_len); if (sds_result == NULL) { flb_http_client_request_destroy(request, FLB_TRUE); @@ -311,7 +333,21 @@ int opentelemetry_post(struct opentelemetry_context *ctx, "application/grpc"), FLB_HTTP_CLIENT_ARGUMENT_BODY(grpc_body, grpc_body_length, - compression_algorithm)); + NULL)); + + if(compression_algorithm != NULL) { + flb_http_request_set_header(request, + "grpc-encoding", + 0, + compression_algorithm, + 0); + flb_http_request_set_header(request, + "grpc-accept-encoding", + 0, + compression_algorithm, + 0); + flb_free(final_body); + } cfl_sds_destroy(grpc_body); @@ -322,10 +358,6 @@ int opentelemetry_post(struct opentelemetry_context *ctx, } } else { - if (ctx->compress_gzip == FLB_TRUE) { - compression_algorithm = "gzip"; - } - result = flb_http_request_set_parameters(request, FLB_HTTP_CLIENT_ARGUMENT_URI(http_uri), FLB_HTTP_CLIENT_ARGUMENT_CONTENT_TYPE(