Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
qingyang-hu committed Sep 29, 2024
1 parent 08a06da commit 982b622
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 18 deletions.
35 changes: 20 additions & 15 deletions mongo/client_bulk_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func (bw *clientBulkWrite) execute(ctx context.Context) error {
}
resMap := make([]interface{}, len(bw.models))
insIDMap := make(map[int]interface{})
canRetry := true
for i, v := range bw.models {
var doc bsoncore.Document
var err error
Expand Down Expand Up @@ -94,6 +95,7 @@ func (bw *clientBulkWrite) execute(ctx context.Context) error {
bw.client.bsonOpts,
bw.client.registry)
case *ClientUpdateManyModel:
canRetry = false
nsIdx = getNsIndex(model.Namespace)
if bw.result.UpdateResults == nil {
bw.result.UpdateResults = make(map[int64]ClientUpdateResult)
Expand Down Expand Up @@ -144,6 +146,7 @@ func (bw *clientBulkWrite) execute(ctx context.Context) error {
bw.client.bsonOpts,
bw.client.registry)
case *ClientDeleteManyModel:
canRetry = false
nsIdx = getNsIndex(model.Namespace)
if bw.result.DeleteResults == nil {
bw.result.DeleteResults = make(map[int64]ClientDeleteResult)
Expand All @@ -168,24 +171,22 @@ func (bw *clientBulkWrite) execute(ctx context.Context) error {
Documents: docs,
Ordered: bw.ordered,
}
retry := driver.RetryNone
if bw.client.retryWrites && canRetry {
retry = driver.RetryOncePerCommand
}
op := operation.NewCommandFn(bw.newCommand(nsList)).Batches(batches).
Session(bw.session).WriteConcern(bw.writeConcern).CommandMonitor(bw.client.monitor).
ServerSelector(bw.selector).ClusterClock(bw.client.clock).
Database("admin").
Database("admin").Type(driver.Write).RetryMode(retry).
Deployment(bw.client.deployment).Crypt(bw.client.cryptFLE).
ServerAPI(bw.client.serverAPI).Timeout(bw.client.timeout).
Logger(bw.client.logger).Authenticator(bw.client.authenticator).Name("bulkWrite")
opErr := op.Execute(ctx)
var wcErrs []*WriteConcernError
if opErr != nil {
if errors.Is(opErr, driver.ErrUnacknowledgedWrite) {
return nil
}
var writeErr driver.WriteCommandError
if errors.As(opErr, &writeErr) {
wcErr := convertDriverWriteConcernError(writeErr.WriteConcernError)
wcErrs = append(wcErrs, wcErr)
}
}
var res struct {
Ok bool
Expand All @@ -202,6 +203,9 @@ func (bw *clientBulkWrite) execute(ctx context.Context) error {
Errmsg string
}
rawRes := op.Result()
if len(rawRes) == 0 {
return opErr
}
if err := bson.Unmarshal(rawRes, &res); err != nil {
return err
}
Expand All @@ -212,31 +216,32 @@ func (bw *clientBulkWrite) execute(ctx context.Context) error {
bw.result.UpsertedCount = int64(res.NUpserted)
errors := make(map[int64]WriteError)
for i, cur := range res.Cursor.FirstBatch {
switch res := resMap[i].(type) {
switch r := resMap[i].(type) {
case map[int64]ClientDeleteResult:
if err := appendDeleteResult(cur, res, errors); err != nil {
if err := appendDeleteResult(cur, r, errors); err != nil {
return err
}
case map[int64]ClientInsertResult:
if err := appendInsertResult(cur, res, errors, insIDMap); err != nil {
if err := appendInsertResult(cur, r, errors, insIDMap); err != nil {
return err
}
case map[int64]ClientUpdateResult:
if err := appendUpdateResult(cur, res, errors); err != nil {
if err := appendUpdateResult(cur, r, errors); err != nil {
return err
}
}
}
if !res.Ok || res.NErrors > 0 || opErr != nil {
if opErr != nil {
return opErr
} else if !res.Ok || res.NErrors > 0 {
return ClientBulkWriteException{
TopLevelError: &WriteError{
Code: int(res.Code),
Message: res.Errmsg,
Raw: bson.Raw(rawRes),
},
WriteConcernErrors: wcErrs,
WriteErrors: errors,
PartialResult: &bw.result,
WriteErrors: errors,
PartialResult: &bw.result,
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion x/mongo/driver/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ func (op Operation) Execute(ctx context.Context) error {
if conn != nil {
// If we are dealing with a sharded cluster, then mark the failed server
// as "deprioritized".
if desc := conn.Description; desc != nil && op.Deployment.Kind() == description.Sharded {
if op.Deployment.Kind() == description.Sharded {
deprioritizedServers = []description.Server{conn.Description()}
}

Expand Down
28 changes: 26 additions & 2 deletions x/mongo/driver/operation/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ type Command struct {
name string
authenticator driver.Authenticator
commandFn func([]byte, description.SelectedServer) ([]byte, error)
batches *driver.Batches
database string
deployment driver.Deployment
selector description.ServerSelector
writeConcern *writeconcern.WriteConcern
readPreference *readpref.ReadPref
clock *session.ClusterClock
retry *driver.RetryMode
opType driver.Type
batches *driver.Batches
session *session.Client
monitor *event.CommandMonitor
resultResponse bsoncore.Document
Expand Down Expand Up @@ -95,7 +97,6 @@ func (c *Command) Execute(ctx context.Context) error {

return driver.Operation{
CommandFn: c.commandFn,
Batches: c.batches,
ProcessResponseFn: func(info driver.ResponseInfo) error {
c.resultResponse = info.ServerResponse

Expand All @@ -113,6 +114,9 @@ func (c *Command) Execute(ctx context.Context) error {
},
Client: c.session,
Clock: c.clock,
RetryMode: c.retry,
Type: c.opType,
Batches: c.batches,
CommandMonitor: c.monitor,
Database: c.database,
Deployment: c.deployment,
Expand Down Expand Up @@ -168,6 +172,26 @@ func (c *Command) Batches(batches *driver.Batches) *Command {
return c
}

// RetryMode sets the RetryMode for this operation.
func (c *Command) RetryMode(retry driver.RetryMode) *Command {
if c == nil {
c = new(Command)
}

c.retry = &retry
return c
}

// Type sets the opType for this operation.
func (c *Command) Type(t driver.Type) *Command {
if c == nil {
c = new(Command)
}

c.opType = t
return c
}

// Database sets the database to run this operation against.
func (c *Command) Database(database string) *Command {
if c == nil {
Expand Down

0 comments on commit 982b622

Please sign in to comment.