Skip to content

Commit

Permalink
Don't restart scanner renewer on every Next call
Browse files Browse the repository at this point in the history
Most calls to Next are just reading some buffered results. The scanner
renewer only needs to be stopped and started again when we ask for
more results from HBase. Also, we shouldn't start a renewer if we know
we have already fetched the final results.

Pass the client's logger in to the scanner.

Use renewLoop's ctx in renew().

Double the scannerLease sleeps in the integration tests to reduce test
flakiness.
  • Loading branch information
aaronbee committed Dec 31, 2024
1 parent 7abcffa commit 7492f7e
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 51 deletions.
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (c *client) Close() {
}

func (c *client) Scan(s *hrpc.Scan) hrpc.Scanner {
return newScanner(c, s)
return newScanner(c, s, c.logger)
}

func (c *client) Get(g *hrpc.Get) (*hrpc.Result, error) {
Expand Down
11 changes: 6 additions & 5 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2681,16 +2681,17 @@ func TestScannerTimeout(t *testing.T) {
}

// force lease timeout
time.Sleep(scannerLease)
time.Sleep(scannerLease * 2)

_, err = scanner.Next()

// lease timeout should return an UnknownScannerException
if err != nil && strings.Contains(err.Error(),
"org.apache.hadoop.hbase.UnknownScannerException") {
fmt.Println("Error matches: UnknownScannerException")
t.Log("Error matches: UnknownScannerException")
} else {
t.Fatalf("Error does not match org.apache.hadoop.hbase.UnknownScannerException")
t.Fatalf("Error does not match org.apache.hadoop.hbase.UnknownScannerException, "+
"got: %v", err)
}
}

Expand Down Expand Up @@ -2730,8 +2731,6 @@ func TestScannerRenewal(t *testing.T) {
defer scanner.Close()
for i := 0; i < numRows; i++ {
rsp, err := scanner.Next()
// Sleep to trigger renewal
time.Sleep(scannerLease)
if err != nil {
t.Fatalf("Scanner.Next() returned error: %v", err)
}
Expand All @@ -2742,6 +2741,8 @@ func TestScannerRenewal(t *testing.T) {
if !bytes.Equal(rsp.Cells[0].Value, expectedValue) {
t.Fatalf("Unexpected value. Got %v, want %v", rsp.Cells[0].Value, expectedValue)
}
// Sleep to trigger renewal
time.Sleep(scannerLease * 2)
}
// Ensure scanner is exhausted
rsp, err := scanner.Next()
Expand Down
68 changes: 32 additions & 36 deletions scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,24 +75,35 @@ func (s *scanner) fetch() ([]*pb.Result, error) {
}

func (s *scanner) peek() (*pb.Result, error) {
if len(s.results) == 0 {
var (
err error
rs []*pb.Result
)
if s.closed {
// done scanning
return nil, io.EOF
}
if len(s.results) > 0 {
return s.results[0], nil
}

rs, err = s.fetch()
if err != nil {
return nil, err
}
if s.renewCancel != nil {
// About to send new Scan request to HBase, cancel our
// renewer.
s.renewCancel()
s.renewCancel = nil
}

// fetch cannot return zero results
s.results = rs
if s.closed {
// done scanning
return nil, io.EOF
}

rs, err := s.fetch()
if err != nil {
return nil, err
}
if !s.closed && s.rpc.RenewInterval() > 0 {
// Start up a renewer
renewCtx, cancel := context.WithCancel(s.rpc.Context())
s.renewCancel = cancel
go s.renewLoop(renewCtx, s.startRow)
}

// fetch cannot return zero results
s.results = rs
return s.results[0], nil
}

Expand Down Expand Up @@ -130,7 +141,7 @@ func (s *scanner) coalesce(result, partial *pb.Result) (*pb.Result, bool) {
return result, true
}

func newScanner(c RPCClient, rpc *hrpc.Scan) *scanner {
func newScanner(c RPCClient, rpc *hrpc.Scan, logger *slog.Logger) *scanner {
var sm map[string]int64
if rpc.TrackScanMetrics() {
sm = make(map[string]int64)
Expand All @@ -141,7 +152,7 @@ func newScanner(c RPCClient, rpc *hrpc.Scan) *scanner {
startRow: rpc.StartRow(),
curRegionScannerID: noScannerID,
scanMetrics: sm,
logger: slog.Default(),
logger: logger,
}
}

Expand All @@ -154,21 +165,6 @@ func toLocalResult(r *pb.Result) *hrpc.Result {
}

func (s *scanner) Next() (*hrpc.Result, error) {
if s.rpc.RenewInterval() > 0 && s.renewCancel != nil {
s.renewCancel()
}

res, err := s.nextInternal()

if err == nil && s.rpc.RenewInterval() > 0 {
renewCtx, cancel := context.WithCancel(s.rpc.Context())
s.renewCancel = cancel
go s.renewLoop(renewCtx, s.startRow)
}
return res, err
}

func (s *scanner) nextInternal() (*hrpc.Result, error) {
var (
result, partial *pb.Result
err error
Expand Down Expand Up @@ -385,11 +381,11 @@ func (s *scanner) closeRegionScanner() {
}

// renews a scanner by resending scan request with renew = true
func (s *scanner) renew(startRow []byte) error {
if err := s.rpc.Context().Err(); err != nil {
func (s *scanner) renew(ctx context.Context, startRow []byte) error {
if err := ctx.Err(); err != nil {
return err
}
rpc, err := hrpc.NewScanRange(s.rpc.Context(),
rpc, err := hrpc.NewScanRange(ctx,
s.rpc.Table(),
startRow,
nil,
Expand All @@ -411,7 +407,7 @@ func (s *scanner) renewLoop(ctx context.Context, startRow []byte) {
for {
select {
case <-t.C:
if err := s.renew(startRow); err != nil {
if err := s.renew(ctx, startRow); err != nil {
s.logger.Error("error renewing scanner", "err", err)
return
}
Expand Down
19 changes: 10 additions & 9 deletions scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"errors"
"fmt"
"io"
"log/slog"
"reflect"
"sync"
"testing"
Expand Down Expand Up @@ -120,7 +121,7 @@ func TestScanner(t *testing.T) {
}

var scannerID uint64 = 42
scanner := newScanner(c, scan)
scanner := newScanner(c, scan, slog.Default())

s, err := hrpc.NewScanRange(scan.Context(), table, nil, nil,
hrpc.NumberOfRows(2))
Expand Down Expand Up @@ -412,7 +413,7 @@ func TestScanMetrics(t *testing.T) {
t.Fatal(err)
}

sc := newScanner(c, scan)
sc := newScanner(c, scan, slog.Default())

c.EXPECT().SendRPC(&scanMatcher{scan: scan}).Return(&pb.ScanResponse{
Results: tcase.results,
Expand Down Expand Up @@ -488,7 +489,7 @@ func TestErrorFirstFetchNoMetrics(t *testing.T) {
if err != nil {
t.Fatal(err)
}
scanner := newScanner(c, scan)
scanner := newScanner(c, scan, slog.Default())

srange, err := hrpc.NewScanRange(context.Background(), table, nil, nil)
if err != nil {
Expand Down Expand Up @@ -528,7 +529,7 @@ func TestErrorFirstFetchWithMetrics(t *testing.T) {
if err != nil {
t.Fatal(err)
}
scanner := newScanner(c, scan)
scanner := newScanner(c, scan, slog.Default())

srange, err := hrpc.NewScanRange(context.Background(), table, nil, nil,
hrpc.TrackScanMetrics())
Expand Down Expand Up @@ -570,7 +571,7 @@ func testErrorScanFromID(t *testing.T, scan *hrpc.Scan, out []*hrpc.Result) {
defer wg.Wait()

var scannerID uint64 = 42
scanner := newScanner(c, scan)
scanner := newScanner(c, scan, slog.Default())

srange, err := hrpc.NewScanRange(scan.Context(), table, nil, nil, scan.Options()...)
if err != nil {
Expand Down Expand Up @@ -679,7 +680,7 @@ func testPartialResults(t *testing.T, scan *hrpc.Scan, expected []*hrpc.Result)
}

var scannerID uint64
scanner := newScanner(c, scan)
scanner := newScanner(c, scan, slog.Default())
ctx := scan.Context()
for _, partial := range tcase {
partial := partial
Expand Down Expand Up @@ -736,7 +737,7 @@ func TestReversedScanner(t *testing.T) {

var scannerID uint64 = 42

scanner := newScanner(c, scan)
scanner := newScanner(c, scan, slog.Default())
ctx = scan.Context()
s, err := hrpc.NewScanRange(ctx, table, nil, nil, hrpc.Reversed())
if err != nil {
Expand Down Expand Up @@ -819,7 +820,7 @@ func TestScannerWithContextCanceled(t *testing.T) {
t.Fatal(err)
}

scanner := newScanner(c, scan)
scanner := newScanner(c, scan, slog.Default())

cancel()

Expand All @@ -839,7 +840,7 @@ func TestScannerClosed(t *testing.T) {
t.Fatal(err)
}

scanner := newScanner(c, scan)
scanner := newScanner(c, scan, slog.Default())
scanner.Close()

_, err = scanner.Next()
Expand Down

0 comments on commit 7492f7e

Please sign in to comment.