Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
qingyang-hu committed Oct 13, 2024
1 parent 63a973b commit e3dcc91
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 83 deletions.
24 changes: 15 additions & 9 deletions mongo/client_bulk_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,16 @@ func (mb *modelBatches) IsOrdered() *bool {

// AdvanceBatches moves the batch cursor forward by n models. The offset
// is capped at len(mb.models) so repeated advances can never run past
// the end of the slice.
func (mb *modelBatches) AdvanceBatches(n int) {
	next := mb.offset + n
	if limit := len(mb.models); next > limit {
		next = limit
	}
	mb.offset = next
}

func (mb *modelBatches) End() bool {
return len(mb.models) <= mb.offset
// Size reports how many models remain to be batched. It returns 0 once
// the offset has reached or passed the end of the model slice.
func (mb *modelBatches) Size() int {
	remaining := len(mb.models) - mb.offset
	if remaining < 0 {
		return 0
	}
	return remaining
}

func (mb *modelBatches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, totalSize int) (int, []byte, error) {
Expand All @@ -181,7 +187,7 @@ func (mb *modelBatches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, to
dst = append(dst, doc...)
return dst
},
appendEnd: func(dst []byte, idx, length int32) []byte {
updateLength: func(dst []byte, idx, length int32) []byte {
dst = bsoncore.UpdateLength(dst, idx, length)
return dst
},
Expand All @@ -193,7 +199,7 @@ func (mb *modelBatches) AppendBatchArray(dst []byte, maxCount, maxDocSize, total
fn := functionSet{
appendStart: bsoncore.AppendArrayElementStart,
appendDocument: bsoncore.AppendDocumentElement,
appendEnd: func(dst []byte, idx, _ int32) []byte {
updateLength: func(dst []byte, idx, _ int32) []byte {
dst, _ = bsoncore.AppendArrayEnd(dst, idx)
return dst
},
Expand All @@ -204,11 +210,11 @@ func (mb *modelBatches) AppendBatchArray(dst []byte, maxCount, maxDocSize, total
type functionSet struct {
appendStart func([]byte, string) (int32, []byte)
appendDocument func([]byte, string, []byte) []byte
appendEnd func([]byte, int32, int32) []byte
updateLength func([]byte, int32, int32) []byte
}

func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxDocSize, totalSize int) (int, []byte, error) {
if mb.End() {
if mb.Size() == 0 {
return 0, dst, io.EOF
}

Expand Down Expand Up @@ -344,8 +350,8 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
return 0, dst[:l], nil
}

dst = fn.appendEnd(dst, opsIdx, int32(len(dst[opsIdx:])))
nsDst = fn.appendEnd(nsDst, nsIdx, int32(len(nsDst[nsIdx:])))
dst = fn.updateLength(dst, opsIdx, int32(len(dst[opsIdx:])))
nsDst = fn.updateLength(nsDst, nsIdx, int32(len(nsDst[nsIdx:])))
dst = append(dst, nsDst...)

mb.retryMode = driver.RetryNone
Expand Down Expand Up @@ -483,7 +489,7 @@ func (mb *modelBatches) appendUpdateResult(cur *cursorInfo, raw bson.Raw) bool {
result.ModifiedCount = int64(*cur.NModified)
}
if cur.Upserted != nil {
result.UpsertedID = (*cur.Upserted).ID
result.UpsertedID = cur.Upserted.ID
}
mb.result.UpdateResults[int(cur.Idx)] = result
if err := cur.extractError(); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion mongo/integration/client_side_encryption_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ func TestClientSideEncryptionCustomCrypt(t *testing.T) {
"expected 0 calls to DecryptExplicit, got %v", cc.numDecryptExplicitCalls)
assert.Equal(mt, cc.numCloseCalls, 0,
"expected 0 calls to Close, got %v", cc.numCloseCalls)
assert.Equal(mt, cc.numBypassAutoEncryptionCalls, 2,
assert.Equal(mt, cc.numBypassAutoEncryptionCalls, 1,
"expected 2 calls to BypassAutoEncryption, got %v", cc.numBypassAutoEncryptionCalls)
})
}
Expand Down
8 changes: 4 additions & 4 deletions mongo/integration/crud_prose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ func TestClientBulkWrite(t *testing.T) {
mt.Run("input with greater than maxWriteBatchSize", func(mt *mtest.T) {
var opsCnt []int
monitor := &event.CommandMonitor{
Started: func(ctx context.Context, e *event.CommandStartedEvent) {
Started: func(_ context.Context, e *event.CommandStartedEvent) {
if e.CommandName == "bulkWrite" {
v := e.Command.Lookup("ops")
elems, err := v.Array().Elements()
Expand Down Expand Up @@ -453,7 +453,7 @@ func TestClientBulkWrite(t *testing.T) {
mt.Run("input with greater than maxMessageSizeBytes", func(mt *mtest.T) {
var opsCnt []int
monitor := &event.CommandMonitor{
Started: func(ctx context.Context, e *event.CommandStartedEvent) {
Started: func(_ context.Context, e *event.CommandStartedEvent) {
if e.CommandName == "bulkWrite" {
v := e.Command.Lookup("ops")
elems, err := v.Array().Elements()
Expand Down Expand Up @@ -533,7 +533,7 @@ func TestClientBulkWrite(t *testing.T) {

var eventCnt int
monitor := &event.CommandMonitor{
Started: func(ctx context.Context, e *event.CommandStartedEvent) {
Started: func(_ context.Context, e *event.CommandStartedEvent) {
if e.CommandName == "bulkWrite" {
eventCnt++
}
Expand Down Expand Up @@ -659,7 +659,7 @@ func TestClientBulkWrite(t *testing.T) {
assert.True(mt, getMoreCalled, "the getMore was not called")
})

mt.Run("bulkWrite handles a getMore error", func(mt *mtest.T) {
mt.Run("bulkWrite handles a getMore error", func(_ *mtest.T) {
})

mt.Run("bulkWrite returns error for unacknowledged too-large insert", func(mt *mtest.T) {
Expand Down
6 changes: 3 additions & 3 deletions mongo/integration/unified/client_operation_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func executeClientBulkWrite(ctx context.Context, operation *operation) (*operati

resBuilder = bsoncore.NewDocumentBuilder()
for k, v := range res.DeleteResults {
resBuilder.AppendDocument(strconv.Itoa(int(k)),
resBuilder.AppendDocument(strconv.Itoa(k),
bsoncore.NewDocumentBuilder().
AppendInt64("deletedCount", v.DeletedCount).
Build(),
Expand All @@ -254,7 +254,7 @@ func executeClientBulkWrite(ctx context.Context, operation *operation) (*operati
if err != nil {
return nil, err
}
resBuilder.AppendDocument(strconv.Itoa(int(k)),
resBuilder.AppendDocument(strconv.Itoa(k),
bsoncore.NewDocumentBuilder().
AppendValue("insertedId", bsoncore.Value{Type: t, Data: d}).
Build(),
Expand All @@ -274,7 +274,7 @@ func executeClientBulkWrite(ctx context.Context, operation *operation) (*operati
}
b.AppendValue("upsertedId", bsoncore.Value{Type: t, Data: d})
}
resBuilder.AppendDocument(strconv.Itoa(int(k)), b.Build())
resBuilder.AppendDocument(strconv.Itoa(k), b.Build())
}
rawBuilder.AppendDocument("updateResults", resBuilder.Build())

Expand Down
3 changes: 1 addition & 2 deletions x/mongo/driver/batch_cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,7 @@ func (bc *BatchCursor) getMore(ctx context.Context) {
},
Database: bc.database,
Deployment: bc.getOperationDeployment(),
ProcessResponseFn: func(_ context.Context, response bsoncore.Document, info ResponseInfo) error {
// response := info.ServerResponse
ProcessResponseFn: func(_ context.Context, response bsoncore.Document, _ ResponseInfo) error {
id, ok := response.Lookup("cursor", "id").Int64OK()
if !ok {
return fmt.Errorf("cursor.id should be an int64 but is a BSON %s", response.Lookup("cursor", "id").Type)
Expand Down
31 changes: 23 additions & 8 deletions x/mongo/driver/batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ type Batches struct {
offset int
}

// AppendBatchSequence appends dst with document sequence of batches as long as the limits of max count, max
// document size, or total size allows. It returns the number of batches appended, the new appended slice, and
// any error raised. It returns the original input slice if nothing can be appended within the limits.
func (b *Batches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, totalSize int) (int, []byte, error) {
if b.End() {
if b.Size() == 0 {
return 0, dst, io.EOF
}
l := len(dst)
Expand All @@ -34,7 +37,7 @@ func (b *Batches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, totalSiz
idx, dst = bsoncore.ReserveLength(dst)
dst = append(dst, b.Identifier...)
dst = append(dst, 0x00)
size := len(dst) - l
var size int
var n int
for i := b.offset; i < len(b.Documents); i++ {
if n == maxCount {
Expand All @@ -45,7 +48,7 @@ func (b *Batches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, totalSiz
break
}
size += len(doc)
if size >= totalSize {
if size > maxDocSize {
break
}
dst = append(dst, doc...)
Expand All @@ -58,13 +61,16 @@ func (b *Batches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, totalSiz
return n, dst, nil
}

// AppendBatchArray appends dst with array of batches as long as the limits of max count, max document size, or
// total size allows. It returns the number of batches appended, the new appended slice, and any error raised. It
// returns the original input slice if nothing can be appended within the limits.
func (b *Batches) AppendBatchArray(dst []byte, maxCount, maxDocSize, totalSize int) (int, []byte, error) {
if b.End() {
if b.Size() == 0 {
return 0, dst, io.EOF
}
l := len(dst)
aidx, dst := bsoncore.AppendArrayElementStart(dst, b.Identifier)
size := len(dst) - l
var size int
var n int
for i := b.offset; i < len(b.Documents); i++ {
if n == maxCount {
Expand All @@ -75,7 +81,7 @@ func (b *Batches) AppendBatchArray(dst []byte, maxCount, maxDocSize, totalSize i
break
}
size += len(doc)
if size >= totalSize {
if size > maxDocSize {
break
}
dst = bsoncore.AppendDocumentElement(dst, strconv.Itoa(n), doc)
Expand All @@ -92,14 +98,23 @@ func (b *Batches) AppendBatchArray(dst []byte, maxCount, maxDocSize, totalSize i
return n, dst, nil
}

// IsOrdered indicates if the batches are ordered. It returns the
// Ordered field unmodified; callers receive the same *bool that was
// set on the Batches value.
func (b *Batches) IsOrdered() *bool {
return b.Ordered
}

// AdvanceBatches moves the batch cursor forward by n documents. The
// offset is capped at len(b.Documents) so repeated advances can never
// run past the end of the slice.
func (b *Batches) AdvanceBatches(n int) {
	next := b.offset + n
	if limit := len(b.Documents); next > limit {
		next = limit
	}
	b.offset = next
}

func (b *Batches) End() bool {
return len(b.Documents) <= b.offset
// Size reports the number of documents that have not yet been batched.
// It returns 0 once the offset has reached or passed the end of the
// Documents slice.
func (b *Batches) Size() int {
	remaining := len(b.Documents) - b.offset
	if remaining < 0 {
		return 0
	}
	return remaining
}
Loading

0 comments on commit e3dcc91

Please sign in to comment.