From 1ec89cf5266f21745e71dee8ed05fa29390195e8 Mon Sep 17 00:00:00 2001 From: egor Date: Wed, 31 Jan 2024 16:44:54 +0200 Subject: [PATCH 1/5] RavenDB-21541 - fix task connection status when getting subscription ongoing task by name --- .../Web/System/OngoingTasksHandler.cs | 2 +- .../Subscriptions/SubscriptionsBasic.cs | 39 +++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/src/Raven.Server/Web/System/OngoingTasksHandler.cs b/src/Raven.Server/Web/System/OngoingTasksHandler.cs index 0017ecfea399..5200f405dd0f 100644 --- a/src/Raven.Server/Web/System/OngoingTasksHandler.cs +++ b/src/Raven.Server/Web/System/OngoingTasksHandler.cs @@ -1355,7 +1355,7 @@ public async Task GetOngoingTaskInfo() { connectionStatus = OngoingTaskConnectionStatus.NotOnThisNode; } - else if (Database.SubscriptionStorage.TryGetRunningSubscriptionConnectionsState(key, out var connectionsState)) + else if (Database.SubscriptionStorage.TryGetRunningSubscriptionConnectionsState(subscriptionState.SubscriptionId, out var connectionsState)) { connectionStatus = connectionsState.IsSubscriptionActive() ? OngoingTaskConnectionStatus.Active : OngoingTaskConnectionStatus.NotActive; } diff --git a/test/FastTests/Client/Subscriptions/SubscriptionsBasic.cs b/test/FastTests/Client/Subscriptions/SubscriptionsBasic.cs index 457dd2d65869..85fb0e2aac68 100644 --- a/test/FastTests/Client/Subscriptions/SubscriptionsBasic.cs +++ b/test/FastTests/Client/Subscriptions/SubscriptionsBasic.cs @@ -7,6 +7,7 @@ using System.Threading.Tasks; using Raven.Client.Documents; using Raven.Client.Documents.Conventions; +using Raven.Client.Documents.Operations.OngoingTasks; using Raven.Client.Documents.Queries; using Raven.Client.Documents.Subscriptions; using Raven.Client.Exceptions.Documents.Subscriptions; @@ -1177,6 +1178,44 @@ public async Task Subscription_WhenProjectWithId_ShouldTranslateToJavascriptIdFu } } + [RavenFact(RavenTestCategory.Subscriptions)] + public async Task Subscription_GetOngoingTaskInfoOperation_ShouldReturnCorrentTaskStatus() + { + using var store = GetDocumentStore(); + + var entity = new User(); + using (var session = store.OpenAsyncSession()) + { + await session.StoreAsync(entity); + await session.SaveChangesAsync(); + } + + var name = await store.Subscriptions.CreateAsync(new SubscriptionCreationOptions()); + + var state = await store.Subscriptions.GetSubscriptionStateAsync(name); + await using (var sub = store.Subscriptions.GetSubscriptionWorker(name)) + { + var mre = new AsyncManualResetEvent(); + var subscriptionTask = sub.Run(batch => + { + mre.Set(); + }); + var timeout = TimeSpan.FromSeconds(30); + Assert.True(await mre.WaitAsync(timeout)); + var taskInfoById = store.Maintenance.Send(new GetOngoingTaskInfoOperation(state.SubscriptionId, OngoingTaskType.Subscription)); + Assert.NotNull(taskInfoById); + Assert.Equal(OngoingTaskState.Enabled, taskInfoById.TaskState); + Assert.Equal(OngoingTaskType.Subscription, taskInfoById.TaskType); + Assert.Equal(OngoingTaskConnectionStatus.Active, taskInfoById.TaskConnectionStatus); + + var taskInfoByName = store.Maintenance.Send(new GetOngoingTaskInfoOperation(state.SubscriptionName, OngoingTaskType.Subscription)); + Assert.NotNull(taskInfoByName); + Assert.Equal(taskInfoById.TaskState, taskInfoByName.TaskState); + Assert.Equal(taskInfoById.TaskType, taskInfoByName.TaskType); + Assert.Equal(taskInfoById.TaskConnectionStatus, taskInfoByName.TaskConnectionStatus); + } + } + private class ProjectionObject { public string SomeProp { get; set; } From 4f7a9a2c8bcf6e93913cfb4bdda29713bca0c333 Mon Sep 17 00:00:00 2001 From: egor Date: Thu, 8 Feb 2024 15:16:10 +0200 Subject: [PATCH 2/5] RavenDB-22047: make sure to wakeup timer even on negative duetime --- src/Raven.Server/Documents/DatabasesLandlord.cs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/Raven.Server/Documents/DatabasesLandlord.cs b/src/Raven.Server/Documents/DatabasesLandlord.cs index 868de87f1947..8c2243425198 100644 --- a/src/Raven.Server/Documents/DatabasesLandlord.cs +++ b/src/Raven.Server/Documents/DatabasesLandlord.cs @@ -1043,19 +1043,17 @@ public bool UnloadDirectly(StringSegment databaseName, IdleDatabaseActivity idle UnloadDatabaseInternal(databaseName.Value, caller); LastRecentlyUsed.TryRemove(databaseName, out _); - if (idleDatabaseActivity?.DueTime > 0) + if (idleDatabaseActivity != null) _wakeupTimers.TryAdd(databaseName.Value, new Timer( callback: _ => NextScheduledActivityCallback(databaseName.Value, idleDatabaseActivity), state: null, - dueTime: idleDatabaseActivity.DueTime, + // in case the DueTime is negative or zero, the callback will be called immediately and database will be loaded. + dueTime: idleDatabaseActivity.DueTime > 0 ? idleDatabaseActivity.DueTime : 0, period: Timeout.Infinite)); if (_logger.IsOperationsEnabled) { - var msg = idleDatabaseActivity?.DueTime > 0 - ? $"wakeup timer set to: {idleDatabaseActivity.DateTime.Value}, which will happen in {idleDatabaseActivity.DueTime} ms." - : "without setting a wakeup timer."; - + var msg = idleDatabaseActivity == null ? "without setting a wakeup timer." : $"wakeup timer set to: '{idleDatabaseActivity.DateTime.GetValueOrDefault()}', which will happen in '{idleDatabaseActivity.DueTime}' ms."; _logger.Operations($"Unloading directly database '{databaseName}', {msg}"); } From e7f9b9f187c4230981031feff9c5a1a727258c99 Mon Sep 17 00:00:00 2001 From: egor Date: Thu, 8 Feb 2024 15:18:23 +0200 Subject: [PATCH 3/5] RavenDB-22047 - make sure to give enough timeout for the database stats to become stable before throwing on database backup --- .../Documents/DatabasesLandlord.cs | 21 ++++++++++++++----- .../Web/System/OngoingTasksHandler.cs | 7 ++++--- test/SlowTests/Issues/RavenDB-18442.cs | 6 +----- 3 files changed, 21 insertions(+), 13 deletions(-) diff --git a/src/Raven.Server/Documents/DatabasesLandlord.cs b/src/Raven.Server/Documents/DatabasesLandlord.cs index 8c2243425198..2113c7eee837 100644 --- a/src/Raven.Server/Documents/DatabasesLandlord.cs +++ b/src/Raven.Server/Documents/DatabasesLandlord.cs @@ -1008,7 +1008,7 @@ public enum ClusterDatabaseChangeType public bool UnloadDirectly(StringSegment databaseName, DateTime? wakeup = null, [CallerMemberName] string caller = null) { - var nextScheduledAction = new IdleDatabaseActivity(IdleDatabaseActivityType.WakeUpDatabase, wakeup); + var nextScheduledAction = new IdleDatabaseActivity(IdleDatabaseActivityType.WakeUpDatabase); return UnloadDirectly(databaseName, nextScheduledAction, caller); } @@ -1043,7 +1043,8 @@ public bool UnloadDirectly(StringSegment databaseName, IdleDatabaseActivity idle UnloadDatabaseInternal(databaseName.Value, caller); LastRecentlyUsed.TryRemove(databaseName, out _); - if (idleDatabaseActivity != null) + // DateTime should be only null in tests + if (idleDatabaseActivity is { DateTime: not null }) _wakeupTimers.TryAdd(databaseName.Value, new Timer( callback: _ => NextScheduledActivityCallback(databaseName.Value, idleDatabaseActivity), state: null, @@ -1274,14 +1275,24 @@ public class IdleDatabaseActivity ? (int)Math.Min(int.MaxValue, (DateTime.Value - System.DateTime.UtcNow).TotalMilliseconds) : 0; - public IdleDatabaseActivity(IdleDatabaseActivityType type, DateTime? timeOfActivity, long taskId = 0, long lastEtag = 0) + public IdleDatabaseActivity(IdleDatabaseActivityType type) + { + LastEtag = 0; + Type = type; + TaskId = 0; + + // DateTime should be only null in tests + DateTime = null; + } + + public IdleDatabaseActivity(IdleDatabaseActivityType type, DateTime timeOfActivity, long taskId = 0, long lastEtag = 0) { LastEtag = lastEtag; Type = type; TaskId = taskId; - Debug.Assert(timeOfActivity?.Kind != DateTimeKind.Unspecified); - DateTime = timeOfActivity?.ToUniversalTime(); + Debug.Assert(timeOfActivity.Kind != DateTimeKind.Unspecified); + DateTime = timeOfActivity.ToUniversalTime(); } } diff --git a/src/Raven.Server/Web/System/OngoingTasksHandler.cs b/src/Raven.Server/Web/System/OngoingTasksHandler.cs index 5200f405dd0f..4d05fb14c9e5 100644 --- a/src/Raven.Server/Web/System/OngoingTasksHandler.cs +++ b/src/Raven.Server/Web/System/OngoingTasksHandler.cs @@ -398,10 +398,11 @@ public async Task BackupDatabase() var nodeTag = Database.PeriodicBackupRunner.WhoseTaskIsIt(taskId); if (nodeTag == null) { - // this can happen for a new task that was just created - // we'll wait for the cluster observer to determine the responsible node for the backup + // this can happen if the database was just created or if a new task that was just created + // we'll wait for the cluster observer to give more time for the database stats to become stable, + // and then we'll wait for the cluster observer to determine the responsible node for the backup - var task = Task.Delay(Database.Configuration.Cluster.StabilizationTime.AsTimeSpan); + var task = Task.Delay(Database.Configuration.Cluster.StabilizationTime.AsTimeSpan + Database.Configuration.Cluster.StabilizationTime.AsTimeSpan); while (true) { diff --git a/test/SlowTests/Issues/RavenDB-18442.cs b/test/SlowTests/Issues/RavenDB-18442.cs index 39ea9a36740b..f0565cdc1dda 100644 --- a/test/SlowTests/Issues/RavenDB-18442.cs +++ b/test/SlowTests/Issues/RavenDB-18442.cs @@ -51,11 +51,7 @@ public async Task BackupInfoShouldBeInSyncBetweenClusterNodes() Backup.WaitForResponsibleNodeUpdateInCluster(store, nodes, result.TaskId); // Turn Database offline in second server. - Assert.Equal(1, WaitForValue(() => secondServer.ServerStore.IdleDatabases.Count, 1, timeout: 60000, interval: 1000)); //wait for db to be idle - var online = secondServer.ServerStore.DatabasesLandlord.DatabasesCache.TryGetValue(store.Database, out Task dbTask) && - dbTask != null && - dbTask.IsCompleted; - Assert.False(online); + secondServer.ServerStore.DatabasesLandlord.UnloadDirectly(store.Database); // Backup run in first server await Backup.RunBackupAsync(firstServer, result.TaskId, store); From 4db5e71913e641ef2a5ade60228bae1253f8e761 Mon Sep 17 00:00:00 2001 From: egor Date: Thu, 8 Feb 2024 19:38:42 +0200 Subject: [PATCH 4/5] RavenDB-22047 - fix failing test --- src/Raven.Server/Documents/DatabasesLandlord.cs | 3 ++- test/StressTests/Issues/RavenDB_18420.cs | 6 +++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/Raven.Server/Documents/DatabasesLandlord.cs b/src/Raven.Server/Documents/DatabasesLandlord.cs index 2113c7eee837..77a71d23160b 100644 --- a/src/Raven.Server/Documents/DatabasesLandlord.cs +++ b/src/Raven.Server/Documents/DatabasesLandlord.cs @@ -1144,12 +1144,13 @@ private void NextScheduledActivityCallback(string databaseName, IdleDatabaseActi if (ex is DatabaseConcurrentLoadTimeoutException e) { // database failed to load, retry after 1 min - ForTestingPurposes?.RescheduleDatabaseWakeupMre?.Set(); if (_logger.IsInfoEnabled) _logger.Info($"Failed to start database '{databaseName}' on timer, will retry the wakeup in '{_dueTimeOnRetry}' ms", e); nextIdleDatabaseActivity.DateTime = DateTime.UtcNow.AddMilliseconds(_dueTimeOnRetry); + ForTestingPurposes?.RescheduleDatabaseWakeupMre?.Set(); + RescheduleNextIdleDatabaseActivity(databaseName, nextIdleDatabaseActivity); } }, TaskContinuationOptions.OnlyOnFaulted); diff --git a/test/StressTests/Issues/RavenDB_18420.cs b/test/StressTests/Issues/RavenDB_18420.cs index 0e02a9d0c002..09a4e602f8c5 100644 --- a/test/StressTests/Issues/RavenDB_18420.cs +++ b/test/StressTests/Issues/RavenDB_18420.cs @@ -60,7 +60,11 @@ public async Task ShouldRescheduleBackupTimerIfDocumentDatabaseFailedToLoad() var mre = server.ServerStore.DatabasesLandlord.ForTestingPurposesOnly().RescheduleDatabaseWakeupMre = new ManualResetEventSlim(); - // enable backup + // we wait here for UpdateResponsibleNodeForTasksCommand, it won't wake up the database since the db task is faulted with DatabaseConcurrentLoadTimeoutException + Backup.WaitForResponsibleNodeUpdate(server.ServerStore, store.Database, putConfiguration.TaskId); + Assert.Equal(1, server.ServerStore.IdleDatabases.Count); + + // enable backup and this will set wakeup timer await store.Maintenance.Server.SendAsync(new PutServerWideBackupConfigurationOperation(putConfiguration)); Assert.True(mre.Wait(TimeSpan.FromSeconds(65))); From b7f6d7a7cc5ec6a8ee9795d53dac8940329afb1a Mon Sep 17 00:00:00 2001 From: Maciej Aszyk Date: Wed, 7 Feb 2024 11:35:34 +0100 Subject: [PATCH 5/5] RavenDB-22040 Send a notification when adding a tombstone without an existing document to force indexes to run indexing batches and update etags. --- .../Documents/DocumentsStorage.cs | 9 ++ test/SlowTests/Issues/RavenDB-22040.cs | 82 +++++++++++++++++++ 2 files changed, 91 insertions(+) create mode 100644 test/SlowTests/Issues/RavenDB-22040.cs diff --git a/src/Raven.Server/Documents/DocumentsStorage.cs b/src/Raven.Server/Documents/DocumentsStorage.cs index 82a7510022cb..6c71fef6bc05 100644 --- a/src/Raven.Server/Documents/DocumentsStorage.cs +++ b/src/Raven.Server/Documents/DocumentsStorage.cs @@ -1909,6 +1909,15 @@ public static Tombstone TableValueToTombstone(JsonOperationContext context, ref documentFlags, nonPersistentFlags).Etag; + // We've to add notification since we're updating last tombstone etag, and we can end up in situation when our indexes will be stale due unprocessed tombstones after replication. + context.Transaction.AddAfterCommitNotification(new DocumentChange + { + Type = DocumentChangeTypes.Delete, + Id = id, + ChangeVector = changeVector, + CollectionName = collectionName.Name, + }); + return new DeleteOperationResult { Collection = collectionName, diff --git a/test/SlowTests/Issues/RavenDB-22040.cs b/test/SlowTests/Issues/RavenDB-22040.cs new file mode 100644 index 000000000000..4b4e09a4b1a0 --- /dev/null +++ b/test/SlowTests/Issues/RavenDB-22040.cs @@ -0,0 +1,82 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using FastTests.Server.Replication; +using Raven.Client.Documents.Indexes; +using Raven.Tests.Core.Utils.Entities; +using Tests.Infrastructure; +using Tests.Infrastructure.Entities; +using Xunit.Abstractions; + +namespace SlowTests.Issues; + +public class RavenDB_22040 : ReplicationTestBase +{ + public RavenDB_22040(ITestOutputHelper output) : base(output) + { + } + + [RavenFact(RavenTestCategory.Replication | RavenTestCategory.Indexes)] + public async Task ReplicatedTombstonesWillSendNotificationToIndexes() + { + var order1 = new Order() { Company = "Company1" }; + var order2 = new Order() { Company = "Company2" }; + var defaultTimeout = TimeSpan.FromSeconds(15); + + var cluster = await CreateRaftCluster(2); + + var store = GetDocumentStore(new Options { ReplicationFactor = 2, Server = cluster.Leader, }); + + var database = store.Database; + + var node1 = await cluster.Nodes[0].ServerStore.DatabasesLandlord.TryGetOrCreateResourceStore(database); + var node2 = await cluster.Nodes[1].ServerStore.DatabasesLandlord.TryGetOrCreateResourceStore(database); + + var t1 = await BreakReplication(cluster.Nodes[0].ServerStore, database); + var t2 = await BreakReplication(cluster.Nodes[1].ServerStore, database); + + using (var session = store.OpenAsyncSession()) + { + await session.StoreAsync(order1); + await session.SaveChangesAsync(); + await new Index().ExecuteAsync(store); + Indexes.WaitForIndexing(store); + } + + t1.Mend(); + t2.Mend(); + + await WaitForDocumentInClusterAsync(cluster.Nodes, database, order1.Id, predicate: null, defaultTimeout); + + t1 = await BreakReplication(cluster.Nodes[0].ServerStore, database); + t2 = await BreakReplication(cluster.Nodes[1].ServerStore, database); + var markerDocument = new Employee() { FirstName = "MARKER" }; + using (var session = store.OpenAsyncSession()) + { + await session.StoreAsync(order2); + await session.SaveChangesAsync(); + Indexes.WaitForIndexing(store); + + session.Delete(order2.Id); + await session.SaveChangesAsync(); + + await session.StoreAsync(markerDocument); + await session.SaveChangesAsync(); + } + + t1.Mend(); + t2.Mend(); + + await WaitForDocumentInClusterAsync(cluster.Nodes, database, markerDocument.Id, null, defaultTimeout); + Indexes.WaitForIndexing(store, database, timeout: defaultTimeout, nodeTag: "A"); + Indexes.WaitForIndexing(store, database, timeout: defaultTimeout, nodeTag: "B"); + } + + private class Index : AbstractIndexCreationTask + { + public Index() + { + Map = orders => orders.Select(x => new { x.Company }); + } + } +}