Skip to content

Commit

Permalink
Avoid to populate version and version type attributes when processing…
Browse files Browse the repository at this point in the history
… integration metadata and datastream is enabled (#1161)

During PR #1155 the resolution of version and version_type ES parameters was moved from the index only event action tuple creation to the common method. This changed was due to do the intent to collect all integration-aware metadata fields in one place, but the common method is used also by the datastream part this result in populating event and event_type request parameters not only for normal index operations.
During an index operation on a datastream, if one of those parameters is valued, generated an error on ES indexing, resulting in request fail.

This PR move the processing and creation of event and event_type parameters, back in its original position, splitting the capture of integration metadata in 2 parts.
  • Loading branch information
andsel authored Nov 21, 2023
1 parent cfd82fa commit 77ca162
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 12 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 11.22.1
- Fix, avoid to populate `version` and `version_type` attributes when processing integration metadata and datastream is enabled. [#1161](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1161)

## 11.22.0
- Added support for propagating event processing metadata when this output is downstream of an Elastic Integration Filter and configured _without_ explicit `version`, `version_type`, or `routing` directives [#1158](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1158)

Expand Down
18 changes: 11 additions & 7 deletions lib/logstash/outputs/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,16 @@ def event_action_tuple(event)
params[retry_on_conflict_action_name] = @retry_on_conflict
end

event_control = event.get("[@metadata][_ingest_document]")
event_version, event_version_type = event_control&.values_at("version", "version_type") rescue nil

resolved_version = resolve_version(event, event_version)
resolved_version_type = resolve_version_type(event, event_version_type)

# avoid to add nil valued key-value pairs
params[:version] = resolved_version unless resolved_version.nil?
params[:version_type] = resolved_version_type unless resolved_version_type.nil?

EventActionTuple.new(action, params, event)
end

Expand Down Expand Up @@ -538,7 +548,7 @@ def initialize(bad_action)
# @private shared event params factory between index and data_stream mode
def common_event_params(event)
event_control = event.get("[@metadata][_ingest_document]")
event_id, event_pipeline, event_index, event_routing, event_version, event_version_type = event_control&.values_at("id","pipeline","index", "routing", "version", "version_type") rescue nil
event_id, event_pipeline, event_index, event_routing = event_control&.values_at("id","pipeline","index", "routing") rescue nil

params = {
:_id => resolve_document_id(event, event_id),
Expand All @@ -554,12 +564,6 @@ def common_event_params(event)
# }
params[:pipeline] = target_pipeline unless (target_pipeline.nil? || target_pipeline.empty?)

resolved_version = resolve_version(event, event_version)
resolved_version_type = resolve_version_type(event, event_version_type)
# avoid to add nil valued key-value pairs
params[:version] = resolved_version unless resolved_version.nil?
params[:version_type] = resolved_version_type unless resolved_version_type.nil?

params
end

Expand Down
2 changes: 1 addition & 1 deletion logstash-output-elasticsearch.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-elasticsearch'
s.version = '11.22.0'
s.version = '11.22.1'
s.licenses = ['apache-2.0']
s.summary = "Stores logs in Elasticsearch"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
30 changes: 26 additions & 4 deletions spec/unit/outputs/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,19 @@
context "when the event contains an integration metadata version" do
let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"version" => "456"}}}) }

it "event's metadata version is used" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:version => "456")
context "when datastream settings are NOT configured" do
it "event's metadata version is used" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:version => "456")
end
end

context "when datastream settings are configured" do
# NOTE: we validate with datastream-specific `data_stream_event_action_tuple`
let(:event_fields) { super().merge({"data_stream" => {"type" => "logs", "dataset" => "generic", "namespace" => "default"}}) }

it "no version is used" do
expect(subject.send(:data_stream_event_action_tuple, event)[1]).to_not include(:version)
end
end
end

Expand All @@ -315,8 +326,19 @@
context "when the event contains an integration metadata version_type" do
let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"version_type" => "external"}}}) }

it "plugin's version_type is used" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:version_type => "internal")
context "when datastream settings are NOT configured" do
it "plugin's version_type is used" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:version_type => "internal")
end
end

context "when datastream settings are configured" do
# NOTE: we validate with datastream-specific `data_stream_event_action_tuple`
let(:event_fields) { super().merge({"data_stream" => {"type" => "logs", "dataset" => "generic", "namespace" => "default"}}) }

it "no version_type is used" do
expect(subject.send(:data_stream_event_action_tuple, event)[1]).to_not include(:version_type)
end
end
end

Expand Down

0 comments on commit 77ca162

Please sign in to comment.