Skip to content

Commit

Permalink
Merge pull request #1 from hyan23/main
Browse files Browse the repository at this point in the history
MS1
  • Loading branch information
hyan23 authored Jul 26, 2022
2 parents 5cce96b + f5e0fd8 commit 219b15a
Show file tree
Hide file tree
Showing 18 changed files with 398 additions and 300 deletions.
3 changes: 2 additions & 1 deletion format_converter/converter.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package main

import (
"dataacross/storage"
"flag"
"fmt"

"github.com/CQUST-Runner/datacross/storage"
)

func main() {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module dataacross
module github.com/CQUST-Runner/datacross

go 1.18

Expand Down
10 changes: 7 additions & 3 deletions storage/bin_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ func (l *BinLog) WriteHeader(f File, header *FileHeader) error {
if err != nil {
return err
}
_ = n
if n != header.Size() {
return fmt.Errorf("write size unexpected")
}

n, err = f.Write(headBuffer[:])
if err != nil {
Expand Down Expand Up @@ -125,7 +127,7 @@ func (l *BinLog) AppendEntry(f File, pos int64, entry *LogEntry) (int64, error)
return 0, fmt.Errorf("log entry too large")
}
entryBuffer := make([]byte, writeSize)
_, err = entry.MarshalToSizedBuffer(entryBuffer[4 : 4+entry.Size()])
_, err = entry.MarshalToSizedBuffer(entryBuffer[4 : 4+sz])
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -160,7 +162,9 @@ func (l *BinLog) ReadEntry(f File, pos int64, entry *LogEntry) (int64, error) {

size := binary.LittleEndian.Uint32(sizeBuffer[:])
if size == 0 {
return 4, nil
entry.Reset()
// size+crc
return 4 + 4, nil
}

entryBuffer := make([]byte, size)
Expand Down
Empty file modified storage/build_proto_darwin.sh
100644 → 100755
Empty file.
18 changes: 9 additions & 9 deletions storage/file_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,34 @@ import (
)

type darwinFlie struct {
handle int
fd int
filename string
}

func (f *darwinFlie) Close() error {
if f.handle != -1 {
if err := syscall.Close(f.handle); err != nil {
if f.fd != -1 {
if err := syscall.Close(f.fd); err != nil {
return err
}
f.handle = -1
f.fd = -1
}
return nil
}

func (f *darwinFlie) Read(p []byte) (n int, err error) {
return syscall.Read(f.handle, p)
return syscall.Read(f.fd, p)
}

func (f *darwinFlie) Write(p []byte) (n int, err error) {
return syscall.Write(f.handle, p)
return syscall.Write(f.fd, p)
}

func (f *darwinFlie) Seek(offset int64, whence int) (int64, error) {
return syscall.Seek(f.handle, offset, whence)
return syscall.Seek(f.fd, offset, whence)
}

func (f *darwinFlie) Flush() error {
return syscall.Fsync(f.handle)
return syscall.Fsync(f.fd)
}

func (f *darwinFlie) Path() string {
Expand All @@ -52,5 +52,5 @@ func OpenFile(filename string, readonly bool) (File, error) {
if err != nil {
return nil, err
}
return &darwinFlie{handle: fd, filename: filename}, nil
return &darwinFlie{fd: fd, filename: filename}, nil
}
3 changes: 1 addition & 2 deletions storage/json_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ func (l *JsonLog) ReadEntry(f File, pos int64, entry *LogEntry) (int64, error) {
readSz := len(jDoc)
// won't be 0
if len(jDoc) <= 1 {
entry.Reset()
return int64(readSz), nil
return 0, fmt.Errorf("read size unexpected")
}
jDoc = jDoc[:len(jDoc)-1]
err = json.Unmarshal(jDoc, entry)
Expand Down
74 changes: 36 additions & 38 deletions storage/log_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
type LogInput struct {
machineID string
w *Wal
process *LogProcess
progress *LogProgress
}

type RunLogError struct {
Expand All @@ -27,61 +27,60 @@ func (e *RunLogError) Error() string {
}

type RunLogResult struct {
status map[string]*LogProcess
status map[string]*LogProgress
err *RunLogError
}

func (r *RunLogResult) Init(s map[string]*LogProcess, e *RunLogError) {
func (r *RunLogResult) Init(s map[string]*LogProgress, e *RunLogError) {
r.status = s
r.err = e
}

func (r *RunLogResult) Position(machineID string) string {
if pos, ok := r.status[machineID]; ok {
return pos.Gid
}
return ""
}

func (r *RunLogResult) Error() error {
return r.err
}

func (r *RunLogResult) Process(machineID string) *LogProgress {
if progress, ok := r.status[machineID]; ok {
return progress
}
return newLogProgress(machineID)
}

type RunLogWorker struct {
input *LogInput
process *LogProcess
progress *LogProgress
err error
pendingOp *LogOperation
pendingOpProcess *LogProcess
pendingOpProcess *LogProgress
it *WalIterator
}

type RunLogContext struct {
workers map[string]*RunLogWorker
}

func (c *RunLogContext) Init(i ...*LogInput) error {
func (c *RunLogContext) Init(i ...*LogInput) {
c.workers = make(map[string]*RunLogWorker)

for _, input := range i {
if input == nil {
continue
}
it := input.w.IteratorOffset(input.process.Offset)
it := input.w.IteratorOffset(input.progress.Offset)
c.workers[input.machineID] = &RunLogWorker{
input: input,
process: input.process,
it: it,
input: input,
progress: input.progress,
it: it,
}
}
return nil
}

func (c *RunLogContext) Progress(machineID string) int64 {
func (c *RunLogContext) Progress(machineID string) *LogProgress {
if w, ok := c.workers[machineID]; ok {
return w.process.Num
return w.progress
}
return 0
return newLogProgress(machineID)
}

type LogRunner struct {
Expand All @@ -95,18 +94,20 @@ func (r *LogRunner) Init(machineID string, s NodeStorage) error {
return nil
}

func (r *LogRunner) runLogInner(c *RunLogContext, process *LogProcess, logOp *LogOperation) bool {
func (r *LogRunner) runLogInner(c *RunLogContext, progress *LogProgress, logOp *LogOperation) bool {
if logOp.PrevNum == 0 {
record := DBRecord{
Key: logOp.Key,
Value: logOp.Value,
MachineID: logOp.MachineId,
Offset: process.Offset,
PrevMachineID: "",
Offset: progress.Offset,
Seq: logOp.Seq,
CurrentLogGid: logOp.Gid,
PrevLogGid: "",
IsDeleted: logOp.Op == int32(Op_Del),
IsDiscarded: logOp.Op == int32(Op_Discard),
MachineChangeCount: map[string]int32{logOp.MachineId: 1},
MachineChangeCount: logOp.Changes,
Num: logOp.Num,
PrevNum: logOp.PrevNum,
}
Expand All @@ -118,7 +119,7 @@ func (r *LogRunner) runLogInner(c *RunLogContext, process *LogProcess, logOp *Lo
return true
}

if logOp.PrevNum > c.Progress(logOp.PrevMachineId) {
if logOp.PrevNum > c.Progress(logOp.PrevMachineId).Num {
return false
}

Expand All @@ -131,11 +132,11 @@ func (r *LogRunner) runLogInner(c *RunLogContext, process *LogProcess, logOp *Lo
Key: logOp.Key,
Value: logOp.Value,
MachineID: logOp.MachineId,
Offset: process.Offset,
PrevMachineID: parent.MachineID,
Offset: progress.Offset,
PrevMachineID: logOp.PrevMachineId,
Seq: logOp.Seq,
CurrentLogGid: logOp.Gid,
PrevLogGid: parent.CurrentLogGid,
PrevLogGid: logOp.PrevGid,
IsDeleted: logOp.Op == int32(Op_Del),
IsDiscarded: logOp.Op == int32(Op_Discard),
MachineChangeCount: logOp.Changes,
Expand All @@ -154,7 +155,7 @@ func (r *LogRunner) runLogInner(c *RunLogContext, process *LogProcess, logOp *Lo
Key: logOp.Key,
Value: logOp.Value,
MachineID: logOp.MachineId,
Offset: process.Offset,
Offset: progress.Offset,
PrevMachineID: logOp.PrevMachineId,
Seq: logOp.Seq,
CurrentLogGid: logOp.Gid,
Expand All @@ -180,15 +181,15 @@ func (r *LogRunner) tryAdvance(c *RunLogContext, worker *RunLogWorker) bool {
if !r.runLogInner(c, worker.pendingOpProcess, worker.pendingOp) {
return false
}
worker.process = worker.pendingOpProcess
worker.progress = worker.pendingOpProcess
worker.pendingOp = nil
worker.pendingOpProcess = nil
count++
}

for worker.it.Next() {
logOp := worker.it.LogOp()
currentProcess := LogProcess{
currentProcess := LogProgress{
Num: logOp.Num,
Offset: worker.it.Offset(),
Gid: logOp.Gid,
Expand All @@ -198,7 +199,7 @@ func (r *LogRunner) tryAdvance(c *RunLogContext, worker *RunLogWorker) bool {
worker.pendingOpProcess = &currentProcess
return count > 0
}
worker.process = &currentProcess
worker.progress = &currentProcess
count++
}
return count > 0
Expand All @@ -209,10 +210,7 @@ func (r *LogRunner) Run(i ...*LogInput) (*RunLogResult, error) {
return nil, fmt.Errorf("empty input")
}
c := RunLogContext{}
err := c.Init(i...)
if err != nil {
return nil, err
}
c.Init(i...)

blockNum := 0
for {
Expand All @@ -231,15 +229,15 @@ func (r *LogRunner) Run(i ...*LogInput) (*RunLogResult, error) {
}
}

result := RunLogResult{status: make(map[string]*LogProcess)}
result := RunLogResult{status: make(map[string]*LogProgress)}
for _, worker := range c.workers {
if worker.err != nil {
if result.err == nil {
result.err = &RunLogError{}
}
result.err.errs = append(result.err.errs, worker.err)
}
result.status[worker.input.machineID] = worker.process
result.status[worker.input.machineID] = worker.progress
}
return &result, nil
}
Loading

0 comments on commit 219b15a

Please sign in to comment.