diff --git a/dialer.go b/dialer.go index 64d4681..bf69f37 100644 --- a/dialer.go +++ b/dialer.go @@ -15,7 +15,7 @@ import ( "time" "github.com/gobwas/httphead" - "github.com/gobwas/pool/pbufio" + "github.com/gobwas/ws/internal/pbufio" ) // Constants used by Dialer. diff --git a/go.mod b/go.mod index 146c70d..bc3deb2 100644 --- a/go.mod +++ b/go.mod @@ -2,8 +2,4 @@ module github.com/gobwas/ws go 1.15 -require ( - github.com/gobwas/httphead v0.1.0 - github.com/gobwas/pool v0.2.1 - golang.org/x/sys v0.6.0 // indirect -) +require github.com/gobwas/httphead v0.1.0 diff --git a/go.sum b/go.sum index 744c946..3b44d6c 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,2 @@ github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU= github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM= -github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og= -github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= -golang.org/x/sys v0.0.0-20201207223542-d4d67f95c62d h1:MiWWjyhUzZ+jvhZvloX6ZrUsdEghn8a64Upd8EMHglE= -golang.org/x/sys v0.0.0-20201207223542-d4d67f95c62d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= -golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/generic.go b/internal/generic.go new file mode 100644 index 0000000..0c1f238 --- /dev/null +++ b/internal/generic.go @@ -0,0 +1,161 @@ +package pool + +import ( + "sync" +) + +var defaultPool = New(128, 65536) + +// Get pulls object whose generic size is at least of given size. It also +// returns a real size of x for further pass to Put(). It returns -1 as real +// size for nil x. Size >-1 does not mean that x is non-nil, so checks must be +// done. +// +// Note that size could be ceiled to the next power of two. +// +// Get is a wrapper around defaultPool.Get(). +func Get(size int) (interface{}, int) { return defaultPool.Get(size) } + +// Put takes x and its size for future reuse. +// Put is a wrapper around defaultPool.Put(). +func Put(x interface{}, size int) { defaultPool.Put(x, size) } + +// Pool contains logic of reusing objects distinguishable by size in generic +// way. +type Pool struct { + pool map[int]*sync.Pool + size func(int) int +} + +// New creates new Pool that reuses objects which size is in logarithmic range +// [min, max]. +// +// Note that it is a shortcut for Custom() constructor with Options provided by +// WithLogSizeMapping() and WithLogSizeRange(min, max) calls. +func New(min, max int) *Pool { + return Custom( + WithLogSizeMapping(), + WithLogSizeRange(min, max), + ) +} + +// Custom creates new Pool with given options. +func Custom(opts ...Option) *Pool { + p := &Pool{ + pool: make(map[int]*sync.Pool), + size: identity, + } + + c := (*poolConfig)(p) + for _, opt := range opts { + opt(c) + } + + return p +} + +// Get pulls object whose generic size is at least of given size. +// It also returns a real size of x for further pass to Put() even if x is nil. +// Note that size could be ceiled to the next power of two. +func (p *Pool) Get(size int) (interface{}, int) { + n := p.size(size) + if pool := p.pool[n]; pool != nil { + return pool.Get(), n + } + return nil, size +} + +// Put takes x and its size for future reuse. +func (p *Pool) Put(x interface{}, size int) { + if pool := p.pool[size]; pool != nil { + pool.Put(x) + } +} + +type poolConfig Pool + +// AddSize adds size n to the map. +func (p *poolConfig) AddSize(n int) { + p.pool[n] = new(sync.Pool) +} + +// SetSizeMapping sets up incoming size mapping function. +func (p *poolConfig) SetSizeMapping(size func(int) int) { + p.size = size +} + +// Option configures pool. +type Option func(Config) + +// Config describes generic pool configuration. +type Config interface { + AddSize(n int) + SetSizeMapping(func(int) int) +} + +// WithSizeLogRange returns an Option that will add logarithmic range of +// pooling sizes containing [min, max] values. +func WithLogSizeRange(min, max int) Option { + return func(c Config) { + logarithmicRange(min, max, func(n int) { + c.AddSize(n) + }) + } +} + +func WithSizeMapping(sz func(int) int) Option { + return func(c Config) { + c.SetSizeMapping(sz) + } +} + +func WithLogSizeMapping() Option { + return WithSizeMapping(ceilToPowerOfTwo) +} + +const ( + bitsize = 32 << (^uint(0) >> 63) + maxint = int(1<<(bitsize-1) - 1) + maxintHeadBit = 1 << (bitsize - 2) +) + +// logarithmicRange iterates from ceiled to power of two min to max, +// calling cb on each iteration. +func logarithmicRange(min, max int, cb func(int)) { + if min == 0 { + min = 1 + } + for n := ceilToPowerOfTwo(min); n <= max; n <<= 1 { + cb(n) + } +} + +// identity is identity. +func identity(n int) int { + return n +} + +// ceilToPowerOfTwo returns the least power of two integer value greater than +// or equal to n. +func ceilToPowerOfTwo(n int) int { + if n&maxintHeadBit != 0 && n > maxintHeadBit { + panic("argument is too large") + } + if n <= 2 { + return n + } + n-- + n = fillBits(n) + n++ + return n +} + +func fillBits(n int) int { + n |= n >> 1 + n |= n >> 2 + n |= n >> 4 + n |= n >> 8 + n |= n >> 16 + n |= n >> 32 + return n +} diff --git a/internal/generic_test.go b/internal/generic_test.go new file mode 100644 index 0000000..c1c029f --- /dev/null +++ b/internal/generic_test.go @@ -0,0 +1,123 @@ +package pool + +import ( + "fmt" + "reflect" + "testing" +) + +func TestGenericPoolGet(t *testing.T) { + for _, test := range []struct { + name string + min, max int + get int + expSize int + }{ + { + min: 0, + max: 1, + get: 10, + expSize: 10, + }, + { + min: 0, + max: 16, + get: 10, + expSize: 16, + }, + } { + t.Run(test.name, func(t *testing.T) { + p := New(test.min, test.max) + _, n := p.Get(test.get) + if n != test.expSize { + t.Errorf("Get(%d) = _, %d; want %d", test.get, n, test.expSize) + } + }) + } +} + +func TestLogarithmicRange(t *testing.T) { + for _, test := range []struct { + min, max int + exp []int + }{ + {0, 8, []int{1, 2, 4, 8}}, + {0, 7, []int{1, 2, 4}}, + {0, 9, []int{1, 2, 4, 8}}, + {3, 8, []int{4, 8}}, + {1, 7, []int{1, 2, 4}}, + {1, 9, []int{1, 2, 4, 8}}, + } { + t.Run("", func(t *testing.T) { + var act []int + logarithmicRange(test.min, test.max, func(n int) { + act = append(act, n) + }) + if !reflect.DeepEqual(act, test.exp) { + t.Errorf("unexpected range from %d to %d: %v; want %v", test.min, test.max, act, test.exp) + } + }) + } +} + +func TestCeilToPowerOfTwo(t *testing.T) { + for _, test := range []struct { + in int + exp int + panic bool + }{ + {in: 0, exp: 0}, + {in: 1, exp: 1}, + {in: 2, exp: 2}, + {in: 3, exp: 4}, + {in: 4, exp: 4}, + {in: 9, exp: 16}, + + {in: maxintHeadBit - 1, exp: maxintHeadBit}, + {in: maxintHeadBit + 1, panic: true}, + } { + t.Run(fmt.Sprintf("%d to %d", test.in, test.exp), func(t *testing.T) { + defer func() { + err := recover() + if !test.panic && err != nil { + t.Fatalf("panic: %v", err) + } + if test.panic && err == nil { + t.Fatalf("want panic") + } + }() + act := ceilToPowerOfTwo(test.in) + if exp := test.exp; act != exp { + t.Errorf("CeilToPowerOfTwo(%d) = %d; want %d", test.in, act, exp) + } + }) + } +} + +func TestFillBits(t *testing.T) { + for _, test := range []struct { + in int + exp int + }{ + {0, 0}, + {1, 1}, + {btoi("0100"), btoi("0111")}, + {btoi("0101"), btoi("0111")}, + {maxintHeadBit, maxint}, + } { + t.Run(fmt.Sprintf("%v", test.in), func(t *testing.T) { + act := fillBits(test.in) + if exp := test.exp; act != exp { + t.Errorf( + "fillBits(%064b) = %064b; want %064b", + test.in, act, exp, + ) + } + }) + } +} + +func btoi(s string) (n int) { + fmt.Sscanf(s, "%b", &n) + return n +} diff --git a/internal/pbufio/pbufio.go b/internal/pbufio/pbufio.go new file mode 100644 index 0000000..02c7ea0 --- /dev/null +++ b/internal/pbufio/pbufio.go @@ -0,0 +1,96 @@ +// Package pbufio contains tools for pooling bufio.Reader and bufio.Writers. +package pbufio + +import ( + "bufio" + "io" + + pool "github.com/gobwas/ws/internal" +) + +var ( + defaultWriterPool = NewWriterPool(256, 65536) + defaultReaderPool = NewReaderPool(256, 65536) +) + +// GetWriter returns bufio.Writer whose buffer has at least size bytes. +// Note that size could be ceiled to the next power of two. +// GetWriter is a wrapper around defaultWriterPool.Get(). +func GetWriter(w io.Writer, size int) *bufio.Writer { return defaultWriterPool.Get(w, size) } + +// PutWriter takes bufio.Writer for future reuse. +// It does not reuse bufio.Writer which underlying buffer size is not power of +// PutWriter is a wrapper around defaultWriterPool.Put(). +func PutWriter(bw *bufio.Writer) { defaultWriterPool.Put(bw) } + +// GetReader returns bufio.Reader whose buffer has at least size bytes. It returns +// its capacity for further pass to Put(). +// Note that size could be ceiled to the next power of two. +// GetReader is a wrapper around defaultReaderPool.Get(). +func GetReader(w io.Reader, size int) *bufio.Reader { return defaultReaderPool.Get(w, size) } + +// PutReader takes bufio.Reader and its size for future reuse. +// It does not reuse bufio.Reader if size is not power of two or is out of pool +// min/max range. +// PutReader is a wrapper around defaultReaderPool.Put(). +func PutReader(bw *bufio.Reader) { defaultReaderPool.Put(bw) } + +// WriterPool contains logic of *bufio.Writer reuse with various size. +type WriterPool struct { + pool *pool.Pool +} + +// NewWriterPool creates new WriterPool that reuses writers which size is in +// logarithmic range [min, max]. +func NewWriterPool(min, max int) *WriterPool { + return &WriterPool{pool.New(min, max)} +} + +// Get returns bufio.Writer whose buffer has at least size bytes. +func (wp *WriterPool) Get(w io.Writer, size int) *bufio.Writer { + v, n := wp.pool.Get(size) + if v != nil { + bw := v.(*bufio.Writer) + bw.Reset(w) + return bw + } + return bufio.NewWriterSize(w, n) +} + +// Put takes ownership of bufio.Writer for further reuse. +func (wp *WriterPool) Put(bw *bufio.Writer) { + // Should reset even if we do Reset() inside Get(). + // This is done to prevent locking underlying io.Writer from GC. + bw.Reset(nil) + wp.pool.Put(bw, bw.Size()) +} + +// ReaderPool contains logic of *bufio.Reader reuse with various size. +type ReaderPool struct { + pool *pool.Pool +} + +// NewReaderPool creates new ReaderPool that reuses writers which size is in +// logarithmic range [min, max]. +func NewReaderPool(min, max int) *ReaderPool { + return &ReaderPool{pool.New(min, max)} +} + +// Get returns bufio.Reader whose buffer has at least size bytes. +func (rp *ReaderPool) Get(r io.Reader, size int) *bufio.Reader { + v, n := rp.pool.Get(size) + if v != nil { + br := v.(*bufio.Reader) + br.Reset(r) + return br + } + return bufio.NewReaderSize(r, n) +} + +// Put takes ownership of bufio.Reader for further reuse. +func (rp *ReaderPool) Put(br *bufio.Reader) { + // Should reset even if we do Reset() inside Get(). + // This is done to prevent locking underlying io.Reader from GC. + br.Reset(nil) + rp.pool.Put(br, br.Size()) +} diff --git a/internal/pbufio/pbufio_test.go b/internal/pbufio/pbufio_test.go new file mode 100644 index 0000000..c1b93de --- /dev/null +++ b/internal/pbufio/pbufio_test.go @@ -0,0 +1,63 @@ +package pbufio + +import "testing" + +func TestGetWriter(t *testing.T) { + for _, test := range []struct { + min int + max int + get int + exp int + }{ + { + min: 0, + max: 100, + get: 500, + exp: 500, + }, + { + min: 0, + max: 128, + get: 60, + exp: 64, + }, + } { + t.Run("", func(t *testing.T) { + p := NewWriterPool(test.min, test.max) + bw := p.Get(nil, test.get) + if n, exp := bw.Available(), test.exp; n != exp { + t.Errorf("unexpected Get() buffer size: %v; want %v", n, exp) + } + }) + } +} + +func TestGetReader(t *testing.T) { + for _, test := range []struct { + min int + max int + get int + exp int + }{ + { + min: 0, + max: 100, + get: 500, + exp: 500, + }, + { + min: 0, + max: 128, + get: 60, + exp: 64, + }, + } { + t.Run("", func(t *testing.T) { + p := NewReaderPool(test.min, test.max) + br := p.Get(nil, test.get) + if n, exp := br.Size(), test.exp; n != exp { + t.Errorf("unexpected Get() buffer size: %v; want %v", n, exp) + } + }) + } +} diff --git a/internal/pbytes/pool.go b/internal/pbytes/pool.go new file mode 100644 index 0000000..4a103b8 --- /dev/null +++ b/internal/pbytes/pool.go @@ -0,0 +1,61 @@ +// Package pbytes contains tools for pooling byte pool. +// Note that by default it reuse slices with capacity from 128 to 65536 bytes. +package pbytes + +import pool "github.com/gobwas/ws/internal" + +// defaultPool is used by pacakge level functions. +var defaultPool = New(128, 65536) + +// GetLen returns probably reused slice of bytes with at least capacity of n +// and exactly len of n. +// GetLen is a wrapper around defaultPool.GetLen(). +func GetLen(n int) []byte { return defaultPool.GetLen(n) } + +// Put returns given slice to reuse pool. +// Put is a wrapper around defaultPool.Put(). +func Put(p []byte) { defaultPool.Put(p) } + +// Pool contains logic of reusing byte slices of various size. +type Pool struct { + pool *pool.Pool +} + +// New creates new Pool that reuses slices which size is in logarithmic range +// [min, max]. +// +// Note that it is a shortcut for Custom() constructor with Options provided by +// pool.WithLogSizeMapping() and pool.WithLogSizeRange(min, max) calls. +func New(min, max int) *Pool { + return &Pool{pool.New(min, max)} +} + +// Get returns probably reused slice of bytes with at least capacity of c and +// exactly len of n. +func (p *Pool) Get(n, c int) []byte { + if n > c { + panic("requested length is greater than capacity") + } + + v, x := p.pool.Get(c) + if v != nil { + bts := v.([]byte) + bts = bts[:n] + return bts + } + + return make([]byte, n, x) +} + +// Put returns given slice to reuse pool. +// It does not reuse bytes whose size is not power of two or is out of pool +// min/max range. +func (p *Pool) Put(bts []byte) { + p.pool.Put(bts, cap(bts)) +} + +// GetLen returns probably reused slice of bytes with at least capacity of n +// and exactly len of n. +func (p *Pool) GetLen(n int) []byte { + return p.Get(n, n) +} diff --git a/internal/pbytes/pool_test.go b/internal/pbytes/pool_test.go new file mode 100644 index 0000000..ec3b051 --- /dev/null +++ b/internal/pbytes/pool_test.go @@ -0,0 +1,110 @@ +package pbytes + +import ( + "crypto/rand" + "reflect" + "strconv" + "testing" + "unsafe" +) + +func TestPoolGet(t *testing.T) { + for _, test := range []struct { + min int + max int + len int + cap int + exactCap int + }{ + { + min: 0, + max: 64, + len: 10, + cap: 24, + exactCap: 32, + }, + { + min: 0, + max: 0, + len: 10, + cap: 24, + exactCap: 24, + }, + } { + t.Run("", func(t *testing.T) { + p := New(test.min, test.max) + act := p.Get(test.len, test.cap) + if n := len(act); n != test.len { + t.Errorf( + "Get(%d, _) retured %d-len slice; want %[1]d", + test.len, n, + ) + } + if c := cap(act); c < test.cap { + t.Errorf( + "Get(_, %d) retured %d-cap slice; want at least %[1]d", + test.cap, c, + ) + } + if c := cap(act); test.exactCap != 0 && c != test.exactCap { + t.Errorf( + "Get(_, %d) retured %d-cap slice; want exact %d", + test.cap, c, test.exactCap, + ) + } + }) + } +} + +func TestPoolPut(t *testing.T) { + t.Skip("sometimes pool in p.GetLen might allocate new data") + p := New(0, 32) + + miss := make([]byte, 5, 5) + rand.Read(miss) + p.Put(miss) // Should not reuse. + + hit := make([]byte, 8, 8) + rand.Read(hit) + p.Put(hit) // Should reuse. + + b := p.GetLen(5) + if data(b) == data(miss) { + t.Fatalf("unexpected reuse") + } + if data(b) != data(hit) { + t.Fatalf("want reuse: %q vs %q", b, hit) + } +} + +func data(p []byte) uintptr { + hdr := (*reflect.SliceHeader)(unsafe.Pointer(&p)) + return hdr.Data +} + +func BenchmarkPool(b *testing.B) { + for _, size := range []int{ + 1 << 4, + 1 << 5, + 1 << 6, + 1 << 7, + 1 << 8, + 1 << 9, + } { + b.Run(strconv.Itoa(size)+"(pool)", func(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + p := GetLen(size) + Put(p) + } + }) + }) + b.Run(strconv.Itoa(size)+"(make)", func(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _ = make([]byte, size) + } + }) + }) + } +} diff --git a/server.go b/server.go index f6cc8af..20b7c50 100644 --- a/server.go +++ b/server.go @@ -11,7 +11,7 @@ import ( "time" "github.com/gobwas/httphead" - "github.com/gobwas/pool/pbufio" + "github.com/gobwas/ws/internal/pbufio" ) // Constants used by ConnUpgrader. diff --git a/server_test.go b/server_test.go index 1f93721..3b45b07 100644 --- a/server_test.go +++ b/server_test.go @@ -19,7 +19,7 @@ import ( "testing" "github.com/gobwas/httphead" - "github.com/gobwas/pool/pbufio" + "github.com/gobwas/ws/internal/pbufio" ) // TODO(gobwas): upgradeGenericCase with methods like configureUpgrader, diff --git a/wsutil/cipher.go b/wsutil/cipher.go index bc25064..4795e63 100644 --- a/wsutil/cipher.go +++ b/wsutil/cipher.go @@ -3,8 +3,8 @@ package wsutil import ( "io" - "github.com/gobwas/pool/pbytes" "github.com/gobwas/ws" + "github.com/gobwas/ws/internal/pbytes" ) // CipherReader implements io.Reader that applies xor-cipher to the bytes read diff --git a/wsutil/handler.go b/wsutil/handler.go index 44fd360..dd1e97e 100644 --- a/wsutil/handler.go +++ b/wsutil/handler.go @@ -6,8 +6,8 @@ import ( "io/ioutil" "strconv" - "github.com/gobwas/pool/pbytes" "github.com/gobwas/ws" + "github.com/gobwas/ws/internal/pbytes" ) // ClosedError returned when peer has closed the connection with appropriate diff --git a/wsutil/writer.go b/wsutil/writer.go index 6a837cf..af69788 100644 --- a/wsutil/writer.go +++ b/wsutil/writer.go @@ -4,9 +4,9 @@ import ( "fmt" "io" - "github.com/gobwas/pool" - "github.com/gobwas/pool/pbytes" "github.com/gobwas/ws" + pool "github.com/gobwas/ws/internal" + "github.com/gobwas/ws/internal/pbytes" ) // DefaultWriteBuffer contains size of Writer's default buffer. It used by