Skip to content

Commit

Permalink
out_opentelemetry: add gzip compression for grpc
Browse files Browse the repository at this point in the history
Signed-off-by: Florian Bezannier <[email protected]>
  • Loading branch information
flobz committed Jan 10, 2025
1 parent 14ca011 commit c638a19
Showing 1 changed file with 42 additions and 10 deletions.
52 changes: 42 additions & 10 deletions plugins/out_opentelemetry/opentelemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ int opentelemetry_post(struct opentelemetry_context *ctx,
const char *http_uri,
const char *grpc_uri)
{
const char *compression_algorithm;
char *compression_algorithm;
uint32_t wire_message_length;
size_t grpc_body_length;
cfl_sds_t sds_result;
Expand All @@ -237,6 +237,10 @@ int opentelemetry_post(struct opentelemetry_context *ctx,
struct flb_http_request *request;
int out_ret;
int result;
const char *grpc_status;
const char *grpc_message;
size_t final_body_len;
void *final_body;

if (!ctx->enable_http2_flag) {
return opentelemetry_legacy_post(ctx,
Expand All @@ -246,6 +250,9 @@ int opentelemetry_post(struct opentelemetry_context *ctx,
}

compression_algorithm = NULL;
if (ctx->compress_gzip == FLB_TRUE) {
compression_algorithm = "gzip";
}

request = flb_http_client_request_builder(
&ctx->http_client,
Expand All @@ -264,16 +271,28 @@ int opentelemetry_post(struct opentelemetry_context *ctx,

if (request->protocol_version == HTTP_PROTOCOL_VERSION_20 &&
ctx->enable_grpc_flag) {
grpc_body = cfl_sds_create_size(body_len + 5);
if (ctx->compress_gzip) {
result = flb_gzip_compress((void *) body, body_len,
&final_body, &final_body_len);
if (result != 0) {
compression_algorithm = NULL;
flb_plg_error(ctx->ins, "cannot gzip payload, disabling compression");
}
}
else {
final_body = (void *) body;
final_body_len = body_len;
}

grpc_body = cfl_sds_create_size(final_body_len + 5);

if (grpc_body == NULL) {
flb_http_client_request_destroy(request, FLB_TRUE);

return FLB_RETRY;
}

wire_message_length = (uint32_t) body_len;

wire_message_length = (uint32_t) final_body_len;
sds_result = cfl_sds_cat(grpc_body, "\x00----", 5);

if (sds_result == NULL) {
Expand All @@ -286,12 +305,15 @@ int opentelemetry_post(struct opentelemetry_context *ctx,

grpc_body = sds_result;

if(compression_algorithm != NULL){
((uint8_t *) grpc_body)[0] = 0x01;
}
((uint8_t *) grpc_body)[1] = (wire_message_length & 0xFF000000) >> 24;
((uint8_t *) grpc_body)[2] = (wire_message_length & 0x00FF0000) >> 16;
((uint8_t *) grpc_body)[3] = (wire_message_length & 0x0000FF00) >> 8;
((uint8_t *) grpc_body)[4] = (wire_message_length & 0x000000FF) >> 0;

sds_result = cfl_sds_cat(grpc_body, body, body_len);
sds_result = cfl_sds_cat(grpc_body, final_body, final_body_len);

if (sds_result == NULL) {
flb_http_client_request_destroy(request, FLB_TRUE);
Expand All @@ -311,7 +333,21 @@ int opentelemetry_post(struct opentelemetry_context *ctx,
"application/grpc"),
FLB_HTTP_CLIENT_ARGUMENT_BODY(grpc_body,
grpc_body_length,
compression_algorithm));
NULL));

if(compression_algorithm != NULL) {
flb_http_request_set_header(request,
"grpc-encoding",
0,
compression_algorithm,
0);
flb_http_request_set_header(request,
"grpc-accept-encoding",
0,
compression_algorithm,
0);
flb_free(final_body);
}

cfl_sds_destroy(grpc_body);

Expand All @@ -322,10 +358,6 @@ int opentelemetry_post(struct opentelemetry_context *ctx,
}
}
else {
if (ctx->compress_gzip == FLB_TRUE) {
compression_algorithm = "gzip";
}

result = flb_http_request_set_parameters(request,
FLB_HTTP_CLIENT_ARGUMENT_URI(http_uri),
FLB_HTTP_CLIENT_ARGUMENT_CONTENT_TYPE(
Expand Down

0 comments on commit c638a19

Please sign in to comment.