[pkg/stanza] Introduce batching logs in File consumer #36663

Open

andrzej-stencel wants to merge 19 commits into main from batch-logs-in-file-consumer

Changes from 6 of 19 commits
Commits (19):

6b4c9fe  refactor: introduce batching in File consumer's Reader (andrzej-stencel, Nov 20, 2024)
c206995  change `emit.Callback` signature to accept a slice of tokens (andrzej-stencel, Nov 20, 2024)
aedda3a  add `WriterOperator::WriteBatch` and `Operator::ProcessBatch` methods (andrzej-stencel, Nov 22, 2024)
477f67b  change signature of ProcessBatch to accept `[]*entry.Entry` instead o… (andrzej-stencel, Dec 3, 2024)
13d6054  change LogEmitter to process entries in batches (andrzej-stencel, Dec 3, 2024)
dda1a60  add changelog entry for `ProcessBatch` method (andrzej-stencel, Dec 4, 2024)
961789e  refactor: move EOF handling code inside the loop (andrzej-stencel, Dec 6, 2024)
07e0465  test: only check last token in slice (andrzej-stencel, Dec 6, 2024)
f650804  Merge remote-tracking branch 'origin/main' into batch-logs-in-file-consumer (andrzej-stencel, Dec 16, 2024)
de66483  perf: inline `convertToken` function (andrzej-stencel, Dec 16, 2024)
d8704ab  refactor: introduce `Transformer.ProcessBatchWith` to reduce code dup… (andrzej-stencel, Dec 16, 2024)
5734fcc  refactor: use `Transformer.ProcessBatchWith` in parsers (andrzej-stencel, Dec 16, 2024)
1af6281  fix godoc typos (andrzej-stencel, Dec 18, 2024)
f6b44e0  Merge branch 'main' into batch-logs-in-file-consumer (andrzej-stencel, Jan 7, 2025)
4696c39  perf: reduce memory allocations in Reader (andrzej-stencel, Jan 15, 2025)
6d4dc42  perf: reduce memory allocations in Reader for attributes (andrzej-stencel, Jan 16, 2025)
64a5098  Merge branch 'main' into batch-logs-in-file-consumer (andrzej-stencel, Jan 16, 2025)
6bf73cc  Merge branch 'main' into batch-logs-in-file-consumer (andrzej-stencel, Jan 24, 2025)
b67c222  Merge remote-tracking branch 'upstream/main' into batch-logs-in-file-consumer (andrzej-stencel, Jan 31, 2025)
27 changes: 27 additions & 0 deletions .chloggen/batch-logs-in-file-consumer-2.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add method `ProcessBatch` to `Operator` interface in `pkg/stanza/operator` package to support batch processing.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35455]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
27 changes: 27 additions & 0 deletions .chloggen/batch-logs-in-file-consumer.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Change signature of `emit.Callback` function in `pkg/stanza/fileconsumer/emit` package to emit multiple tokens.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35455]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
4 changes: 4 additions & 0 deletions pkg/stanza/adapter/mocks_test.go
@@ -50,6 +50,10 @@ func (o *UnstartableOperator) Start(_ operator.Persister) error {
return errors.New("something very unusual happened")
}

func (o *UnstartableOperator) ProcessBatch(_ context.Context, _ []*entry.Entry) error {
return nil
}

// Process will return nil
func (o *UnstartableOperator) Process(_ context.Context, _ *entry.Entry) error {
return nil
9 changes: 6 additions & 3 deletions pkg/stanza/fileconsumer/benchmark_test.go
@@ -188,9 +188,12 @@ func BenchmarkFileInput(b *testing.B) {
cfg.PollInterval = time.Microsecond

doneChan := make(chan bool, len(files))
callback := func(_ context.Context, token emit.Token) error {
if len(token.Body) == 0 {
doneChan <- true
callback := func(_ context.Context, tokens []emit.Token) error {
for _, token := range tokens {
if len(token.Body) == 0 {
doneChan <- true
break
}
}
return nil
}
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/emit/emit.go
@@ -7,7 +7,7 @@ import (
"context"
)

type Callback func(ctx context.Context, token Token) error
type Callback func(ctx context.Context, tokens []Token) error

type Token struct {
Body []byte
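Since `emit.Callback` now receives a slice of tokens, any pre-existing single-token callback needs a thin adapter to keep working. A minimal sketch of such an adapter under the new signature; the `adaptSingle` helper is illustrative and not part of the PR:

```go
package emitexample

import (
	"context"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
)

// adaptSingle (hypothetical helper) lifts a per-token function into the
// new batch-oriented emit.Callback signature.
func adaptSingle(f func(ctx context.Context, token emit.Token) error) emit.Callback {
	return func(ctx context.Context, tokens []emit.Token) error {
		for _, token := range tokens {
			// Stop at the first error; a caller could instead collect
			// errors with errors.Join, as the file input below does.
			if err := f(ctx, token); err != nil {
				return err
			}
		}
		return nil
	}
}
```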
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/internal/emittest/nop.go
@@ -9,6 +9,6 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
)

func Nop(_ context.Context, _ emit.Token) error {
func Nop(_ context.Context, _ []emit.Token) error {
return nil
}
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/internal/emittest/nop_test.go
@@ -13,5 +13,5 @@ import (
)

func TestNop(t *testing.T) {
require.NoError(t, Nop(context.Background(), emit.Token{}))
require.NoError(t, Nop(context.Background(), []emit.Token{}))
}
16 changes: 9 additions & 7 deletions pkg/stanza/fileconsumer/internal/emittest/sink.go
@@ -57,13 +57,15 @@ func NewSink(opts ...SinkOpt) *Sink {
return &Sink{
emitChan: emitChan,
timeout: cfg.timeout,
Callback: func(ctx context.Context, token emit.Token) error {
copied := make([]byte, len(token.Body))
copy(copied, token.Body)
select {
case <-ctx.Done():
return ctx.Err()
case emitChan <- &Call{copied, token.Attributes}:
Callback: func(ctx context.Context, tokens []emit.Token) error {
for _, token := range tokens {
copied := make([]byte, len(token.Body))
copy(copied, token.Body)
select {
case <-ctx.Done():
return ctx.Err()
case emitChan <- &Call{copied, token.Attributes}:
}
}
return nil
},
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/internal/emittest/sink_test.go
@@ -204,7 +204,7 @@ func sinkTest(t *testing.T, opts ...SinkOpt) (*Sink, []*Call) {
}
go func() {
for _, c := range testCalls {
assert.NoError(t, s.Callback(context.Background(), emit.NewToken(c.Token, c.Attrs)))
assert.NoError(t, s.Callback(context.Background(), []emit.Token{emit.NewToken(c.Token, c.Attrs)}))
}
}()
return s, testCalls
7 changes: 7 additions & 0 deletions pkg/stanza/fileconsumer/internal/header/output.go
@@ -30,6 +30,13 @@ func newPipelineOutput(set component.TelemetrySettings) *pipelineOutput {
}
}

func (e *pipelineOutput) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
for i := range entries {
_ = e.Process(ctx, entries[i])
}
return nil
}

// Drop the entry if logChan is full, in order to avoid this operator blocking.
// This protects against a case where an operator could return an error, but continue propagating a log entry,
// leaving an unexpected entry in the output channel.
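The `ProcessBatch` loop above intentionally discards the per-entry `Process` error: as the comment explains, `Process` drops the entry rather than blocking when `logChan` is full. A sketch of that non-blocking send pattern; the standalone `trySend` helper is illustrative only, the real logic lives in `pipelineOutput.Process`:

```go
package headerexample

import (
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
)

// trySend delivers the entry if the channel has room and otherwise drops
// it, so the header-reading pipeline never stalls on a full channel.
func trySend(logChan chan *entry.Entry, e *entry.Entry) {
	select {
	case logChan <- e:
	default:
		// channel full: the entry is dropped by design
	}
}
```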
6 changes: 4 additions & 2 deletions pkg/stanza/fileconsumer/internal/reader/factory.go
@@ -24,8 +24,9 @@ import (
)

const (
DefaultMaxLogSize = 1024 * 1024
DefaultFlushPeriod = 500 * time.Millisecond
DefaultMaxLogSize = 1024 * 1024
DefaultFlushPeriod = 500 * time.Millisecond
DefaultMaxBatchSize = 100
)

type Factory struct {
@@ -77,6 +78,7 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
includeFileRecordNum: f.IncludeFileRecordNumber,
compression: f.Compression,
acquireFSLock: f.AcquireFSLock,
maxBatchSize: DefaultMaxBatchSize,
}
r.set.Logger = r.set.Logger.With(zap.String("path", r.fileName))

43 changes: 39 additions & 4 deletions pkg/stanza/fileconsumer/internal/reader/reader.go
@@ -52,6 +52,7 @@ type Reader struct {
includeFileRecordNum bool
compression string
acquireFSLock bool
maxBatchSize int
}

// ReadToEnd will read until the end of the file
Expand Down Expand Up @@ -179,6 +180,8 @@ func (r *Reader) readContents(ctx context.Context) {
// Create the scanner to read the contents of the file.
s := scanner.New(r, r.maxLogSize, r.initialBufferSize, r.Offset, r.contentSplitFunc)

tokens := make([]emit.Token, 0, r.maxBatchSize)

// Iterate over the contents of the file.
for {
select {
@@ -194,7 +197,7 @@
} else if r.deleteAtEOF {
r.delete()
}
return
break
}

token, err := r.decoder.Decode(s.Bytes())
Expand All @@ -209,15 +212,47 @@ func (r *Reader) readContents(ctx context.Context) {
r.FileAttributes[attrs.LogFileRecordNumber] = r.RecordNum
}

err = r.emitFunc(ctx, emit.NewToken(token, r.FileAttributes))
if err != nil {
r.set.Logger.Error("failed to process token", zap.Error(err))
tokens = append(tokens, emit.NewToken(copyBody(token), copyAttributes(r.FileAttributes)))
[Inline review thread]

Member: What specifically introduces a need to make copies of the body and attributes? It seems like a lot of overhead - are we sure it's necessary?

andrzej-stencel (Member, Author): Without copying, for each token in the tokens collection, its Body would point to the same place in memory, specifically the Decoder's decodeBuffer. This isn't a problem when the Reader emits each token before creating a new token, as the token's Body is copied in the emit function (specifically in the toBody function - the conversion from []byte to string copies the underlying array). Similar with the attributes: the Reader's FileAttributes map would be reused by all tokens if not copied.

Member: If I'm understanding correctly then, we're just making the copies in a different place, so this does not introduce new work into the hot path?

andrzej-stencel (Member, Author): I'm afraid it does introduce new work.

How I believe this worked before this change:

  • Every time a new token is read from the file, it's placed in the same area in memory - the Decoder's decodeBuffer.
  • A pointer to that place in memory is passed to the emit function.
  • The emit function reads the bytes and creates a new Entry, copying the bytes into a new area in memory.

How this works after this change:

  • Every time a new token is read from the file, it's placed in the same area in memory - the Decoder's decodeBuffer.
  • In the Reader's readContents function, a batch of tokens is accumulated before calling the emit function, so the bytes from decodeBuffer need to be copied into a new area in memory for every new token.
  • The accumulated slice of tokens is passed to the emit function, which creates a new Entry for every Token, copying the bytes for the second time.

(See the aliasing sketch after this file's diff.)

if r.maxBatchSize > 0 && len(tokens) >= r.maxBatchSize {
err := r.emitFunc(ctx, tokens)
if err != nil {
r.set.Logger.Error("failed to emit token", zap.Error(err))
}
tokens = tokens[:0]
r.Offset = s.Pos()
}
}

if len(tokens) > 0 {
err := r.emitFunc(ctx, tokens)
if err != nil {
r.set.Logger.Error("failed to emit token", zap.Error(err))
}
r.Offset = s.Pos()
}
}

func copyBody(body []byte) []byte {
if body == nil {
return nil
}
copied := make([]byte, len(body))
copy(copied, body)
return copied
}

func copyAttributes(attrs map[string]any) map[string]any {
if attrs == nil {
return nil
}
copied := make(map[string]any, len(attrs))
for k, v := range attrs {
copied[k] = v
}
return copied
}

// Delete will close and delete the file
func (r *Reader) delete() {
r.close()
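To make the buffer-reuse hazard from the review thread above concrete, here is a self-contained sketch (plain Go, no stanza types) of why `copyBody` is needed once tokens are accumulated instead of emitted immediately:

```go
package main

import "fmt"

func main() {
	// Stands in for the decoder's reused decodeBuffer.
	buf := make([]byte, 0, 16)
	var batch [][]byte

	// Without copying, every batched token aliases the same backing array.
	for _, line := range []string{"foo", "bar"} {
		buf = append(buf[:0], line...) // decoder reuses its buffer
		batch = append(batch, buf)     // BUG: stores an alias, not a copy
	}
	fmt.Printf("%s\n", batch) // [bar bar] - the first token was overwritten

	// Copying on append, as the Reader's copyBody does, keeps each token intact.
	batch = batch[:0]
	for _, line := range []string{"foo", "bar"} {
		buf = append(buf[:0], line...)
		copied := make([]byte, len(buf))
		copy(copied, buf)
		batch = append(batch, copied)
	}
	fmt.Printf("%s\n", batch) // [foo bar]
}
```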
25 changes: 25 additions & 0 deletions pkg/stanza/operator/helper/emitter.go
@@ -94,6 +94,31 @@ func (e *LogEmitter) Stop() error {
return nil
}

// ProcessBatch emits the entries to the consumerFunc
func (e *LogEmitter) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
if oldBatch := e.appendEntries(entries); len(oldBatch) > 0 {
e.consumerFunc(ctx, oldBatch)
}

return nil
}

// appendEntries appends the entries to the current batch. If maxBatchSize is reached, a new batch will be made, and the old batch
// (which should be flushed) will be returned
func (e *LogEmitter) appendEntries(entries []*entry.Entry) []*entry.Entry {
e.batchMux.Lock()
defer e.batchMux.Unlock()

e.batch = append(e.batch, entries...)
if uint(len(e.batch)) >= e.maxBatchSize {
var oldBatch []*entry.Entry
oldBatch, e.batch = e.batch, make([]*entry.Entry, 0, e.maxBatchSize)
return oldBatch
}

return nil
}

// Process will emit an entry to the output channel
func (e *LogEmitter) Process(ctx context.Context, ent *entry.Entry) error {
if oldBatch := e.appendEntry(ent); len(oldBatch) > 0 {
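One consequence of appending the whole incoming slice before the size check in `appendEntries`: a single large `ProcessBatch` call can flush a batch bigger than `maxBatchSize`. A standalone sketch of that threshold behavior, with illustrative names and numbers:

```go
package main

import "fmt"

func main() {
	const maxBatchSize = 100
	var batch []int

	// One ProcessBatch call arriving with 250 entries at once.
	incoming := make([]int, 250)
	batch = append(batch, incoming...)

	if len(batch) >= maxBatchSize {
		// The threshold decides *when* to flush, not how large the
		// flushed batch may be.
		fmt.Println("flushing", len(batch), "entries") // flushing 250 entries
		batch = make([]int, 0, maxBatchSize)
	}
}
```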
9 changes: 9 additions & 0 deletions pkg/stanza/operator/helper/input.go
@@ -82,6 +82,15 @@ func (i *InputOperator) CanProcess() bool {
return false
}

// ProcessBatch will always return an error if called.
func (i *InputOperator) ProcessBatch(_ context.Context, _ []*entry.Entry) error {
i.Logger().Error("Operator received a batch of entries, but can not process")
return errors.NewError(
"Operator can not process logs.",
"Ensure that operator is not configured to receive logs from other operators",
)
}

// Process will always return an error if called.
func (i *InputOperator) Process(_ context.Context, _ *entry.Entry) error {
i.Logger().Error("Operator received an entry, but can not process")
19 changes: 19 additions & 0 deletions pkg/stanza/operator/helper/writer.go
@@ -47,6 +47,25 @@ type WriterOperator struct {
OutputOperators []operator.Operator
}

// WriteBatch writes a batch of entries to the outputs of the operator.
// A batch is a collection of entries that are sent in one go.
func (w *WriterOperator) WriteBatch(ctx context.Context, entries []*entry.Entry) error {
for i, op := range w.OutputOperators {
if i == len(w.OutputOperators)-1 {
return op.ProcessBatch(ctx, entries)
}
copyOfEntries := make([]*entry.Entry, 0, len(entries))
for i := range entries {
copyOfEntries = append(copyOfEntries, entries[i].Copy())
}
err := op.ProcessBatch(ctx, copyOfEntries)
if err != nil {
w.Logger().Error("Failed to process entries", zap.Error(err))
}
}
return nil
}

// Write will write an entry to the outputs of the operator.
func (w *WriterOperator) Write(ctx context.Context, e *entry.Entry) error {
for i, op := range w.OutputOperators {
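The deep copies in `WriteBatch` exist so that one output's downstream mutations cannot leak into another's; only the final output receives the original entries. A minimal standalone illustration of the pointer-sharing problem the copies prevent (the `record` type is a stand-in for `entry.Entry`):

```go
package main

import "fmt"

type record struct{ body string }

func main() {
	entries := []*record{{body: "hello"}}

	// Handing the same pointers to two outputs: a mutation in the first
	// output's pipeline is visible to the second.
	shared := entries
	shared[0].body = "MUTATED"
	fmt.Println(entries[0].body) // MUTATED

	// Handing a deep copy, as WriteBatch does for all but the last output,
	// keeps each output's view independent.
	entries[0].body = "hello"
	copied := make([]*record, 0, len(entries))
	for _, e := range entries {
		c := *e // stands in for entry.Entry.Copy in the real code
		copied = append(copied, &c)
	}
	copied[0].body = "MUTATED"
	fmt.Println(entries[0].body) // hello - the original is untouched
}
```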
2 changes: 1 addition & 1 deletion pkg/stanza/operator/input/file/config.go
@@ -60,7 +60,7 @@ func (c Config) Build(set component.TelemetrySettings) (operator.Operator, error
toBody: toBody,
}

input.fileConsumer, err = c.Config.Build(set, input.emit)
input.fileConsumer, err = c.Config.Build(set, input.emitBatch)
if err != nil {
return nil, err
}
38 changes: 33 additions & 5 deletions pkg/stanza/operator/input/file/input.go
@@ -5,6 +5,7 @@ package file // import "github.com/open-telemetry/opentelemetry-collector-contri

import (
"context"
"errors"
"fmt"

"go.uber.org/zap"
@@ -37,20 +38,47 @@ func (i *Input) Stop() error {
return i.fileConsumer.Stop()
}

func (i *Input) emit(ctx context.Context, token emit.Token) error {
if len(token.Body) == 0 {
return nil
func (i *Input) emitBatch(ctx context.Context, tokens []emit.Token) error {
[Inline review thread]

Member: Given that this is in the hot path, we might be better off flattening these functions. It seems like a lot of extra error checking and wrapping, where we really just need to put a loop around what was already there, switch from return to append err + continue.

andrzej-stencel (Member, Author): I have inlined the convertToken function in de66483. Should I inline the convertTokens function as well?

entries, conversionError := i.convertTokens(tokens)
if conversionError != nil {
conversionError = fmt.Errorf("convert tokens: %w", conversionError)
}

consumeError := i.WriteBatch(ctx, entries)
if consumeError != nil {
consumeError = fmt.Errorf("consume entries: %w", consumeError)
}

return errors.Join(conversionError, consumeError)
}

func (i *Input) convertTokens(tokens []emit.Token) ([]*entry.Entry, error) {
entries := make([]*entry.Entry, 0, len(tokens))
var errs []error
for _, token := range tokens {
if len(token.Body) == 0 {
continue
}
entry, err := i.convertToken(token)
if err != nil {
errs = append(errs, err)
continue
}
entries = append(entries, entry)
}
return entries, errors.Join(errs...)
}

func (i *Input) convertToken(token emit.Token) (*entry.Entry, error) {
ent, err := i.NewEntry(i.toBody(token.Body))
if err != nil {
return fmt.Errorf("create entry: %w", err)
return nil, fmt.Errorf("create entry: %w", err)
}

for k, v := range token.Attributes {
if err := ent.Set(entry.NewAttributeField(k), v); err != nil {
i.Logger().Error("set attribute", zap.Error(err))
}
}
return i.Write(ctx, ent)
return ent, nil
}
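`emitBatch` above joins the two failure classes so a conversion error does not prevent the successfully converted entries from being consumed. A short standalone sketch of how `errors.Join` behaves in that pattern:

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	// Suppose 1 of 10 tokens failed to convert but consuming succeeded.
	conversionErr := fmt.Errorf("convert tokens: %w", errors.New("set attribute: bad value"))
	var consumeErr error // nil: the 9 converted entries were written

	// errors.Join drops nil errors and returns nil only if all are nil,
	// so the caller still sees the conversion failure.
	if err := errors.Join(conversionErr, consumeErr); err != nil {
		fmt.Println(err) // convert tokens: set attribute: bad value
	}
}
```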