Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs(pubsub): Add ingestion from GCS samples #27434

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions google-cloud-pubsub/samples/acceptance/topics_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
require_relative "../pubsub_create_pull_subscription.rb"
require_relative "../pubsub_create_push_subscription.rb"
require_relative "../pubsub_create_topic.rb"
require_relative "../pubsub_create_topic_with_cloud_storage_ingestion.rb"
require_relative "../pubsub_dead_letter_create_subscription.rb"
require_relative "../pubsub_dead_letter_delivery_attempt.rb"
require_relative "../pubsub_dead_letter_remove.rb"
Expand All @@ -40,15 +41,18 @@
let(:role) { "roles/pubsub.publisher" }
let(:service_account_email) { "serviceAccount:kokoro@#{pubsub.project}.iam.gserviceaccount.com" }
let(:topic_id) { random_topic_id }
let(:cloud_storage_ingestion_topic_id) { random_topic_id }
let(:cloud_storage_bucket) { "pubsub-ruby-sample-bucket" }
let(:subscription_id) { random_subscription_id }
let(:dead_letter_topic_id) { random_topic_id }

after do
@subscription.delete if @subscription
@topic.delete if @topic
@cloud_storage_ingestion_topic.delete if @cloud_storage_ingestion_topic
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see where this instance variable gets set. Did you intend to set it in the test below (i.e. instead of just setting a local variable in line 74)?

end

it "supports pubsub_create_topic, pubsub_list_topics, pubsub_set_topic_policy, pubsub_get_topic_policy, pubsub_test_topic_permissions, pubsub_delete_topic" do
it "supports pubsub_create_topic, pubsub_create_topic_with_cloud_storage_ingestion, pubsub_list_topics, pubsub_set_topic_policy, pubsub_get_topic_policy, pubsub_test_topic_permissions, pubsub_delete_topic" do
# pubsub_create_topic
assert_output "Topic projects/#{pubsub.project}/topics/#{topic_id} created.\n" do
create_topic topic_id: topic_id
Expand All @@ -57,6 +61,20 @@
assert topic
assert_equal "projects/#{pubsub.project}/topics/#{topic_id}", topic.name

# pubsub_create_topic_with_cloud_storage_ingestion
# This test requires the existence of GCS bucket gs://pubsub-ruby-sample-bucket in `pubsub.project` with the P4SA for `pubsub.project` having the "storage.admin" role on the GCS bucket.
assert_output "Topic projects/#{pubsub.project}/topics/#{cloud_storage_ingestion_topic_id} with Cloud Storage ingestion settings created.\n" do
create_topic_with_cloud_storage_ingestion topic_id: cloud_storage_ingestion_topic_id,
bucket: cloud_storage_bucket,
input_format: "text",
text_delimiter: ",",
match_glob: "**.txt",
minimum_object_create_time: "1970-01-01T00:00:01Z"
end
cloud_storage_ingestion_topic = pubsub.topic cloud_storage_ingestion_topic_id
assert cloud_storage_ingestion_topic
assert_equal "projects/#{pubsub.project}/topics/#{cloud_storage_ingestion_topic_id}", cloud_storage_ingestion_topic.name

# pubsub_list_topics
out, _err = capture_io do
list_topics
Expand Down Expand Up @@ -196,7 +214,7 @@
#setup
@topic = pubsub.create_topic topic_id
@dead_letter_topic = pubsub.create_topic dead_letter_topic_id

begin
# pubsub_dead_letter_create_subscription
out, _err = capture_io do
Expand Down Expand Up @@ -239,7 +257,6 @@
@subscription.reload!
refute @subscription.dead_letter_topic
refute @subscription.dead_letter_max_delivery_attempts

ensure
@dead_letter_topic.delete
end
Expand Down Expand Up @@ -392,14 +409,14 @@

# Pub/Sub calls may not respond immediately.
# Wrap expectations that may require multiple attempts with this method.
def expect_with_retry sample_name, attempts: 5
def expect_with_retry(sample_name, attempts: 5)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You shouldn't need to add these parens. Our style prefers omitting them.

@attempt_number ||= 0
yield
@attempt_number = nil
rescue Minitest::Assertion => e
@attempt_number += 1
puts "failed attempt #{@attempt_number} for #{sample_name}"
sleep @attempt_number*@attempt_number
sleep @attempt_number * @attempt_number
retry if @attempt_number < attempts
@attempt_number = nil
raise e
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Copyright 2024 Google, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require "google/cloud/pubsub"

# rubocop:disable Metrics/MethodLength

def create_topic_with_cloud_storage_ingestion topic_id:,
bucket:,
input_format:,
text_delimiter:,
match_glob:,
minimum_object_create_time:
# [START pubsub_create_topic_with_cloud_storage_ingestion]
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# bucket = "your-bucket"
# input_format = "text" (can be one of "text", "avro", "pubsub_avro")
# text_delimiter = "\n"
# match_glob = "**.txt"
# minimum_object_create_time = "YYYY-MM-DDThh:mm:ssZ"

pubsub = Google::Cloud::Pubsub.new
cloud_storage_settings = Google::Cloud::PubSub::V1::IngestionDataSourceSettings::CloudStorage.new(
bucket: bucket,
match_glob: match_glob
)
case input_format
when "text"
cloud_storage_settings.text_format =
Google::Cloud::PubSub::V1::IngestionDataSourceSettings::CloudStorage::TextFormat.new(
delimiter: text_delimiter
)
when "avro"
cloud_storage_settings.avro_format =
Google::Cloud::PubSub::V1::IngestionDataSourceSettings::CloudStorage::AvroFormat.new
when "pubsub_avro"
cloud_storage_settings.pubsub_avro_format =
Google::Cloud::PubSub::V1::IngestionDataSourceSettings::CloudStorage::PubSubAvroFormat.new
else
puts "Invalid input format: #{input_format}; must be in ('text', 'avro', 'pubsub_avro')"
return
end
unless minimum_object_create_time.empty?
cloud_storage_settings.minimum_object_create_time = Time.parse minimum_object_create_time
end
ingestion_data_source_settings = Google::Cloud::PubSub::V1::IngestionDataSourceSettings.new(
cloud_storage: cloud_storage_settings
)
topic = pubsub.create_topic topic_id, ingestion_data_source_settings: ingestion_data_source_settings
puts "Topic #{topic.name} with Cloud Storage ingestion settings created."
# [END pubsub_create_topic_with_cloud_storage_ingestion]
end

# rubocop:enable Metrics/MethodLength