Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/googlecloudpubsub] add support for pubsub ordering #37587

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .chloggen/add-pubsub-ordering.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
change_type: enhancement
component: googlecloudpubsubexporter
note: Add support for exporting ordered messages to GCP Pub/Sub
issues: [32850]
52 changes: 46 additions & 6 deletions exporter/googlecloudpubsubexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ This exporter sends OTLP messages to a Google Cloud [Pubsub](https://cloud.googl
The following configuration options are supported:

* `project` (Optional): The Google Cloud Project of the topics.
* `topic` (Required): The topic name to receive OTLP data over. The topic name should be a fully qualified resource
* `topic` (Required): The topic name to send OTLP data over. The topic name should be a fully qualified resource
name (eg: `projects/otel-project/topics/otlp`).
* `compression` (Optional): Set the payload compression, only `gzip` is supported. Default is no compression.
* `watermark` Behaviour of how the `ce-time` attribute is set (see watermark section for more info)
Expand All @@ -31,17 +31,25 @@ The following configuration options are supported:
or switching between [global and regional service endpoints](https://cloud.google.com/pubsub/docs/reference/service_apis_overview#service_endpoints).
* `insecure` (Optional): allows performing “insecure” SSL connections and transfers, useful when connecting to a local
emulator instance. Only has effect if Endpoint is not ""
* `ordering`: Configures the [PubSub ordering](https://cloud.google.com/pubsub/docs/ordering) feature, see
[ordering](#ordering) section for more info.
* `enabled` (default = `false`): Enables the ordering. Default is disabled.
* `from_resource_attribute` (no default): resource attribute that will be used as the ordering key. Required when
`ordering.enabled` is `true`. If the resource attribute is missing or has an empty value, the messages will not be
ordered for this resource.
* `remove_resource_attribute` (default = `false`): if the ordering key resource attribute specified
`from_resource_attribute` should be removed from the resource attributes.

```yaml
exporters:
googlecloudpubsub:
project: my-project
topic: otlp-traces
topic: projects/my-project/topics/otlp-traces
```

## Pubsub topic

The Google Cloud [Pubsub](https://cloud.google.com/pubsub) export doesn't automatic create topics, it expects the topic
The Google Cloud [Pubsub](https://cloud.google.com/pubsub) exporter doesn't automatically create topics, it expects the topic
to be created upfront. Security wise it's best to give the collector its own service account and give the
topic `Pub/Sub Publisher` permission.

Expand Down Expand Up @@ -74,11 +82,11 @@ up to 20% of the cost. This can be done by setting the `compression` to `gzip`.
exporters:
googlecloudpubsub:
project: my-project
topic: otlp-traces
topic: projects/my-project/topics/otlp-traces
compression: gzip
```

The exporter with add the `content-encoding` attribute to the message. The receiver will look at this attribute
The exporter will add the `content-encoding` attribute to the message. The receiver will look at this attribute
to detect the compression that is used on the payload.

Only `gzip` is supported.
Expand All @@ -100,7 +108,7 @@ timestamp , if you want to behaviour to have effect as the default is `0s`.
exporters:
googlecloudpubsub:
project: my-project
topic: otlp-traces
topic: projects/my-project/topics/otlp-traces
watermark:
behavior: earliest
allow_drift: 1h
Expand All @@ -119,3 +127,35 @@ scenario is `behavior: earliest` with a reasonable `allow_drift` of `1h`.

Allowed behavior values are `current` or `earliest`. For `allow_drift` the default is `0s`, so make sure to set the
value.

## Ordering

When ordering is enabled (`ordering.enabled`), you are required to specify a resource attribute key that will be used as
the ordering key (`ordering.from_resource_attribute`). If this resource attribute is only meant to be used as an
ordering key, you may want to choose to get this resource attribute key (`ordering.from_resource_attribute`) removed
before publishing to PubSub by enabling the `ordering.remove_resource_attribute` configuration.

```yaml
exporters:
googlecloudpubsub:
project: my-project
topic: projects/my-project/topics/otlp-traces
ordering:
enabled: true
from_resource_attribute: some.resource.attribute.key
remove_resource_attribute: true
```

### Notes

While the PubSub topic doesn't require any configuration for ordering, you will need to enable ordering on your
subscription(s) if you need it. Enabling ordering on a subscription is only possible at creation.
For composite ordering keys you'd need to compose the resource attribute value before exporting e.g., by using a
[transform processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor)
.

Empty values in the ordering key are accepted but won't be ordered, see [PubSub ordering documentation](https://cloud.google.com/pubsub/docs/ordering)
for more details.

PubSub requires one publish request per ordering key value, so this exporter groups the signals per ordering key before
publishing.
32 changes: 27 additions & 5 deletions exporter/googlecloudpubsubexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.uber.org/multierr"
)

var topicMatcher = regexp.MustCompile(`^projects/[a-z][a-z0-9\-]*/topics/`)
Expand All @@ -34,6 +35,8 @@ type Config struct {
Compression string `mapstructure:"compression"`
// Watermark defines the watermark (the ce-time attribute on the message) behavior
Watermark WatermarkConfig `mapstructure:"watermark"`
// Ordering configures the ordering keys
Ordering OrderingConfig `mapstructure:"ordering"`
}

// WatermarkConfig customizes the behavior of the watermark
Expand All @@ -46,15 +49,27 @@ type WatermarkConfig struct {
AllowedDrift time.Duration `mapstructure:"allowed_drift"`
}

// OrderingConfig customizes the behavior of the ordering
type OrderingConfig struct {
// Enabled indicates if ordering is enabled
Enabled bool `mapstructure:"enabled"`
// FromResourceAttribute is a resource attribute that will be used as the ordering key.
FromResourceAttribute string `mapstructure:"from_resource_attribute"`
// RemoveResourceAttribute indicates if the ordering key should be removed from the resource attributes.
RemoveResourceAttribute bool `mapstructure:"remove_resource_attribute"`
}

func (config *Config) Validate() error {
var errors error
if !topicMatcher.MatchString(config.Topic) {
return fmt.Errorf("topic '%s' is not a valid format, use 'projects/<project_id>/topics/<name>'", config.Topic)
errors = multierr.Append(errors, fmt.Errorf("topic '%s' is not a valid format, use 'projects/<project_id>/topics/<name>'", config.Topic))
}
_, err := config.parseCompression()
if err != nil {
return err
if _, err := config.parseCompression(); err != nil {
errors = multierr.Append(errors, err)
}
return config.Watermark.validate()
errors = multierr.Append(errors, config.Watermark.validate())
errors = multierr.Append(errors, config.Ordering.validate())
return errors
}

func (config *WatermarkConfig) validate() error {
Expand All @@ -65,6 +80,13 @@ func (config *WatermarkConfig) validate() error {
return err
}

func (cfg *OrderingConfig) validate() error {
if cfg.Enabled && cfg.FromResourceAttribute == "" {
return fmt.Errorf("'from_resource_attribute' is required if ordering is enabled")
}
return nil
}

func (config *Config) parseCompression() (compression, error) {
switch config.Compression {
case "gzip":
Expand Down
32 changes: 23 additions & 9 deletions exporter/googlecloudpubsubexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,21 @@ func TestLoadConfig(t *testing.T) {
require.NoError(t, err)
require.NoError(t, sub.Unmarshal(cfg))

customConfig := factory.CreateDefaultConfig().(*Config)
expectedConfig := factory.CreateDefaultConfig().(*Config)

customConfig.ProjectID = "my-project"
customConfig.UserAgent = "opentelemetry-collector-contrib {{version}}"
customConfig.TimeoutSettings = exporterhelper.TimeoutConfig{
expectedConfig.ProjectID = "my-project"
expectedConfig.UserAgent = "opentelemetry-collector-contrib {{version}}"
expectedConfig.TimeoutSettings = exporterhelper.TimeoutConfig{
Timeout: 20 * time.Second,
}
customConfig.Topic = "projects/my-project/topics/otlp-topic"
customConfig.Compression = "gzip"
customConfig.Watermark.Behavior = "earliest"
customConfig.Watermark.AllowedDrift = time.Hour
assert.Equal(t, cfg, customConfig)
expectedConfig.Topic = "projects/my-project/topics/otlp-topic"
expectedConfig.Compression = "gzip"
expectedConfig.Watermark.Behavior = "earliest"
expectedConfig.Watermark.AllowedDrift = time.Hour
expectedConfig.Ordering.Enabled = true
expectedConfig.Ordering.FromResourceAttribute = "ordering_key"
expectedConfig.Ordering.RemoveResourceAttribute = true
assert.Equal(t, expectedConfig, cfg)
}

func TestTopicConfigValidation(t *testing.T) {
Expand Down Expand Up @@ -100,3 +103,14 @@ func TestWatermarkDefaultMaxDriftValidation(t *testing.T) {
assert.NoError(t, c.Validate())
assert.Equal(t, time.Duration(9223372036854775807), c.Watermark.AllowedDrift)
}

func TestOrderConfigValidation(t *testing.T) {
factory := NewFactory()
c := factory.CreateDefaultConfig().(*Config)
c.Topic = "projects/project/topics/my-topic"
assert.NoError(t, c.Validate())
c.Ordering.Enabled = true
assert.Error(t, c.Validate())
c.Ordering.FromResourceAttribute = "key"
assert.NoError(t, c.Validate())
}
Loading