diff --git a/src/Raven.Server/Documents/DatabasesLandlord.cs b/src/Raven.Server/Documents/DatabasesLandlord.cs index a9536aaab037..cf546e87a4a6 100644 --- a/src/Raven.Server/Documents/DatabasesLandlord.cs +++ b/src/Raven.Server/Documents/DatabasesLandlord.cs @@ -1259,7 +1259,7 @@ public enum ClusterDatabaseChangeType public bool UnloadDirectly(StringSegment databaseName, DateTime? wakeup = null, [CallerMemberName] string caller = null) { - var nextScheduledAction = new IdleDatabaseActivity(IdleDatabaseActivityType.WakeUpDatabase, wakeup); + var nextScheduledAction = new IdleDatabaseActivity(IdleDatabaseActivityType.WakeUpDatabase); return UnloadDirectly(databaseName, nextScheduledAction, caller); } @@ -1294,19 +1294,18 @@ public bool UnloadDirectly(StringSegment databaseName, IdleDatabaseActivity idle UnloadDatabaseInternal(databaseName.Value, caller); LastRecentlyUsed.TryRemove(databaseName, out _); - if (idleDatabaseActivity?.DueTime > 0) + // DateTime should be only null in tests + if (idleDatabaseActivity is { DateTime: not null }) _wakeupTimers.TryAdd(databaseName.Value, new Timer( callback: _ => NextScheduledActivityCallback(databaseName.Value, idleDatabaseActivity), state: null, - dueTime: idleDatabaseActivity.DueTime, + // in case the DueTime is negative or zero, the callback will be called immediately and database will be loaded. + dueTime: idleDatabaseActivity.DueTime > 0 ? idleDatabaseActivity.DueTime : 0, period: Timeout.Infinite)); if (_logger.IsOperationsEnabled) { - var msg = idleDatabaseActivity?.DueTime > 0 - ? $"wakeup timer set to: {idleDatabaseActivity.DateTime.Value}, which will happen in {idleDatabaseActivity.DueTime} ms." - : "without setting a wakeup timer."; - + var msg = idleDatabaseActivity == null ? "without setting a wakeup timer." : $"wakeup timer set to: '{idleDatabaseActivity.DateTime.GetValueOrDefault()}', which will happen in '{idleDatabaseActivity.DueTime}' ms."; _logger.Operations($"Unloading directly database '{databaseName}', {msg}"); } @@ -1395,12 +1394,13 @@ private void NextScheduledActivityCallback(string databaseName, IdleDatabaseActi if (ex is DatabaseConcurrentLoadTimeoutException e) { // database failed to load, retry after 1 min - ForTestingPurposes?.RescheduleDatabaseWakeupMre?.Set(); if (_logger.IsInfoEnabled) _logger.Info($"Failed to start database '{databaseName}' on timer, will retry the wakeup in '{_dueTimeOnRetry}' ms", e); nextIdleDatabaseActivity.DateTime = DateTime.UtcNow.AddMilliseconds(_dueTimeOnRetry); + ForTestingPurposes?.RescheduleDatabaseWakeupMre?.Set(); + RescheduleNextIdleDatabaseActivity(databaseName, nextIdleDatabaseActivity); } }, TaskContinuationOptions.OnlyOnFaulted); @@ -1649,14 +1649,24 @@ public sealed class IdleDatabaseActivity ? (int)Math.Min(int.MaxValue, (DateTime.Value - System.DateTime.UtcNow).TotalMilliseconds) : 0; - public IdleDatabaseActivity(IdleDatabaseActivityType type, DateTime? timeOfActivity, long taskId = 0, long lastEtag = 0) + public IdleDatabaseActivity(IdleDatabaseActivityType type) + { + LastEtag = 0; + Type = type; + TaskId = 0; + + // DateTime should be only null in tests + DateTime = null; + } + + public IdleDatabaseActivity(IdleDatabaseActivityType type, DateTime timeOfActivity, long taskId = 0, long lastEtag = 0) { LastEtag = lastEtag; Type = type; TaskId = taskId; - Debug.Assert(timeOfActivity?.Kind != DateTimeKind.Unspecified); - DateTime = timeOfActivity?.ToUniversalTime(); + Debug.Assert(timeOfActivity.Kind != DateTimeKind.Unspecified); + DateTime = timeOfActivity.ToUniversalTime(); } } diff --git a/src/Raven.Server/Documents/DocumentsStorage.cs b/src/Raven.Server/Documents/DocumentsStorage.cs index 5f5717cea1b0..99230f8a1953 100644 --- a/src/Raven.Server/Documents/DocumentsStorage.cs +++ b/src/Raven.Server/Documents/DocumentsStorage.cs @@ -1797,6 +1797,15 @@ public static Tombstone TableValueToTombstone(JsonOperationContext context, ref newFlags, nonPersistentFlags).Etag; + // We've to add notification since we're updating last tombstone etag, and we can end up in situation when our indexes will be stale due unprocessed tombstones after replication. + context.Transaction.AddAfterCommitNotification(new DocumentChange + { + Type = DocumentChangeTypes.Delete, + Id = id, + ChangeVector = changeVector, + CollectionName = collectionName.Name, + }); + return new DeleteOperationResult { Collection = collectionName, diff --git a/src/Raven.Server/Documents/Handlers/Processors/OngoingTasks/OngoingTasksHandlerProcessorForBackupDatabaseNow.cs b/src/Raven.Server/Documents/Handlers/Processors/OngoingTasks/OngoingTasksHandlerProcessorForBackupDatabaseNow.cs index d7b231ccb91c..c024560944d9 100644 --- a/src/Raven.Server/Documents/Handlers/Processors/OngoingTasks/OngoingTasksHandlerProcessorForBackupDatabaseNow.cs +++ b/src/Raven.Server/Documents/Handlers/Processors/OngoingTasks/OngoingTasksHandlerProcessorForBackupDatabaseNow.cs @@ -28,10 +28,11 @@ protected override async ValueTask ScheduleBackupOperationAsync(long taskI var nodeTag = RequestHandler.Database.PeriodicBackupRunner.WhoseTaskIsIt(taskId); if (nodeTag == null) { - // this can happen for a new task that was just created - // we'll wait for the cluster observer to determine the responsible node for the backup + // this can happen if the database was just created or if a new task that was just created + // we'll wait for the cluster observer to give more time for the database stats to become stable, + // and then we'll wait for the cluster observer to determine the responsible node for the backup - var task = Task.Delay(RequestHandler.Database.Configuration.Cluster.StabilizationTime.AsTimeSpan); + var task = Task.Delay(RequestHandler.Database.Configuration.Cluster.StabilizationTime.AsTimeSpan + RequestHandler.Database.Configuration.Cluster.StabilizationTime.AsTimeSpan); while (true) { diff --git a/test/FastTests/Client/Subscriptions/SubscriptionsBasic.cs b/test/FastTests/Client/Subscriptions/SubscriptionsBasic.cs index 75d75e49c0b1..e3f3ff30919f 100644 --- a/test/FastTests/Client/Subscriptions/SubscriptionsBasic.cs +++ b/test/FastTests/Client/Subscriptions/SubscriptionsBasic.cs @@ -7,6 +7,7 @@ using System.Threading.Tasks; using Raven.Client.Documents; using Raven.Client.Documents.Conventions; +using Raven.Client.Documents.Operations.OngoingTasks; using Raven.Client.Documents.Queries; using Raven.Client.Documents.Subscriptions; using Raven.Client.Exceptions.Documents.Subscriptions; @@ -1203,6 +1204,44 @@ public async Task Subscription_WhenProjectWithId_ShouldTranslateToJavascriptIdFu } } + [RavenFact(RavenTestCategory.Subscriptions)] + public async Task Subscription_GetOngoingTaskInfoOperation_ShouldReturnCorrentTaskStatus() + { + using var store = GetDocumentStore(); + + var entity = new User(); + using (var session = store.OpenAsyncSession()) + { + await session.StoreAsync(entity); + await session.SaveChangesAsync(); + } + + var name = await store.Subscriptions.CreateAsync(new SubscriptionCreationOptions()); + + var state = await store.Subscriptions.GetSubscriptionStateAsync(name); + await using (var sub = store.Subscriptions.GetSubscriptionWorker(name)) + { + var mre = new AsyncManualResetEvent(); + var subscriptionTask = sub.Run(batch => + { + mre.Set(); + }); + var timeout = TimeSpan.FromSeconds(30); + Assert.True(await mre.WaitAsync(timeout)); + var taskInfoById = store.Maintenance.Send(new GetOngoingTaskInfoOperation(state.SubscriptionId, OngoingTaskType.Subscription)); + Assert.NotNull(taskInfoById); + Assert.Equal(OngoingTaskState.Enabled, taskInfoById.TaskState); + Assert.Equal(OngoingTaskType.Subscription, taskInfoById.TaskType); + Assert.Equal(OngoingTaskConnectionStatus.Active, taskInfoById.TaskConnectionStatus); + + var taskInfoByName = store.Maintenance.Send(new GetOngoingTaskInfoOperation(state.SubscriptionName, OngoingTaskType.Subscription)); + Assert.NotNull(taskInfoByName); + Assert.Equal(taskInfoById.TaskState, taskInfoByName.TaskState); + Assert.Equal(taskInfoById.TaskType, taskInfoByName.TaskType); + Assert.Equal(taskInfoById.TaskConnectionStatus, taskInfoByName.TaskConnectionStatus); + } + } + private class ProjectionObject { public string SomeProp { get; set; } diff --git a/test/SlowTests/Issues/RavenDB-18442.cs b/test/SlowTests/Issues/RavenDB-18442.cs index 0aeaee45ad2c..dd294c96a9f9 100644 --- a/test/SlowTests/Issues/RavenDB-18442.cs +++ b/test/SlowTests/Issues/RavenDB-18442.cs @@ -53,11 +53,7 @@ public async Task BackupInfoShouldBeInSyncBetweenClusterNodes() Backup.WaitForResponsibleNodeUpdateInCluster(store, nodes, result.TaskId); // Turn Database offline in second server. - Assert.Equal(1, WaitForValue(() => secondServer.ServerStore.IdleDatabases.Count, 1, timeout: 60000, interval: 1000)); //wait for db to be idle - var online = secondServer.ServerStore.DatabasesLandlord.DatabasesCache.TryGetValue(store.Database, out Task dbTask) && - dbTask != null && - dbTask.IsCompleted; - Assert.False(online); + secondServer.ServerStore.DatabasesLandlord.UnloadDirectly(store.Database); // Backup run in first server await Backup.RunBackupAsync(firstServer, result.TaskId, store); diff --git a/test/SlowTests/Issues/RavenDB-22040.cs b/test/SlowTests/Issues/RavenDB-22040.cs new file mode 100644 index 000000000000..4b4e09a4b1a0 --- /dev/null +++ b/test/SlowTests/Issues/RavenDB-22040.cs @@ -0,0 +1,82 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using FastTests.Server.Replication; +using Raven.Client.Documents.Indexes; +using Raven.Tests.Core.Utils.Entities; +using Tests.Infrastructure; +using Tests.Infrastructure.Entities; +using Xunit.Abstractions; + +namespace SlowTests.Issues; + +public class RavenDB_22040 : ReplicationTestBase +{ + public RavenDB_22040(ITestOutputHelper output) : base(output) + { + } + + [RavenFact(RavenTestCategory.Replication | RavenTestCategory.Indexes)] + public async Task ReplicatedTombstonesWillSendNotificationToIndexes() + { + var order1 = new Order() { Company = "Company1" }; + var order2 = new Order() { Company = "Company2" }; + var defaultTimeout = TimeSpan.FromSeconds(15); + + var cluster = await CreateRaftCluster(2); + + var store = GetDocumentStore(new Options { ReplicationFactor = 2, Server = cluster.Leader, }); + + var database = store.Database; + + var node1 = await cluster.Nodes[0].ServerStore.DatabasesLandlord.TryGetOrCreateResourceStore(database); + var node2 = await cluster.Nodes[1].ServerStore.DatabasesLandlord.TryGetOrCreateResourceStore(database); + + var t1 = await BreakReplication(cluster.Nodes[0].ServerStore, database); + var t2 = await BreakReplication(cluster.Nodes[1].ServerStore, database); + + using (var session = store.OpenAsyncSession()) + { + await session.StoreAsync(order1); + await session.SaveChangesAsync(); + await new Index().ExecuteAsync(store); + Indexes.WaitForIndexing(store); + } + + t1.Mend(); + t2.Mend(); + + await WaitForDocumentInClusterAsync(cluster.Nodes, database, order1.Id, predicate: null, defaultTimeout); + + t1 = await BreakReplication(cluster.Nodes[0].ServerStore, database); + t2 = await BreakReplication(cluster.Nodes[1].ServerStore, database); + var markerDocument = new Employee() { FirstName = "MARKER" }; + using (var session = store.OpenAsyncSession()) + { + await session.StoreAsync(order2); + await session.SaveChangesAsync(); + Indexes.WaitForIndexing(store); + + session.Delete(order2.Id); + await session.SaveChangesAsync(); + + await session.StoreAsync(markerDocument); + await session.SaveChangesAsync(); + } + + t1.Mend(); + t2.Mend(); + + await WaitForDocumentInClusterAsync(cluster.Nodes, database, markerDocument.Id, null, defaultTimeout); + Indexes.WaitForIndexing(store, database, timeout: defaultTimeout, nodeTag: "A"); + Indexes.WaitForIndexing(store, database, timeout: defaultTimeout, nodeTag: "B"); + } + + private class Index : AbstractIndexCreationTask + { + public Index() + { + Map = orders => orders.Select(x => new { x.Company }); + } + } +} diff --git a/test/StressTests/Issues/RavenDB_18420.cs b/test/StressTests/Issues/RavenDB_18420.cs index 0e02a9d0c002..09a4e602f8c5 100644 --- a/test/StressTests/Issues/RavenDB_18420.cs +++ b/test/StressTests/Issues/RavenDB_18420.cs @@ -60,7 +60,11 @@ public async Task ShouldRescheduleBackupTimerIfDocumentDatabaseFailedToLoad() var mre = server.ServerStore.DatabasesLandlord.ForTestingPurposesOnly().RescheduleDatabaseWakeupMre = new ManualResetEventSlim(); - // enable backup + // we wait here for UpdateResponsibleNodeForTasksCommand, it won't wake up the database since the db task is faulted with DatabaseConcurrentLoadTimeoutException + Backup.WaitForResponsibleNodeUpdate(server.ServerStore, store.Database, putConfiguration.TaskId); + Assert.Equal(1, server.ServerStore.IdleDatabases.Count); + + // enable backup and this will set wakeup timer await store.Maintenance.Server.SendAsync(new PutServerWideBackupConfigurationOperation(putConfiguration)); Assert.True(mre.Wait(TimeSpan.FromSeconds(65)));