Skip to content

Commit

Permalink
RavenDB-17793 : pass prefix to StartBucketMigrationCommand so that we…
Browse files Browse the repository at this point in the history
… can find the prefix setting by name (sorted) instead of searching by bucket range start (needs to be ordered first)
  • Loading branch information
aviv committed Feb 1, 2024
1 parent a39912e commit 22fbdee
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@ public sealed class PrefixedShardingSetting

private byte[] _prefixBytesLowerCase;

public PrefixedShardingSetting()
{
}

public PrefixedShardingSetting(string prefix)
{
Prefix = prefix;
}

internal byte[] PrefixBytesLowerCase => _prefixBytesLowerCase ??= Encoding.UTF8.GetBytes(Prefix.ToLower());

public List<int> Shards { get; set; }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
using System.Collections.Generic;
using System.Linq;
using Raven.Client.Documents.Subscriptions;
using Raven.Client.Json.Serialization;
using Raven.Client.ServerWide;
using Raven.Client.ServerWide.Sharding;
using Raven.Server.Rachis;
using Raven.Server.ServerWide.Context;
using Raven.Server.ServerWide.Sharding;
using Raven.Server.Utils;
using Sparrow.Json.Parsing;
using Sparrow.Logging;
Expand All @@ -19,20 +19,22 @@ public sealed class StartBucketMigrationCommand : UpdateDatabaseCommand
public int? SourceShard;
public int DestinationShard;
public int Bucket;
public string Prefix;

private ShardBucketMigration _migration;

public StartBucketMigrationCommand()
{
}

public StartBucketMigrationCommand(int bucket, int destShard, string database, string raftId) : base(database, raftId)
public StartBucketMigrationCommand(int bucket, int destShard, string database, string prefix, string raftId) : base(database, raftId)
{
Bucket = bucket;
DestinationShard = destShard;
Prefix = prefix;
}

public StartBucketMigrationCommand(int bucket, int sourceShard, int destShard, string database, string raftId) : this(bucket, destShard, database, raftId)
public StartBucketMigrationCommand(int bucket, int sourceShard, int destShard, string database, string raftId) : this(bucket, destShard, database, prefix: null, raftId)
{
SourceShard = sourceShard;
}
Expand All @@ -56,27 +58,14 @@ public override void UpdateDatabaseRecord(DatabaseRecord record, long etag)
}
}


if (Bucket >= ShardHelper.NumberOfBuckets)
if (string.IsNullOrEmpty(Prefix) == false)
{
// prefixed bucket range
// todo
var prefixed = record.Sharding.Prefixed.OrderBy(x => x.BucketRangeStart).ToList();
List<int> shards = null;
for (int i = 0; i < prefixed.Count; i++)
{
var bucketRangeStart = prefixed[i].BucketRangeStart;
int nextBucketRangeStart = i == prefixed.Count - 1
? int.MaxValue
: prefixed[i + 1].BucketRangeStart;

if (Bucket < bucketRangeStart || Bucket >= nextBucketRangeStart)
continue;

shards = prefixed[i].Shards;
break;
}
var index = record.Sharding.Prefixed.BinarySearch(new PrefixedShardingSetting(Prefix), PrefixedSettingComparer.Instance);
if (index < 0)
throw new RachisApplyException($"Prefix {Prefix} doesn't exists");

var shards = record.Sharding.Prefixed[index].Shards;
if (shards == null || shards.Contains(DestinationShard) == false)
throw new RachisApplyException($"Destination shard {DestinationShard} doesn't exists");
}
Expand Down Expand Up @@ -145,6 +134,7 @@ public override void FillJson(DynamicJsonValue json)
json[nameof(SourceShard)] = SourceShard;
json[nameof(DestinationShard)] = DestinationShard;
json[nameof(Bucket)] = Bucket;
json[nameof(Prefix)] = Prefix;
}
}
}
3 changes: 1 addition & 2 deletions src/Raven.Server/ServerWide/JsonDeserializationCluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,7 @@ internal sealed class JsonDeserializationCluster : JsonDeserializationBase
[nameof(UpdateQueueSinkCommand)] = GenerateJsonDeserializationRoutine<UpdateQueueSinkCommand>(),
[nameof(UpdateQueueSinkProcessStateCommand)] = GenerateJsonDeserializationRoutine<UpdateQueueSinkProcessStateCommand>(),
[nameof(RemoveQueueSinkProcessStateCommand)] = GenerateJsonDeserializationRoutine<RemoveQueueSinkProcessStateCommand>(),
[nameof(UpdateResponsibleNodeForTasksCommand)] = GenerateJsonDeserializationRoutine<UpdateResponsibleNodeForTasksCommand>()

[nameof(UpdateResponsibleNodeForTasksCommand)] = GenerateJsonDeserializationRoutine<UpdateResponsibleNodeForTasksCommand>(),
[nameof(AddPrefixedSettingCommand)] = GenerateJsonDeserializationRoutine<AddPrefixedSettingCommand>(),
[nameof(DeletePrefixedSettingCommand)] = GenerateJsonDeserializationRoutine<DeletePrefixedSettingCommand>(),
[nameof(UpdatePrefixedSettingCommand)] = GenerateJsonDeserializationRoutine<UpdatePrefixedSettingCommand>()
Expand Down
4 changes: 2 additions & 2 deletions src/Raven.Server/ServerWide/ShardingStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ public ShardingStore([NotNull] ServerStore serverStore)
_serverStore = serverStore ?? throw new ArgumentNullException(nameof(serverStore));
}

public Task<(long Index, object Result)> StartBucketMigration(string database, int bucket, int toShard, string raftId = null)
public Task<(long Index, object Result)> StartBucketMigration(string database, int bucket, int toShard, string prefix = null, string raftId = null)
{
var cmd = new StartBucketMigrationCommand(bucket, toShard, database, raftId ?? RaftIdGenerator.NewId());
var cmd = new StartBucketMigrationCommand(bucket, toShard, database, prefix, raftId ?? RaftIdGenerator.NewId());
return _serverStore.SendToLeaderAsync(cmd);
}

Expand Down
21 changes: 17 additions & 4 deletions test/Tests.Infrastructure/RavenTestBase.ReshardingTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,18 @@ public async Task<int> StartMovingShardForId(IDocumentStore store, string id, in

var record = await store.Maintenance.Server.SendAsync(new GetDatabaseRecordOperation(store.Database));
var bucket = _parent.Sharding.GetBucket(record.Sharding, id);
PrefixedShardingSetting prefixed = null;
PrefixedShardingSetting prefixedSetting = null;
foreach (var setting in record.Sharding.Prefixed)
{
if (id.StartsWith(setting.Prefix, StringComparison.OrdinalIgnoreCase))
{
prefixed = setting;
prefixedSetting = setting;
break;
}
}

var shardNumber = ShardHelper.GetShardNumberFor(record.Sharding, bucket);
var shards = prefixed != null ? prefixed.Shards : record.Sharding.Shards.Keys.ToList();
var shards = prefixedSetting != null ? prefixedSetting.Shards : record.Sharding.Shards.Keys.ToList();

int moveToShard;
if (toShard.HasValue)
Expand All @@ -65,7 +65,7 @@ public async Task<int> StartMovingShardForId(IDocumentStore store, string id, in
{
try
{
await server.ServerStore.Sharding.StartBucketMigration(store.Database, bucket, moveToShard);
await server.ServerStore.Sharding.StartBucketMigration(store.Database, bucket, moveToShard, prefixedSetting?.Prefix);
break;
}
catch
Expand Down Expand Up @@ -93,6 +93,19 @@ record = await store.Maintenance.Server.SendAsync(new GetDatabaseRecordOperation
}
}

public async Task WaitForNoActiveMigrations(IDocumentStore store, int timeout = 60_000)
{
using (var cts = new CancellationTokenSource(timeout))
{
var record = await store.Maintenance.Server.SendAsync(new GetDatabaseRecordOperation(store.Database), cts.Token);
while (record.Sharding.BucketMigrations.Count > 0)
{
await Task.Delay(250, cts.Token);
record = await store.Maintenance.Server.SendAsync(new GetDatabaseRecordOperation(store.Database), cts.Token);
}
}
}

public async Task MoveShardForId(IDocumentStore store, string id, int? toShard = null, List<RavenServer> servers = null)
{
try
Expand Down

0 comments on commit 22fbdee

Please sign in to comment.