From 731787135b77a58c206a4bd9b7ca727e156df7f0 Mon Sep 17 00:00:00 2001 From: Andrey Pechkurov Date: Mon, 12 Aug 2024 17:33:17 +0300 Subject: [PATCH] feat(client): blocking LineSenderPool --- README.md | 13 ++-- sender_pool.go | 178 +++++++++++++++++++++++++++++++++++--------- sender_pool_test.go | 86 +++++++++++++-------- 3 files changed, 206 insertions(+), 71 deletions(-) diff --git a/README.md b/README.md index 0bf043d..12037a7 100644 --- a/README.md +++ b/README.md @@ -80,15 +80,15 @@ To connect via TCP, set the configuration string to: **Warning: Experimental feature designed for use with HTTP senders ONLY** Version 3 of the client introduces a `LineSenderPool`, which provides a mechanism -to cache previously-used `LineSender`s in memory so they can be reused without -having to allocate and instantiate new senders. +to pool previously-used `LineSender`s so they can be reused without having +to allocate and instantiate new senders. -A LineSenderPool is thread-safe and can be used to concurrently Acquire and Release senders +A LineSenderPool is thread-safe and can be used to concurrently obtain senders across multiple goroutines. Since `LineSender`s must be used in a single-threaded context, a typical pattern is to Acquire a sender from a `LineSenderPool` at the beginning of a goroutine and use a deferred -execution block to Release the sender at the end of the goroutine. +execution block to Close the sender at the end of the goroutine. Here is an example of the `LineSenderPool` Acquire, Release, and Close semantics: @@ -112,7 +112,7 @@ func main() { } }() - sender, err := pool.Acquire(ctx) + sender, err := pool.Sender(ctx) if err != nil { panic(err) } @@ -122,7 +122,8 @@ func main() { Float64Column("price", 123.45). AtNow(ctx) - if err := pool.Release(ctx, sender); err != nil { + // Close call returns the sender back to the pool + if err := sender.Close(ctx); err != nil { panic(err) } } diff --git a/sender_pool.go b/sender_pool.go index 2c80a75..eee94ec 100644 --- a/sender_pool.go +++ b/sender_pool.go @@ -28,8 +28,11 @@ import ( "context" "errors" "fmt" + "math/big" "strings" "sync" + "sync/atomic" + "time" ) // LineSenderPool wraps a mutex-protected slice of [LineSender]. It allows a goroutine to @@ -38,12 +41,21 @@ import ( // WARNING: This is an experimental API that is designed to work with HTTP senders ONLY. type LineSenderPool struct { maxSenders int + numSenders int // number of used and free senders conf string closed bool - senders []LineSender - mu *sync.Mutex + freeSenders []*pooledSender + mu *sync.Mutex + cond sync.Cond // used to wake up free sender waiters +} + +type pooledSender struct { + pool *LineSenderPool + wrapped LineSender + dirty bool // set to true if any of the sender calls returned an error + tick uint64 // even values stand for free sender, odd values mean in-use sender } // LineSenderPoolOption defines line sender pool config option. @@ -61,11 +73,12 @@ func PoolFromConf(conf string, opts ...LineSenderPoolOption) (*LineSenderPool, e } pool := &LineSenderPool{ - maxSenders: 64, - conf: conf, - senders: []LineSender{}, - mu: &sync.Mutex{}, + maxSenders: 64, + conf: conf, + freeSenders: make([]*pooledSender, 0, 64), + mu: &sync.Mutex{}, } + pool.cond = *sync.NewCond(pool.mu) for _, opt := range opts { opt(pool) @@ -82,50 +95,71 @@ func WithMaxSenders(count int) LineSenderPoolOption { } } -// Acquire obtains a LineSender from the pool. If the pool is empty, a new +// Sender obtains a LineSender from the pool. 
If the pool is empty, a new
 // LineSender will be instantiated using the pool's config string.
-func (p *LineSenderPool) Acquire(ctx context.Context) (LineSender, error) {
+// If the maximum number of senders has already been obtained from the pool,
+// this call will block until one of the senders is returned back to
+// the pool by calling sender.Close().
+func (p *LineSenderPool) Sender(ctx context.Context) (LineSender, error) {
 	p.mu.Lock()
 	defer p.mu.Unlock()
 
 	if p.closed {
-		return nil, fmt.Errorf("cannot Acquire a LineSender from a closed LineSenderPool")
+		return nil, errors.New("cannot obtain a LineSender from a closed LineSenderPool")
+	}
+
+	// We may have to wait for a free sender
+	for len(p.freeSenders) == 0 && p.numSenders == p.maxSenders {
+		p.cond.Wait()
 	}
 
-	if len(p.senders) > 0 {
+	if len(p.freeSenders) > 0 {
 		// Pop sender off the slice and return it
-		s := p.senders[len(p.senders)-1]
-		p.senders = p.senders[0 : len(p.senders)-1]
+		s := p.freeSenders[len(p.freeSenders)-1]
+		atomic.AddUint64(&s.tick, 1)
+		p.freeSenders = p.freeSenders[0 : len(p.freeSenders)-1]
 		return s, nil
 	}
 
-	return LineSenderFromConf(ctx, p.conf)
+	s, err := LineSenderFromConf(ctx, p.conf)
+	if err != nil {
+		return nil, err
+	}
+	p.numSenders++
+	ps := &pooledSender{
+		pool:    p,
+		wrapped: s,
+	}
+	atomic.AddUint64(&ps.tick, 1)
+	return ps, nil
 }
 
-// Release flushes the LineSender and returns it back to the pool. If the pool
-// is full, the sender is closed and discarded. In cases where the sender's
-// flush fails, it is not added back to the pool.
-func (p *LineSenderPool) Release(ctx context.Context, s LineSender) error {
-	// If there is an error on flush, do not add the sender back to the pool
-	if err := s.Flush(ctx); err != nil {
-		return err
+func (p *LineSenderPool) free(ctx context.Context, ps *pooledSender) error {
+	var flushErr error
+
+	if ps.dirty {
+		flushErr = ps.Flush(ctx)
 	}
 
 	p.mu.Lock()
 	defer p.mu.Unlock()
 
-	for i := range p.senders {
-		if p.senders[i] == s {
-			return fmt.Errorf("LineSender %p has already been released back to the pool", s)
-		}
+	if flushErr != nil {
+		// Failed to flush, close and call it a day
+		p.numSenders--
+		closeErr := ps.wrapped.Close(ctx)
+		return fmt.Errorf("%s %w", flushErr, closeErr)
 	}
 
-	if p.closed || len(p.senders) >= p.maxSenders {
-		return s.Close(ctx)
+	if ps.dirty || p.closed {
+		// Previous error or closed pool, close and call it a day
+		p.numSenders--
+		return ps.wrapped.Close(ctx)
 	}
 
-	p.senders = append(p.senders, s)
-
+	p.freeSenders = append(p.freeSenders, ps)
+	// Notify free sender waiters, if any
+	p.cond.Broadcast()
 	return nil
 }
 
@@ -135,23 +169,28 @@ func (p *LineSenderPool) Close(ctx context.Context) error {
 	p.mu.Lock()
 	defer p.mu.Unlock()
 
+	if p.closed {
+		// Already closed
+		return nil
+	}
 	p.closed = true
 
 	var senderErrors []error
 
-	for _, s := range p.senders {
-		senderErr := s.Close(ctx)
+	for _, ps := range p.freeSenders {
+		senderErr := ps.wrapped.Close(ctx)
 		if senderErr != nil {
 			senderErrors = append(senderErrors, senderErr)
-
 		}
 	}
+	p.numSenders -= len(p.freeSenders)
+	p.freeSenders = []*pooledSender{}
 
 	if len(senderErrors) == 0 {
 		return nil
 	}
 
-	err := fmt.Errorf("error closing one or more LineSenders in the pool")
+	err := errors.New("error closing one or more LineSenders in the pool")
 	for _, senderErr := range senderErrors {
 		err = fmt.Errorf("%s %w", err, senderErr)
 	}
@@ -170,10 +209,81 @@ func (p *LineSenderPool) IsClosed() bool {
 	return p.closed
 }
 
-// Len returns the numbers of cached LineSenders in the pool. 
+// Len returns the number of LineSenders in the pool. func (p *LineSenderPool) Len() int { p.mu.Lock() defer p.mu.Unlock() - return len(p.senders) + return len(p.freeSenders) +} + +func (ps *pooledSender) Table(name string) LineSender { + ps.wrapped.Table(name) + return ps +} + +func (ps *pooledSender) Symbol(name, val string) LineSender { + ps.wrapped.Symbol(name, val) + return ps +} + +func (ps *pooledSender) Int64Column(name string, val int64) LineSender { + ps.wrapped.Int64Column(name, val) + return ps +} + +func (ps *pooledSender) Long256Column(name string, val *big.Int) LineSender { + ps.wrapped.Long256Column(name, val) + return ps +} + +func (ps *pooledSender) TimestampColumn(name string, ts time.Time) LineSender { + ps.wrapped.TimestampColumn(name, ts) + return ps +} + +func (ps *pooledSender) Float64Column(name string, val float64) LineSender { + ps.wrapped.Float64Column(name, val) + return ps +} + +func (ps *pooledSender) StringColumn(name, val string) LineSender { + ps.wrapped.StringColumn(name, val) + return ps +} + +func (ps *pooledSender) BoolColumn(name string, val bool) LineSender { + ps.wrapped.BoolColumn(name, val) + return ps +} + +func (ps *pooledSender) AtNow(ctx context.Context) error { + err := ps.wrapped.AtNow(ctx) + if err != nil { + ps.dirty = true + } + return err +} + +func (ps *pooledSender) At(ctx context.Context, ts time.Time) error { + err := ps.wrapped.At(ctx, ts) + if err != nil { + ps.dirty = true + } + return err +} + +func (ps *pooledSender) Flush(ctx context.Context) error { + err := ps.wrapped.Flush(ctx) + if err != nil { + ps.dirty = true + } + return err +} + +func (ps *pooledSender) Close(ctx context.Context) error { + if atomic.AddUint64(&ps.tick, 1)&1 == 1 { + return errors.New("double sender close") + } + return ps.pool.free(ctx, ps) } diff --git a/sender_pool_test.go b/sender_pool_test.go index 4f37425..5f8d1e6 100644 --- a/sender_pool_test.go +++ b/sender_pool_test.go @@ -27,6 +27,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "testing" "time" @@ -41,35 +42,35 @@ func TestBasicBehavior(t *testing.T) { ctx := context.Background() // Start with an empty pool, allocate a new sender - s1, err := p.Acquire(ctx) + s1, err := p.Sender(ctx) assert.NoError(t, err) // Release the sender and add it to the pool - assert.NoError(t, p.Release(ctx, s1)) + assert.NoError(t, s1.Close(ctx)) // Acquiring a sender will return the initial one from the pool - s2, err := p.Acquire(ctx) + s2, err := p.Sender(ctx) assert.NoError(t, err) assert.Same(t, s1, s2) // Acquiring another sender will create a new one - s3, err := p.Acquire(ctx) + s3, err := p.Sender(ctx) assert.NoError(t, err) assert.NotSame(t, s1, s3) // Releasing the new sender will add it back to the pool - assert.NoError(t, p.Release(ctx, s3)) + assert.NoError(t, s3.Close(ctx)) // Releasing the original sender will add it to the end of the pool slice - assert.NoError(t, p.Release(ctx, s2)) + assert.NoError(t, s2.Close(ctx)) // Acquiring a new sender will pop the original one off the slice - s4, err := p.Acquire(ctx) + s4, err := p.Sender(ctx) assert.NoError(t, err) assert.Same(t, s1, s4) // Acquiring another sender will pop the second one off the slice - s5, err := p.Acquire(ctx) + s5, err := p.Sender(ctx) assert.NoError(t, err) assert.Same(t, s3, s5) } @@ -81,57 +82,80 @@ func TestDoubleReleaseShouldFail(t *testing.T) { ctx := context.Background() // Start with an empty pool, allocate a new sender - s1, err := p.Acquire(ctx) + s1, err := p.Sender(ctx) assert.NoError(t, err) // Release the sender - 
assert.NoError(t, p.Release(ctx, s1))
+	assert.NoError(t, s1.Close(ctx))
 
-	// Try to release the sender again. This should fail because it already exists in the slice
-	assert.Error(t, p.Release(ctx, s1))
+	// Try to release the sender again. This should fail
+	assert.Error(t, s1.Close(ctx))
 }
 
 func TestMaxPoolSize(t *testing.T) {
-	// Create a pool with 2 max senders
-	p, err := questdb.PoolFromConf("http::addr=localhost:1234", questdb.WithMaxSenders(2))
+	// Create a pool with 3 max senders
+	p, err := questdb.PoolFromConf("http::addr=localhost:1234", questdb.WithMaxSenders(3))
 	require.NoError(t, err)
 
 	ctx := context.Background()
 
 	// Allocate 3 senders
-	s1, err := p.Acquire(ctx)
+	s1, err := p.Sender(ctx)
 	assert.NoError(t, err)
 
-	s2, err := p.Acquire(ctx)
+	s2, err := p.Sender(ctx)
 	assert.NoError(t, err)
 
-	s3, err := p.Acquire(ctx)
+	s3, err := p.Sender(ctx)
 	assert.NoError(t, err)
 
 	// Release all senders in reverse order
-	// Internal slice will look like: [ s3 , s2 ]
-	assert.NoError(t, p.Release(ctx, s3))
-	assert.NoError(t, p.Release(ctx, s2))
-	assert.NoError(t, p.Release(ctx, s1))
+	// Internal slice will look like: [ s3 , s2 , s1 ]
+	assert.NoError(t, s3.Close(ctx))
+	assert.NoError(t, s2.Close(ctx))
+	assert.NoError(t, s1.Close(ctx))
 
 	// Acquire 3 more senders.
-	// The first one will be s2 (senders get popped off the slice)
-	s, err := p.Acquire(ctx)
+	// The first one will be s1 (senders get popped off the slice)
+	s, err := p.Sender(ctx)
+	assert.NoError(t, err)
+	assert.Same(t, s, s1)
+
+	// The next will be s2
+	s, err = p.Sender(ctx)
 	assert.NoError(t, err)
 	assert.Same(t, s, s2)
 
-	// The next will be s3
-	s, err = p.Acquire(ctx)
+	// The final one will be s3
+	s, err = p.Sender(ctx)
 	assert.NoError(t, err)
 	assert.Same(t, s, s3)
 
-	// The final one will not be s1, s2, or s3 because the slice is empty
-	s, err = p.Acquire(ctx)
-	assert.NoError(t, err)
-	assert.NotSame(t, s, s1)
-	assert.NotSame(t, s, s2)
-	assert.NotSame(t, s, s3)
+	// Now verify that the Sender call blocks until a sender is freed
+	successFlag := int64(0)
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
+	go func() {
+		s, err := p.Sender(ctx)
+		assert.NoError(t, err)
+		atomic.AddInt64(&successFlag, 1)
+		assert.Same(t, s, s3)
+		assert.NoError(t, s.Close(ctx))
+		wg.Done()
+	}()
+
+	assert.Equal(t, atomic.LoadInt64(&successFlag), int64(0))
+	time.Sleep(100 * time.Millisecond)
+	assert.Equal(t, atomic.LoadInt64(&successFlag), int64(0))
+
+	assert.NoError(t, s3.Close(ctx))
+	wg.Wait()
+	assert.Equal(t, atomic.LoadInt64(&successFlag), int64(1))
+
+	assert.NoError(t, s2.Close(ctx))
+	assert.NoError(t, s1.Close(ctx))
+	assert.NoError(t, p.Close(ctx))
 }
 
 func TestMultiThreadedPoolWritesOverHttp(t *testing.T) {
@@ -154,12 +178,12 @@ func TestMultiThreadedPoolWritesOverHttp(t *testing.T) {
 		i := i
 		wg.Add(1)
 		go func() {
-			sender, err := pool.Acquire(ctx)
+			sender, err := pool.Sender(ctx)
 			assert.NoError(t, err)
 
 			sender.Table("test").Int64Column("thread", int64(i)).AtNow(ctx)
 
-			assert.NoError(t, pool.Release(ctx, sender))
+			assert.NoError(t, sender.Close(ctx))
 			wg.Done()
 		}()
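
For reference, below is a minimal usage sketch of the blocking pool API this patch introduces, modeled on the README example above. It is not part of the patch itself: the endpoint address, pool size, goroutine count, and table/column names are illustrative placeholders only.

```go
package main

import (
	"context"
	"log"
	"sync"

	qdb "github.com/questdb/go-questdb-client/v3"
)

func main() {
	ctx := context.Background()

	// Pool capped at 4 senders: Sender() blocks once all 4 are checked out.
	pool, err := qdb.PoolFromConf("http::addr=localhost:9000", qdb.WithMaxSenders(4))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		// Closing the pool closes any free senders it still holds.
		if err := pool.Close(ctx); err != nil {
			log.Println(err)
		}
	}()

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()

			// Blocks while all 4 senders are in use by other goroutines.
			sender, err := pool.Sender(ctx)
			if err != nil {
				log.Println(err)
				return
			}

			if err := sender.Table("trades").
				Symbol("pair", "USDGBP").
				Float64Column("price", float64(id)).
				AtNow(ctx); err != nil {
				log.Println(err)
			}

			// Close returns the sender to the pool rather than destroying it.
			if err := sender.Close(ctx); err != nil {
				log.Println(err)
			}
		}(id)
	}
	wg.Wait()
}
```

Because `Sender` blocks once the `WithMaxSenders` limit is reached, the eight goroutines above share four senders and get natural backpressure instead of an unbounded number of connections.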