fix bug of 'rump' mode only syncing db 0 data
vinllen committed May 16, 2019
2 parents 1ad1738 + dc91566 commit 3222891
Showing 5 changed files with 133 additions and 39 deletions.
3 changes: 3 additions & 0 deletions ChangeLog
@@ -1,3 +1,6 @@
+2019-05-16 Alibaba Cloud.
+    * VERSION: 1.6.2
+    * BUGFIX: fix bug of `rump` mode only syncing db 0 data.
 2019-05-14 Alibaba Cloud.
     * VERSION: 1.6.1
     * IMPROVE: support fetching db address from sentinel, the failover
3 changes: 2 additions & 1 deletion README.md
@@ -62,7 +62,8 @@ Version rules: a.b.c.
 | improve-\* | improvement branch. forked from develop branch and then merge back after finish developing, testing, and code review. |

 Tag rules:<br>
-Add tag when releasing: "release-v{version}-{date}". for example: "release-v1.0.2-20180628"
+Add tag when releasing: "release-v{version}-{date}". for example: "release-v1.0.2-20180628"<br>
+User can use `-version` to print the version.

 # Usage
 ---
37 changes: 37 additions & 0 deletions src/redis-shake/common/command.go
@@ -0,0 +1,37 @@
+package utils
+
+import (
+    "bytes"
+    "fmt"
+    "strconv"
+)
+
+func ParseKeyspace(content []byte) (map[int32]int64, error) {
+    if bytes.HasPrefix(content, []byte("# Keyspace")) == false {
+        return nil, fmt.Errorf("invalid info Keyspace: %s", string(content))
+    }
+
+    lines := bytes.Split(content, []byte("\n"))
+    reply := make(map[int32]int64)
+    for _, line := range lines {
+        line = bytes.TrimSpace(line)
+        if bytes.HasPrefix(line, []byte("db")) == true {
+            // line "db0:keys=18,expires=0,avg_ttl=0"
+            items := bytes.Split(line, []byte(":"))
+            db, err := strconv.Atoi(string(items[0][2:]))
+            if err != nil {
+                return nil, err
+            }
+            nums := bytes.Split(items[1], []byte(","))
+            if bytes.HasPrefix(nums[0], []byte("keys=")) == false {
+                return nil, fmt.Errorf("invalid info Keyspace: %s", string(content))
+            }
+            keysNum, err := strconv.ParseInt(string(nums[0][5:]), 10, 0)
+            if err != nil {
+                return nil, err
+            }
+            reply[int32(db)] = int64(keysNum)
+        } // end true
+    } // end for
+    return reply, nil
+}
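
ParseKeyspace parses the raw `INFO Keyspace` reply into a map from database index to key count; `rump` mode then fetches every database that reports at least one key. Below is a minimal usage sketch, not part of this commit: the sample reply and printed result are illustrative, and the import assumes the repo's GOPATH-style `redis-shake/common` path that rump.go already uses.

// Illustrative usage sketch, not part of this commit.
package main

import (
    "fmt"

    utils "redis-shake/common" // GOPATH-style path, as imported by rump.go
)

func main() {
    // A typical `INFO Keyspace` reply; the numbers are made up.
    info := []byte("# Keyspace\r\ndb0:keys=18,expires=0,avg_ttl=0\r\ndb3:keys=7,expires=1,avg_ttl=0\r\n")

    mp, err := utils.ParseKeyspace(info)
    if err != nil {
        panic(err)
    }
    fmt.Println(mp) // map[0:18 3:7] -- both db 0 and db 3 would then be fetched
}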
127 changes: 90 additions & 37 deletions src/redis-shake/rump.go
Expand Up @@ -4,6 +4,7 @@ import (
"pkg/libs/log"
"strconv"
"sync"
"fmt"

"redis-shake/common"
"redis-shake/configure"
@@ -87,47 +88,22 @@ func (cr *CmdRump) fetcher(idx int) {
log.Panicf("fetch db node failed: length[%v], error[%v]", length, err)
}

log.Infof("start fetcher with special-cloud[%v], length[%v]", conf.Options.ScanSpecialCloud, length)
log.Infof("start fetcher with special-cloud[%v], nodes[%v]", conf.Options.ScanSpecialCloud, length)

// iterate all source nodes
for i := 0; i < length; i++ {
// fetch data from on node
for {
keys, err := cr.scanners[idx].ScanKey(i)
if err != nil {
log.Panic(err)
}

log.Info("scaned keys: ", len(keys))

if len(keys) != 0 {
// pipeline dump
for _, key := range keys {
log.Debug("scan key: ", key)
cr.sourceConn[idx].Send("DUMP", key)
}
dumps, err := redis.Strings(cr.sourceConn[idx].Do(""))
if err != nil && err != redis.ErrNil {
log.Panicf("do dump with failed[%v]", err)
}

// pipeline ttl
for _, key := range keys {
cr.sourceConn[idx].Send("PTTL", key)
}
pttls, err := redis.Int64s(cr.sourceConn[idx].Do(""))
if err != nil && err != redis.ErrNil {
log.Panicf("do ttl with failed[%v]", err)
}

for i, k := range keys {
cr.keyChan <- &KeyNode{k, dumps[i], pttls[i]}
}
}
// fetch db number from 'info Keyspace'
dbNumber, err := cr.getSourceDbList(i)
if err != nil {
log.Panic(err)
}

// Last iteration of scan.
if cr.scanners[idx].EndNode() {
break
log.Infof("fetch node[%v] with db list: %v", i, dbNumber)
// iterate all db
for _, db := range dbNumber {
log.Infof("fetch node[%v] db[%v]", i, db)
if err := cr.doFetch(int(db), i); err != nil {
log.Panic(err)
}
}
}
@@ -174,3 +150,80 @@ func (cr *CmdRump) receiver() {
         }
     }
 }
+
+func (cr *CmdRump) getSourceDbList(id int) ([]int32, error) {
+    conn := cr.sourceConn[id]
+    if ret, err := conn.Do("info", "Keyspace"); err != nil {
+        return nil, err
+    } else if mp, err := utils.ParseKeyspace(ret.([]byte)); err != nil {
+        return nil, err
+    } else {
+        list := make([]int32, 0, len(mp))
+        for key, val := range mp {
+            if val > 0 {
+                list = append(list, key)
+            }
+        }
+        return list, nil
+    }
+}
+
+func (cr *CmdRump) doFetch(db, idx int) error {
+    // send 'select' command to both source and target
+    log.Infof("send source select db")
+    if _, err := cr.sourceConn[idx].Do("select", db); err != nil {
+        return err
+    }
+
+    log.Infof("send target select db")
+    cr.targetConn.Flush()
+    if err := cr.targetConn.Send("select", db); err != nil {
+        return err
+    }
+    cr.targetConn.Flush()
+
+    log.Infof("finish select db, start fetching node[%v] db[%v]", idx, db)
+
+    for {
+        keys, err := cr.scanners[idx].ScanKey(idx)
+        if err != nil {
+            return err
+        }
+
+        log.Info("scanned keys: ", len(keys))
+
+        if len(keys) != 0 {
+            // pipeline dump
+            for _, key := range keys {
+                log.Debug("scan key: ", key)
+                cr.sourceConn[idx].Send("DUMP", key)
+            }
+            dumps, err := redis.Strings(cr.sourceConn[idx].Do(""))
+            if err != nil && err != redis.ErrNil {
+                return fmt.Errorf("do dump with failed[%v]", err)
+            }
+
+            // pipeline ttl
+            for _, key := range keys {
+                cr.sourceConn[idx].Send("PTTL", key)
+            }
+            pttls, err := redis.Int64s(cr.sourceConn[idx].Do(""))
+            if err != nil && err != redis.ErrNil {
+                return fmt.Errorf("do ttl with failed[%v]", err)
+            }
+
+            for i, k := range keys {
+                cr.keyChan <- &KeyNode{k, dumps[i], pttls[i]}
+            }
+        }

+        // Last iteration of scan.
+        if cr.scanners[idx].EndNode() {
+            break
+        }
+    }
+
+    log.Infof("finish fetching node[%v] db[%v]", idx, db)
+
+    return nil
+}
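
The heart of the fix is that `doFetch` now issues `select` on both the source and the target connection before scanning, so keys from db N are read from and written to db N instead of everything collapsing into db 0. The standalone sketch below is not redis-shake code; it only illustrates that per-db DUMP/PTTL/RESTORE round trip for a single key using a redigo-style client (the `github.com/gomodule/redigo/redis` import path, addresses, db number, and key are assumptions made for the example).

// Illustrative only -- not code from this commit.
package main

import (
    "log"

    "github.com/gomodule/redigo/redis" // assumed client library for the sketch
)

func copyKey(src, dst redis.Conn, db int, key string) error {
    // Without the SELECT on both sides, everything lands in db 0 -- the bug this commit fixes.
    if _, err := src.Do("SELECT", db); err != nil {
        return err
    }
    if _, err := dst.Do("SELECT", db); err != nil {
        return err
    }

    dump, err := redis.Bytes(src.Do("DUMP", key))
    if err != nil {
        return err
    }
    pttl, err := redis.Int64(src.Do("PTTL", key))
    if err != nil {
        return err
    }
    if pttl < 0 {
        pttl = 0 // PTTL returns -1 for "no expire"; RESTORE expects 0 in that case
    }

    _, err = dst.Do("RESTORE", key, pttl, dump, "REPLACE")
    return err
}

func main() {
    src, err := redis.Dial("tcp", "127.0.0.1:6379") // placeholder source address
    if err != nil {
        log.Fatal(err)
    }
    defer src.Close()

    dst, err := redis.Dial("tcp", "127.0.0.1:6380") // placeholder target address
    if err != nil {
        log.Fatal(err)
    }
    defer dst.Close()

    if err := copyKey(src, dst, 3, "some-key"); err != nil {
        log.Fatal(err)
    }
}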
2 changes: 1 addition & 1 deletion src/redis-shake/sync.go
@@ -95,7 +95,7 @@ func (cmd *CmdSync) Main() {
     }

     var wg sync.WaitGroup
-    wg.Add(len(conf.Options.SourceAddress))
+    wg.Add(len(conf.Options.SourceAddressList))

     for i := 0; i < int(conf.Options.SourceParallel); i++ {
         go func() {
