Skip to content

Commit

Permalink
Merge branch 'main' into REP-5358-retry-and-channels
Browse files Browse the repository at this point in the history
  • Loading branch information
FGasper committed Dec 6, 2024
2 parents fa63ff7 + 709bceb commit ab10df3
Show file tree
Hide file tree
Showing 16 changed files with 582 additions and 338 deletions.
261 changes: 136 additions & 125 deletions internal/partitions/partitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ const (
func PartitionCollectionWithSize(
ctx context.Context,
uuidEntry *uuidutil.NamespaceAndUUID,
retryer *retry.Retryer,
srcClient *mongo.Client,
replicatorList []Replicator,
subLogger *logger.Logger,
Expand All @@ -137,7 +136,6 @@ func PartitionCollectionWithSize(
partitions, docCount, byteCount, err := PartitionCollectionWithParameters(
ctx,
uuidEntry,
retryer,
srcClient,
replicatorList,
defaultSampleRate,
Expand All @@ -153,7 +151,6 @@ func PartitionCollectionWithSize(
return PartitionCollectionWithParameters(
ctx,
uuidEntry,
retryer,
srcClient,
replicatorList,
defaultSampleRate,
Expand All @@ -174,7 +171,6 @@ func PartitionCollectionWithSize(
func PartitionCollectionWithParameters(
ctx context.Context,
uuidEntry *uuidutil.NamespaceAndUUID,
retryer *retry.Retryer,
srcClient *mongo.Client,
replicatorList []Replicator,
sampleRate float64,
Expand All @@ -191,13 +187,13 @@ func PartitionCollectionWithParameters(

// Get the collection's size in bytes and its document count. It is okay if these return zero since there might still be
// items in the collection. Rely on getOuterIDBound to do a majority read to determine if we continue processing the collection.
collSizeInBytes, collDocCount, isCapped, err := GetSizeAndDocumentCount(ctx, subLogger, retryer, srcColl)
collSizeInBytes, collDocCount, isCapped, err := GetSizeAndDocumentCount(ctx, subLogger, srcColl)
if err != nil {
return nil, 0, 0, err
}

// The lower bound for the collection. There is no partitioning to do if the bound is nil.
minIDBound, err := getOuterIDBound(ctx, subLogger, retryer, minBound, srcDB, uuidEntry.CollName, globalFilter)
minIDBound, err := getOuterIDBound(ctx, subLogger, minBound, srcDB, uuidEntry.CollName, globalFilter)
if err != nil {
return nil, 0, 0, err
}
Expand All @@ -210,7 +206,7 @@ func PartitionCollectionWithParameters(
}

// The upper bound for the collection. There is no partitioning to do if the bound is nil.
maxIDBound, err := getOuterIDBound(ctx, subLogger, retryer, maxBound, srcDB, uuidEntry.CollName, globalFilter)
maxIDBound, err := getOuterIDBound(ctx, subLogger, maxBound, srcDB, uuidEntry.CollName, globalFilter)
if err != nil {
return nil, 0, 0, err
}
Expand All @@ -232,7 +228,7 @@ func PartitionCollectionWithParameters(

// If a filter is used for partitioning, number of partitions is calculated with the ratio of filtered documents.
if len(globalFilter) > 0 {
numFilteredDocs, filteredCntErr := GetDocumentCountAfterFiltering(ctx, subLogger, retryer, srcColl, globalFilter)
numFilteredDocs, filteredCntErr := GetDocumentCountAfterFiltering(ctx, subLogger, srcColl, globalFilter)
if filteredCntErr == nil {
numPartitions = getNumPartitions(collSizeInBytes, partitionSizeInBytes, float64(numFilteredDocs)/float64(collDocCount))
} else {
Expand All @@ -251,7 +247,6 @@ func PartitionCollectionWithParameters(
midIDBounds, collDropped, err := getMidIDBounds(
ctx,
subLogger,
retryer,
srcDB,
uuidEntry.CollName,
collDocCount,
Expand Down Expand Up @@ -314,7 +309,7 @@ func PartitionCollectionWithParameters(
// capped status, in that order.
//
// Exported for usage in integration tests.
func GetSizeAndDocumentCount(ctx context.Context, logger *logger.Logger, retryer *retry.Retryer, srcColl *mongo.Collection) (int64, int64, bool, error) {
func GetSizeAndDocumentCount(ctx context.Context, logger *logger.Logger, srcColl *mongo.Collection) (int64, int64, bool, error) {
srcDB := srcColl.Database()
collName := srcColl.Name()

Expand All @@ -324,39 +319,43 @@ func GetSizeAndDocumentCount(ctx context.Context, logger *logger.Logger, retryer
Capped bool `bson:"capped"`
}{}

err := retryer.Run(ctx, logger, func(ctx context.Context, ri *retry.FuncInfo) error {
ri.Log(logger.Logger, "collStats", "source", srcDB.Name(), collName, "Retrieving collection size and document count.")
request := bson.D{
{"aggregate", collName},
{"pipeline", mongo.Pipeline{
bson.D{{"$collStats", bson.D{
{"storageStats", bson.E{"scale", 1}},
}}},
// The "$group" here behaves as a project and rename when there's only one
// document (non-sharded case). When there are multiple documents (one for
// each shard) it correctly sums the counts and sizes from each shard.
bson.D{{"$group", bson.D{
{"_id", "ns"},
{"count", bson.D{{"$sum", "$storageStats.count"}}},
{"size", bson.D{{"$sum", "$storageStats.size"}}},
{"capped", bson.D{{"$first", "$capped"}}}}}},
}},
{"cursor", bson.D{}},
}
err := retry.New().WithCallback(
func(ctx context.Context, ri *retry.FuncInfo) error {
ri.Log(logger.Logger, "collStats", "source", srcDB.Name(), collName, "Retrieving collection size and document count.")
request := bson.D{
{"aggregate", collName},
{"pipeline", mongo.Pipeline{
bson.D{{"$collStats", bson.D{
{"storageStats", bson.E{"scale", 1}},
}}},
// The "$group" here behaves as a project and rename when there's only one
// document (non-sharded case). When there are multiple documents (one for
// each shard) it correctly sums the counts and sizes from each shard.
bson.D{{"$group", bson.D{
{"_id", "ns"},
{"count", bson.D{{"$sum", "$storageStats.count"}}},
{"size", bson.D{{"$sum", "$storageStats.size"}}},
{"capped", bson.D{{"$first", "$capped"}}}}}},
}},
{"cursor", bson.D{}},
}

cursor, driverErr := srcDB.RunCommandCursor(ctx, request)
if driverErr != nil {
return driverErr
}
cursor, driverErr := srcDB.RunCommandCursor(ctx, request)
if driverErr != nil {
return driverErr
}

defer cursor.Close(ctx)
if cursor.Next(ctx) {
if err := cursor.Decode(&value); err != nil {
return errors.Wrapf(err, "failed to decode $collStats response for source namespace %s.%s", srcDB.Name(), collName)
defer cursor.Close(ctx)
if cursor.Next(ctx) {
if err := cursor.Decode(&value); err != nil {
return errors.Wrapf(err, "failed to decode $collStats response for source namespace %s.%s", srcDB.Name(), collName)
}
}
}
return nil
})
return nil
},
"retrieving %#q's statistics",
srcDB.Name()+"."+collName,
).Run(ctx, logger)

// TODO (REP-960): remove this check.
// If we get NamespaceNotFoundError then return 0,0 since we won't do any partitioning with those returns
Expand All @@ -380,7 +379,7 @@ func GetSizeAndDocumentCount(ctx context.Context, logger *logger.Logger, retryer
//
// This function could take a long time, especially if the collection does not have an index
// on the filtered fields.
func GetDocumentCountAfterFiltering(ctx context.Context, logger *logger.Logger, retryer *retry.Retryer, srcColl *mongo.Collection, filter map[string]any) (int64, error) {
func GetDocumentCountAfterFiltering(ctx context.Context, logger *logger.Logger, srcColl *mongo.Collection, filter map[string]any) (int64, error) {
srcDB := srcColl.Database()
collName := srcColl.Name()

Expand All @@ -395,27 +394,31 @@ func GetDocumentCountAfterFiltering(ctx context.Context, logger *logger.Logger,
}
pipeline = append(pipeline, bson.D{{"$count", "numFilteredDocs"}})

err := retryer.Run(ctx, logger, func(ctx context.Context, ri *retry.FuncInfo) error {
ri.Log(logger.Logger, "count", "source", srcDB.Name(), collName, "Counting filtered documents.")
request := bson.D{
{"aggregate", collName},
{"pipeline", pipeline},
{"cursor", bson.D{}},
}
err := retry.New().WithCallback(
func(ctx context.Context, ri *retry.FuncInfo) error {
ri.Log(logger.Logger, "count", "source", srcDB.Name(), collName, "Counting filtered documents.")
request := bson.D{
{"aggregate", collName},
{"pipeline", pipeline},
{"cursor", bson.D{}},
}

cursor, driverErr := srcDB.RunCommandCursor(ctx, request)
if driverErr != nil {
return driverErr
}
cursor, driverErr := srcDB.RunCommandCursor(ctx, request)
if driverErr != nil {
return driverErr
}

defer cursor.Close(ctx)
if cursor.Next(ctx) {
if err := cursor.Decode(&value); err != nil {
return errors.Wrapf(err, "failed to decode $count response (%+v) for source namespace %s.%s after filter (%+v)", cursor.Current, srcDB.Name(), collName, filter)
defer cursor.Close(ctx)
if cursor.Next(ctx) {
if err := cursor.Decode(&value); err != nil {
return errors.Wrapf(err, "failed to decode $count response (%+v) for source namespace %s.%s after filter (%+v)", cursor.Current, srcDB.Name(), collName, filter)
}
}
}
return nil
})
return nil
},
"counting %#q's filtered documents",
srcDB.Name()+"."+collName,
).Run(ctx, logger)

// TODO (REP-960): remove this check.
// If we get NamespaceNotFoundError then return 0 since we won't do any partitioning with those returns
Expand Down Expand Up @@ -458,7 +461,6 @@ func getNumPartitions(collSizeInBytes, partitionSizeInBytes int64, filteredRatio
func getOuterIDBound(
ctx context.Context,
subLogger *logger.Logger,
retryer *retry.Retryer,
minOrMaxBound minOrMaxBound,
srcDB *mongo.Database,
collName string,
Expand Down Expand Up @@ -488,30 +490,35 @@ func getOuterIDBound(
}...)

// Get one document containing only the smallest or largest _id value in the collection.
err := retryer.Run(ctx, subLogger, func(ctx context.Context, ri *retry.FuncInfo) error {
ri.Log(subLogger.Logger, "aggregate", "source", srcDB.Name(), collName, fmt.Sprintf("getting %s _id partition bound", minOrMaxBound))
cursor, cmdErr :=
srcDB.RunCommandCursor(ctx, bson.D{
{"aggregate", collName},
{"pipeline", pipeline},
{"hint", bson.D{{"_id", 1}}},
{"cursor", bson.D{}},
})

if cmdErr != nil {
return cmdErr
}
err := retry.New().WithCallback(
func(ctx context.Context, ri *retry.FuncInfo) error {
ri.Log(subLogger.Logger, "aggregate", "source", srcDB.Name(), collName, fmt.Sprintf("getting %s _id partition bound", minOrMaxBound))
cursor, cmdErr :=
srcDB.RunCommandCursor(ctx, bson.D{
{"aggregate", collName},
{"pipeline", pipeline},
{"hint", bson.D{{"_id", 1}}},
{"cursor", bson.D{}},
})

if cmdErr != nil {
return cmdErr
}

// If we don't have at least one document, the collection is either empty or was dropped.
defer cursor.Close(ctx)
if !cursor.Next(ctx) {
return nil
}
// If we don't have at least one document, the collection is either empty or was dropped.
defer cursor.Close(ctx)
if !cursor.Next(ctx) {
return nil
}

// Return the _id value from that document.
docID, cmdErr = cursor.Current.LookupErr("_id")
return cmdErr
})
// Return the _id value from that document.
docID, cmdErr = cursor.Current.LookupErr("_id")
return cmdErr
},
"finding %#q's %s _id",
srcDB.Name()+"."+collName,
minOrMaxBound,
).Run(ctx, subLogger)

if err != nil {
return nil, errors.Wrapf(err, "could not get %s _id bound for source collection '%s.%s'", minOrMaxBound, srcDB.Name(), collName)
Expand All @@ -528,7 +535,6 @@ func getOuterIDBound(
func getMidIDBounds(
ctx context.Context,
logger *logger.Logger,
retryer *retry.Retryer,
srcDB *mongo.Database,
collName string,
collDocCount int64,
Expand Down Expand Up @@ -576,48 +582,53 @@ func getMidIDBounds(

// Get a cursor for the $sample and $bucketAuto aggregation.
var midIDBounds []interface{}
agRetryer := retryer.WithErrorCodes(util.SampleTooManyDuplicates)
err := agRetryer.Run(ctx, logger, func(ctx context.Context, ri *retry.FuncInfo) error {
ri.Log(logger.Logger, "aggregate", "source", srcDB.Name(), collName, "Retrieving mid _id partition bounds using $sample.")
cursor, cmdErr :=
srcDB.RunCommandCursor(ctx, bson.D{
{"aggregate", collName},
{"pipeline", pipeline},
{"allowDiskUse", true},
{"cursor", bson.D{}},
})

if cmdErr != nil {
return errors.Wrapf(cmdErr, "failed to $sample and $bucketAuto documents for source namespace '%s.%s'", srcDB.Name(), collName)
}

defer cursor.Close(ctx)

// Iterate through all $bucketAuto documents of the form:
// {
// "_id" : {
// "min" : ... ,
// "max" : ...
// },
// "count" : ...
// }
midIDBounds = make([]interface{}, 0, numPartitions)
for cursor.Next(ctx) {
// Get a mid _id bound using the $bucketAuto document's max value.
bucketAutoDoc := make(bson.Raw, len(cursor.Current))
copy(bucketAutoDoc, cursor.Current)
bound, err := bucketAutoDoc.LookupErr("_id", "max")
if err != nil {
return errors.Wrapf(err, "failed to lookup '_id.max' key in $bucketAuto document for source namespace '%s.%s'", srcDB.Name(), collName)
}

// Append the copied bound to the other mid _id bounds.
midIDBounds = append(midIDBounds, bound)
ri.NoteSuccess()
}

return cursor.Err()
})
agRetryer := retry.New().WithErrorCodes(util.SampleTooManyDuplicates)
err := agRetryer.
WithCallback(
func(ctx context.Context, ri *retry.FuncInfo) error {
ri.Log(logger.Logger, "aggregate", "source", srcDB.Name(), collName, "Retrieving mid _id partition bounds using $sample.")
cursor, cmdErr :=
srcDB.RunCommandCursor(ctx, bson.D{
{"aggregate", collName},
{"pipeline", pipeline},
{"allowDiskUse", true},
{"cursor", bson.D{}},
})

if cmdErr != nil {
return errors.Wrapf(cmdErr, "failed to $sample and $bucketAuto documents for source namespace '%s.%s'", srcDB.Name(), collName)
}

defer cursor.Close(ctx)

// Iterate through all $bucketAuto documents of the form:
// {
// "_id" : {
// "min" : ... ,
// "max" : ...
// },
// "count" : ...
// }
midIDBounds = make([]interface{}, 0, numPartitions)
for cursor.Next(ctx) {
// Get a mid _id bound using the $bucketAuto document's max value.
bucketAutoDoc := make(bson.Raw, len(cursor.Current))
copy(bucketAutoDoc, cursor.Current)
bound, err := bucketAutoDoc.LookupErr("_id", "max")
if err != nil {
return errors.Wrapf(err, "failed to lookup '_id.max' key in $bucketAuto document for source namespace '%s.%s'", srcDB.Name(), collName)
}

// Append the copied bound to the other mid _id bounds.
midIDBounds = append(midIDBounds, bound)
ri.NoteSuccess()
}

return cursor.Err()
},
"finding %#q's _id partition boundaries",
srcDB.Name()+"."+collName,
).Run(ctx, logger)

if err != nil {
return nil, false, errors.Wrapf(err, "encountered a problem in the cursor when trying to $sample and $bucketAuto aggregation for source namespace '%s.%s'", srcDB.Name(), collName)
Expand Down
Loading

0 comments on commit ab10df3

Please sign in to comment.