[exporter/stefexporter] Add basic STEF exporter implementation #37564

Open · wants to merge 1 commit into main

Conversation

tigrannajaryan (Member)

#### Description

Added a STEF exporter implementation for metrics, sending data over a gRPC stream.
For now, only the queuing and retry exporter helpers are used. We will need to
decide later whether other helpers are needed for this exporter.

#### Testing

Unit tests that verify connecting, reconnecting, sending, and acking of data
are included.

#### Documentation

Added to README.

#### Future Work

More extensive test coverage is desirable and will likely be added
in the future.

We likely want to implement a STEF receiver and add STEF as a tested protocol
to our testbed.

@bogdandrutu (Member) left a comment


I think this has lots of concurrency issues, and I would suggest simplifying the design, because otherwise this is impossible to get right.

```go
func newStefExporter(set component.TelemetrySettings, cfg *Config) *stefExporter {
    exp := &stefExporter{
        set:    set,
        logger: set.Logger,
```

You saved the set; no need to save the logger as well.

Comment on lines +102 to +103
```go
        cfg:                 cfg,
        maxWritesInProgress: int64(cfg.NumConsumers),
```

Ditto: since you have the config, there is no need to copy individual fields.

```go
    s.stefWriterMutex.Lock()
    defer s.stefWriterMutex.Unlock()

    converter := stefpdatametrics.OtlpToTEFUnsorted{}
```

typo in name? TEF -> STEF?

"github.com/splunk/stef/go/grpc/stef_proto"
"github.com/splunk/stef/go/otel/oteltef"
stefpdatametrics "github.com/splunk/stef/go/pdata/metrics"
"github.com/splunk/stef/go/pkg"

Not an ideal pkg name; please import it with an alias like "stefpkg", it is hard to read otherwise.

Comment on lines +148 to +153
```go
    // Connect to the server.
    var err error
    s.grpcConn, err = s.cfg.ClientConfig.ToClientConn(ctx, host, s.set)
    if err != nil {
        return err
    }
```

You should not re-create the connection. gRPC is smart enough to reconnect itself; I am confused and have never seen this before, which makes me think this is not ideal.
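
To make the suggestion concrete, a minimal sketch, assuming the collector's ToClientConn helper seen in the snippet above; the start method shape here is hypothetical, not the PR's code:

```go
// Sketch: create the gRPC client connection once at exporter start and reuse
// it for every export. gRPC's channel transparently moves between READY and
// TRANSIENT_FAILURE and reconnects on its own, so the exporter never needs to
// rebuild the connection itself.
func (s *stefExporter) start(ctx context.Context, host component.Host) error {
    conn, err := s.cfg.ClientConfig.ToClientConn(ctx, host, s.set)
    if err != nil {
        return err
    }
    s.grpcConn = conn // reused by all subsequent stream (re)creations
    return nil
}
```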

Comment on lines +260 to +262
```go
    // stefWriter is not safe for concurrent writing, protect it.
    s.stefWriterMutex.Lock()
    defer s.stefWriterMutex.Unlock()
```

Doing synchronous requests across cloud regions (since, if I understand correctly, STEF is for that) is a questionable design in my opinion. Should we at least have multiple "connections" that we use at the same time?

```go
    // We achieved max concurrency. No further exportMetrics calls will make
    // progress until at least some of the exportMetrics calls return. For
    // calls to return they need to receive an Ack. For Ack to be received
    // we need to make the data we wrote is actually sent, so we need to
```

This statement does not make sense to me...

> For Ack to be received we need to make the data we wrote is actually sent

```go
    // back and ack ID that numerically matches the last written record number.
    expectedAckID := s.stefWriter.RecordCount()

    if s.writesInProgress.Load() >= s.maxWritesInProgress {
```

Why do we send the data instead of blocking until there is room to send? Also, does Flush mean writesInProgress will decrease?

Also, you have a bug here: under high load you will end up sending one request and then flushing after every request.

Think about it: if you have 100,000 requests that each do writesInProgress + 1, this condition will be true after each execution.
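
An illustration of the failure mode being described; the field names mirror the PR's snippets, but the numbers and surrounding structure are hypothetical:

```go
// Illustration only. Suppose maxWritesInProgress is 10 and requests arrive
// faster than acks come back: writesInProgress quickly reaches 10 and stays
// pinned there, so the condition below is true for essentially every exported
// record, and each record ends up in its own flushed frame.
s.writesInProgress.Add(1)
if s.writesInProgress.Load() >= s.maxWritesInProgress {
    // Under sustained load this branch runs once per request, turning the
    // exporter into "write one record, flush one frame" and losing the
    // batching that STEF compression relies on.
    if err := s.stefWriter.Flush(); err != nil {
        return err
    }
}
```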

Comment on lines +224 to +226
```go
    // Wait for all in progress wait in goroutines to get the notification that
    // we are disconnecting and exit.
    s.inProgressWaits.Wait()
```

What happens if a goroutine is at line 247 when you hit this point? You will pass it, but in reality that goroutine has not yet entered the ack wait. So you have to ensure that happens under the lock that sends the request.

https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/37564/files#diff-fb9ac6a5f4f3f6d351d846eda06763a06cc2fe8f7483964b428a4a99e45757b9R247
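
One way to close this race, sketched under the assumption that Disconnect acquires the same stefWriterMutex before waiting; the field names follow the PR, the structure and waitForAck helper are hypothetical:

```go
// Sketch, not the PR's code: register the ack waiter while still holding the
// mutex that serialized the write, so there is no window in which a record has
// been written but its waiter is not yet visible to Disconnect.
s.stefWriterMutex.Lock()
// ... write the record to s.stefWriter here ...
expectedAckID := s.stefWriter.RecordCount()
s.inProgressWaits.Add(1) // registered under the same lock as the write
s.stefWriterMutex.Unlock()
defer s.inProgressWaits.Done()

// Disconnect takes stefWriterMutex before calling s.inProgressWaits.Wait(),
// so it either sees this registration or runs before the write happened at all.
waitForAck(expectedAckID) // hypothetical helper that blocks until this record is acked
```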

@tigrannajaryan (Member, Author)

> I think this has lots of concurrency issues, and I would suggest simplifying the design, because otherwise this is impossible to get right.

@bogdandrutu I agree, I don't like the design myself. If we can find a simpler way I will be happier.

Here are my constraints:

Prefer single gRPC stream

Why? Two reasons:

  1. STEF stream compression ratio increases as more data is sent through it. This is because of dictionary encoding at 2 different levels (STEF itself and ZStd), causing previously seen data to compress better. If we split the data that the exporter receives from the pipeline into multiple STEF streams we will reduce compression ratio.
  2. STEF encoder and decoder keep dictionaries in memory. If we split into multiple STEF streams all of these encoders and decoders will eventually contain virtually identical data, duplicated as many times as you have STEF streams, increasing memory usage by a factor of the number of streams. This is bad, especially for backends which expect millions of incoming streams.

Sync API of Exporter Helper

The current exporter helper design requires the exportMetrics() call to synchronously block until sent data is confirmed to be delivered to the destination via ACK messages that the destination sends back on the same gRPC stream. When exportMetrics() returns, the metric data is removed from the queue (and is garbage collected). If we change exportMetrics() to return before data is confirmed to be delivered, without waiting for ACK messages, then there is a chance that the data will be lost if the gRPC connection breaks before the STEF data is actually delivered to the destination.
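
A rough sketch of the synchronous shape this forces on the exporter; pmetric.Metrics is the collector's metrics payload type, while writeRecord and the ack channel are hypothetical names, not the PR's actual code:

```go
// Sketch of the synchronous contract. exportMetrics may only return after the
// destination has acked the data, because the exporter helper deletes the item
// from its queue as soon as this call returns.
func (s *stefExporter) exportMetrics(ctx context.Context, md pmetric.Metrics) error {
    ackCh, err := s.writeRecord(md) // hypothetical: encode md into the shared STEF stream
    if err != nil {
        return err
    }
    select {
    case <-ackCh: // destination confirmed delivery of this record
        return nil
    case <-ctx.Done(): // give up if the pipeline cancels
        return ctx.Err()
    }
}
```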

Furthermore, if exportMetrics() were to return immediately after encoding STEF data, that data most likely would not be written to the gRPC stream at all, since STEF encoders buffer data into fairly large frames before they are written to the gRPC stream. To guarantee that encoded data is sent over the gRPC stream, exportMetrics() has to issue a Flush() call to the STEF encoder. If this is done for every single exportMetrics() call, it can result in a very significant reduction in compression ratio, since there is typically a fixed overhead per STEF frame (Flush() sends the current frame and creates a new one). The difference in experiments is about 2x worse compression if you Flush() every time (on the datasets I have). This is unacceptable and defeats the purpose of STEF. Note that, as described above, even issuing a Flush() call every time does not guarantee delivery, so this is still not good enough for reliable delivery.
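
The buffering behaviour described above, as a minimal sketch; only Flush() and RecordCount() appear in the PR, the rest (including waitForAck) is illustrative:

```go
// Writing a record only appends it to the STEF writer's current in-memory
// frame; nothing reaches the gRPC stream until the frame fills up or Flush()
// is called. A call that needs an ack must therefore make sure its record
// actually goes onto the wire:
expectedAckID := s.stefWriter.RecordCount() // the record just encoded
if err := s.stefWriter.Flush(); err != nil { // closes the frame and sends it
    return err
}
waitForAck(expectedAckID) // hypothetical: block until the destination acks this record
// Doing this Flush() for every single call produces one fixed-overhead frame
// per record, which is what roughly halves the compression ratio above.
```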

It would be ideal if the exporter helper design decoupled the act of consuming from the queue from the act of deleting from the queue. This would be perfect for asynchronous protocols like STEF. For example, a hypothetical async exporter design could look like this:

```go
func exportMetrics(ctx context.Context, md pdata.Metrics, ack func(id SomeIDType)) (id SomeIDType, err error)
```

With this API we would implement the STEF exporter's exportMetrics() call to return immediately after encoding md into the STEF stream, returning the id of the written record. The STEF exporter would later asynchronously call the ack() func when it receives delivery confirmation from the destination. This would also allow a much, much simpler implementation of the STEF exporter; I would delete 90% of the code that you (and I) don't like.
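
Under that hypothetical API, the export path could shrink to roughly the following; everything here is illustrative, and the id type, encode step, and pendingAcks registry are assumptions:

```go
// Illustrative sketch against the hypothetical async exporter-helper API.
// exportMetrics returns as soon as the record is encoded; delivery is
// confirmed later by calling ack(id) from the goroutine that reads acks
// from the gRPC stream.
func (s *stefExporter) exportMetrics(ctx context.Context, md pmetric.Metrics, ack func(id uint64)) (uint64, error) {
    s.stefWriterMutex.Lock()
    defer s.stefWriterMutex.Unlock()

    if err := s.encode(md); err != nil { // hypothetical: convert md and write it to the STEF stream
        return 0, err
    }
    id := s.stefWriter.RecordCount() // record number the destination will eventually ack
    s.pendingAcks[id] = ack          // hypothetical registry consulted by the ack-reader goroutine
    return id, nil
}
```

The ack-reading goroutine would then invoke every registered callback whose id is at or below the acked record number, at which point the exporter helper could finally delete those items from its queue.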

I have briefly discussed this topic with @dmitryax but I think this is a much bigger effort and for now we have to work with the exporter helper API we have.

If you have any thoughts on how to simplify the design within the current constraints, or if you think there is a better way to handle asynchronous sending, please share them.
