diff --git a/sender_pool.go b/sender_pool.go index 2c80a75..45a4400 100644 --- a/sender_pool.go +++ b/sender_pool.go @@ -39,6 +39,7 @@ import ( type LineSenderPool struct { maxSenders int conf string + opts []LineSenderOption closed bool @@ -74,6 +75,41 @@ func PoolFromConf(conf string, opts ...LineSenderPoolOption) (*LineSenderPool, e return pool, nil } +// PoolFromOptions instantiates a new LineSenderPool using programmatic options. +// Any sender acquired from this pool will be initialized with the same options +// that were passed into the opts argument. +// +// Unlike [PoolFromConf], PoolFromOptions does not have the ability to customize +// the returned LineSenderPool. In this case, to add options (such as [WithMaxSenders]), +// you need manually apply these options after calling this method. +// +// // Create a PoolFromOptions with LineSender options +// p, err := PoolFromOptions( +// WithHttp(), +// WithAutoFlushRows(1000000), +// ) +// +// if err != nil { +// panic(err) +// } +// +// // Add Pool-level options manually +// for _, opt := range []LineSenderPoolOption{ +// WithMaxSenders(32), +// } { +// opt(p) +// } +func PoolFromOptions(opts ...LineSenderOption) (*LineSenderPool, error) { + pool := &LineSenderPool{ + maxSenders: 64, + opts: opts, + senders: []LineSender{}, + mu: &sync.Mutex{}, + } + + return pool, nil +} + // WithMaxSenders sets the maximum number of senders in the pool. // The default maximum number of senders is 64. func WithMaxSenders(count int) LineSenderPoolOption { @@ -99,7 +135,19 @@ func (p *LineSenderPool) Acquire(ctx context.Context) (LineSender, error) { return s, nil } - return LineSenderFromConf(ctx, p.conf) + if p.conf != "" { + return LineSenderFromConf(ctx, p.conf) + } else { + conf := &lineSenderConfig{} + for _, opt := range p.opts { + opt(conf) + if conf.senderType == tcpSenderType { + return nil, errors.New("tcp/s not supported for pooled senders, use http/s only") + } + } + return newHttpLineSender(conf) + } + } // Release flushes the LineSender and returns it back to the pool. If the pool