[exporter/googlecloudpubsub] add support for pubsub ordering #37587

Open
wants to merge 2 commits into
base: main

kevinnoel-be
Contributor

Description

Add support for publishing ordered OTLP data to GCP PubSub.

Link to tracking issue

Fixes #32850

Testing

Unit tests updated.

Local testing to validate that ordering is properly enabled:

Details

GCP resources created:

gcloud pubsub topics create topic-for-testing-c9dbe8e5
gcloud pubsub subscriptions create subscription-test-ordered-c9dbe8e5 --topic topic-for-testing-c9dbe8e5 --topic-project xxx --expiration-period 1d --message-retention-duration 10m --enable-message-ordering

OTel collector config used:

receivers:
  filelog/1:
    include:
      - /xxx/logs-1.jsonl
    start_at: beginning
    resource:
      file: logs-1
  filelog/2:
    include:
      - /xxx/logs-2.jsonl
    start_at: beginning
    resource:
      file: logs-2

  googlecloudpubsub/1:
    project: xxx
    subscription: projects/xxx/subscriptions/subscription-test-ordered-c9dbe8e5
    client_id: "local-test-1"

  googlecloudpubsub/2:
    project: xxx
    subscription: projects/xxx/subscriptions/subscription-test-ordered-c9dbe8e5
    client_id: "local-test-2"

processors:
  batch:

exporters:
  debug/exported:
    verbosity: basic

  debug/received:
    verbosity: basic

  googlecloudpubsub:
    project: xxx
    topic: projects/xxx/topics/topic-for-testing-c9dbe8e5
    ordering:
      enabled: true
      from_resource_attribute: file
      remove_resource_attribute: false

  file/output_1:
    path: /xxx/output/logs-1.jsonl

  file/output_2:
    path: /xxx/output/logs-2.jsonl

service:
  telemetry:
    logs:
      level: info
    metrics:
      address: "localhost:8888"

  pipelines:
    logs/exporter:
      receivers:
        - filelog/1
        - filelog/2
      processors:
        - batch
      exporters:
        - debug/exported
        - googlecloudpubsub

    logs/receiver_1:
      receivers:
        - googlecloudpubsub/1
      processors:
      exporters:
        - debug/received
        - file/output_1

    logs/receiver_2:
      receivers:
        - googlecloudpubsub/2
      processors:
      exporters:
        - debug/received
        - file/output_2
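
To illustrate what the `ordering` block in the exporter config above expresses, here is a minimal Python sketch of deriving a Pub/Sub ordering key from a resource attribute. It is not the exporter's Go implementation: the function name `ordering_key_for`, the dict-based resource, and the empty key for a missing attribute are all assumptions made for illustration.

```python
from typing import Dict, Tuple


def ordering_key_for(
    resource_attrs: Dict[str, str],
    from_resource_attribute: str,
    remove_resource_attribute: bool,
) -> Tuple[str, Dict[str, str]]:
    """Return (ordering_key, attributes_to_publish) for one resource.

    Hypothetical helper: mirrors the config keys shown above, not the
    exporter's real code.
    """
    attrs = dict(resource_attrs)  # copy so the caller's dict is not mutated
    # Assumption: a missing attribute yields an empty key (no ordering).
    key = attrs.get(from_resource_attribute, "")
    if remove_resource_attribute:
        attrs.pop(from_resource_attribute, None)
    return key, attrs


# With the test config, filelog/1 tags its resource with file=logs-1.
key, attrs = ordering_key_for({"file": "logs-1"}, "file", False)
print(key, attrs)
```

Under these assumptions, all logs read from one file share a single ordering key, which is what lets the subscription with `--enable-message-ordering` deliver them in publish order.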

With input test data:

wc -l *.jsonl
 10000 logs-1.jsonl
 10000 logs-2.jsonl
 20000 total
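
The description does not show how the input files were produced. As a sketch only, the following Python generates lines of the shape seen in the verification output further down (`message N for <file>`, numbered 50000 to 59999). Plain-text lines, the helper name `make_lines`, and its defaults are assumptions, not the author's tooling.

```python
from typing import List


def make_lines(file_name: str, start: int = 50000, count: int = 10000) -> List[str]:
    """Build `count` sequentially numbered messages for one input file.

    Hypothetical generator; the real input format may differ.
    """
    return [f"message {n} for {file_name}" for n in range(start, start + count)]


lines = make_lines("logs-1.jsonl")
print(len(lines), lines[0], lines[-1])
```

Writing the result with `"\n".join(lines)` to `logs-1.jsonl` and `logs-2.jsonl` would give two files of 10000 lines each, matching the `wc -l` counts above.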

Logs from the OTel collector run:

make RUN_CONFIG=.mylocal/config-pubsub-ordered.yaml run
cd ./cmd/otelcontribcol && GO111MODULE=on go run --race . --config ../../config-pubsub-ordered.yaml 
2025-01-30T11:22:27.610+0100    info    [email protected]/service.go:186   Setting up own telemetry...
2025-01-30T11:22:27.610+0100    warn    [email protected]/service.go:235   service::telemetry::metrics::address is being deprecated in favor of service::telemetry::metrics::readers
2025-01-30T11:22:27.611+0100    info    builders/builders.go:26 Development component. May change in the future.        {"kind": "exporter", "data_type": "logs", "name": "debug/received"}
2025-01-30T11:22:27.621+0100    info    builders/builders.go:26 Development component. May change in the future.        {"kind": "exporter", "data_type": "logs", "name": "debug/exported"}
2025-01-30T11:22:27.624+0100    info    [email protected]/service.go:252   Starting otelcontribcol...      {"Version": "0.118.0-dev", "NumCPU": 7}
2025-01-30T11:22:27.625+0100    info    extensions/extensions.go:39     Starting extensions...
2025-01-30T11:22:28.117+0100    info    adapter/receiver.go:41  Starting stanza receiver        {"kind": "receiver", "name": "filelog/2", "data_type": "logs"}
2025-01-30T11:22:28.117+0100    info    internal/handler.go:105 Starting Streaming Pull {"kind": "receiver", "name": "googlecloudpubsub/1", "data_type": "logs"}
2025-01-30T11:22:28.317+0100    info    fileconsumer/file.go:265        Started watching file   {"kind": "receiver", "name": "filelog/2", "data_type": "logs", "component": "fileconsumer", "path": "/xxx/logs-2.jsonl"}
2025-01-30T11:22:28.343+0100    info    adapter/receiver.go:41  Starting stanza receiver        {"kind": "receiver", "name": "filelog/1", "data_type": "logs"}
2025-01-30T11:22:28.343+0100    info    internal/handler.go:105 Starting Streaming Pull {"kind": "receiver", "name": "googlecloudpubsub/2", "data_type": "logs"}
2025-01-30T11:22:28.344+0100    info    [email protected]/service.go:275   Everything is ready. Begin running and processing data.
2025-01-30T11:22:28.546+0100    info    fileconsumer/file.go:265        Started watching file   {"kind": "receiver", "name": "filelog/1", "data_type": "logs", "component": "fileconsumer", "path": "/xxx/logs-1.jsonl"}
2025-01-30T11:22:29.470+0100    info    Logs    {"kind": "exporter", "data_type": "logs", "name": "debug/exported", "resource logs": 82, "log records": 8200}
2025-01-30T11:22:30.390+0100    info    Logs    {"kind": "exporter", "data_type": "logs", "name": "debug/exported", "resource logs": 82, "log records": 8200}
2025-01-30T11:22:30.799+0100    info    Logs    {"kind": "exporter", "data_type": "logs", "name": "debug/exported", "resource logs": 36, "log records": 3600}
2025-01-30T11:22:31.240+0100    info    Logs    {"kind": "exporter", "data_type": "logs", "name": "debug/received", "resource logs": 82, "log records": 8200}
2025-01-30T11:22:31.559+0100    info    Logs    {"kind": "exporter", "data_type": "logs", "name": "debug/received", "resource logs": 64, "log records": 6400}
2025-01-30T11:22:38.277+0100    info    Logs    {"kind": "exporter", "data_type": "logs", "name": "debug/received", "resource logs": 36, "log records": 3600}
2025-01-30T11:22:38.564+0100    info    Logs    {"kind": "exporter", "data_type": "logs", "name": "debug/received", "resource logs": 18, "log records": 1800}
^C2025-01-30T11:23:10.705+0100  info    [email protected]/collector.go:331 Received signal from OS {"signal": "interrupt"}
2025-01-30T11:23:10.705+0100    info    [email protected]/service.go:317   Starting shutdown...
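
The record counts reported by the two debug exporters in the run above can be tallied mechanically. The Python sketch below parses the trailing JSON of each `Logs` line and sums `log records` per exporter name. The sample lines are copied from the log above with the timestamp dropped, and the helper name `tally` is hypothetical.

```python
import json
from collections import Counter
from typing import Iterable

LOG_LINES = [
    'info Logs {"kind": "exporter", "data_type": "logs", "name": "debug/exported", "resource logs": 82, "log records": 8200}',
    'info Logs {"kind": "exporter", "data_type": "logs", "name": "debug/exported", "resource logs": 82, "log records": 8200}',
    'info Logs {"kind": "exporter", "data_type": "logs", "name": "debug/exported", "resource logs": 36, "log records": 3600}',
    'info Logs {"kind": "exporter", "data_type": "logs", "name": "debug/received", "resource logs": 82, "log records": 8200}',
    'info Logs {"kind": "exporter", "data_type": "logs", "name": "debug/received", "resource logs": 64, "log records": 6400}',
    'info Logs {"kind": "exporter", "data_type": "logs", "name": "debug/received", "resource logs": 36, "log records": 3600}',
    'info Logs {"kind": "exporter", "data_type": "logs", "name": "debug/received", "resource logs": 18, "log records": 1800}',
]


def tally(lines: Iterable[str]) -> Counter:
    """Sum the "log records" field per exporter name."""
    totals: Counter = Counter()
    for line in lines:
        start = line.find("{")
        if start == -1:
            continue  # no structured fields on this line
        try:
            fields = json.loads(line[start:])
        except json.JSONDecodeError:
            continue  # not a JSON payload, skip it
        if "log records" in fields and "name" in fields:
            totals[fields["name"]] += fields["log records"]
    return totals


print(dict(tally(LOG_LINES)))
```

Both exporters total 20000 records, which matches the 20000 input lines.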

The numbers add up and the messages are ordered:

jq '.resourceLogs[].scopeLogs[].logRecords[].body | select(has("stringValue")) | .stringValue' output/*.jsonl | wc -l
20000

jq '.resourceLogs[].scopeLogs[].logRecords[].body | select(has("stringValue")) | .stringValue' output/*.jsonl | grep logs-1 | (head -n5; tail -n5)
"message 50000 for logs-1.jsonl"
"message 50001 for logs-1.jsonl"
"message 50002 for logs-1.jsonl"
"message 50003 for logs-1.jsonl"
"message 50004 for logs-1.jsonl"
"message 59995 for logs-1.jsonl"
"message 59996 for logs-1.jsonl"
"message 59997 for logs-1.jsonl"
"message 59998 for logs-1.jsonl"
"message 59999 for logs-1.jsonl"

jq '.resourceLogs[].scopeLogs[].logRecords[].body | select(has("stringValue")) | .stringValue' output/*.jsonl | grep logs-2 | (head -n5; tail -n5)
"message 50000 for logs-2.jsonl"
"message 50001 for logs-2.jsonl"
"message 50002 for logs-2.jsonl"
"message 50003 for logs-2.jsonl"
"message 50004 for logs-2.jsonl"
"message 59995 for logs-2.jsonl"
"message 59996 for logs-2.jsonl"
"message 59997 for logs-2.jsonl"
"message 59998 for logs-2.jsonl"
"message 59999 for logs-2.jsonl"

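The `head`/`tail` check above only inspects the first and last five messages per file. A stricter check could walk every body and confirm the sequence number never goes backwards within a source file. The Python sketch below does that under the assumption that bodies follow the `message N for <file>` shape shown above; the helper name `out_of_order` is hypothetical.

```python
import re
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

# Matches bodies like: "message 50000 for logs-1.jsonl" (quotes optional).
PATTERN = re.compile(r"message (\d+) for ([\w.-]+)")


def out_of_order(bodies: Iterable[str]) -> Dict[str, List[Tuple[int, int]]]:
    """Return {source: [(previous, current), ...]} for each backwards step."""
    last_seen: Dict[str, int] = {}
    violations: Dict[str, List[Tuple[int, int]]] = defaultdict(list)
    for body in bodies:
        match = PATTERN.search(body)
        if match is None:
            continue  # ignore lines that are not test messages
        seq, source = int(match.group(1)), match.group(2)
        prev = last_seen.get(source)
        if prev is not None and seq <= prev:
            violations[source].append((prev, seq))
        last_seen[source] = seq
    return dict(violations)


sample = [
    '"message 50000 for logs-1.jsonl"',
    '"message 50000 for logs-2.jsonl"',
    '"message 50001 for logs-1.jsonl"',
    '"message 50001 for logs-2.jsonl"',
]
print(out_of_order(sample))
```

An empty result means every source file's messages appeared in increasing order. Messages from different files may interleave freely, since ordering is only expected per ordering key.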
Documentation

Updated the README with the new configuration options, plus some fixes.

@kevinnoel-be kevinnoel-be changed the title Add pubsub ordering [exporter/googlecloudpubsub] add support for pubsub ordering Jan 30, 2025
Contributor

@alexvanboxel alexvanboxel left a comment

As code owner, I'm approving without comment. This PR has already gone through an internal review.

Successfully merging this pull request may close these issues.

Supported ordering key in googlecloudpubsub exporter