Skip to content

Commit

Permalink
Merge branch 'v5.4' of github.com:ravendb/ravendb into v6.0
Browse files Browse the repository at this point in the history
# Conflicts:
#	src/Raven.Server/Web/System/OngoingTasksHandler.cs
  • Loading branch information
arekpalinski committed Feb 9, 2024
2 parents 7873824 + b7f6d7a commit 5fb306f
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 20 deletions.
32 changes: 21 additions & 11 deletions src/Raven.Server/Documents/DatabasesLandlord.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1259,7 +1259,7 @@ public enum ClusterDatabaseChangeType

public bool UnloadDirectly(StringSegment databaseName, DateTime? wakeup = null, [CallerMemberName] string caller = null)
{
var nextScheduledAction = new IdleDatabaseActivity(IdleDatabaseActivityType.WakeUpDatabase, wakeup);
var nextScheduledAction = new IdleDatabaseActivity(IdleDatabaseActivityType.WakeUpDatabase);
return UnloadDirectly(databaseName, nextScheduledAction, caller);
}

Expand Down Expand Up @@ -1294,19 +1294,18 @@ public bool UnloadDirectly(StringSegment databaseName, IdleDatabaseActivity idle
UnloadDatabaseInternal(databaseName.Value, caller);
LastRecentlyUsed.TryRemove(databaseName, out _);

if (idleDatabaseActivity?.DueTime > 0)
// DateTime should be only null in tests
if (idleDatabaseActivity is { DateTime: not null })
_wakeupTimers.TryAdd(databaseName.Value, new Timer(
callback: _ => NextScheduledActivityCallback(databaseName.Value, idleDatabaseActivity),
state: null,
dueTime: idleDatabaseActivity.DueTime,
// in case the DueTime is negative or zero, the callback will be called immediately and database will be loaded.
dueTime: idleDatabaseActivity.DueTime > 0 ? idleDatabaseActivity.DueTime : 0,
period: Timeout.Infinite));

if (_logger.IsOperationsEnabled)
{
var msg = idleDatabaseActivity?.DueTime > 0
? $"wakeup timer set to: {idleDatabaseActivity.DateTime.Value}, which will happen in {idleDatabaseActivity.DueTime} ms."
: "without setting a wakeup timer.";

var msg = idleDatabaseActivity == null ? "without setting a wakeup timer." : $"wakeup timer set to: '{idleDatabaseActivity.DateTime.GetValueOrDefault()}', which will happen in '{idleDatabaseActivity.DueTime}' ms.";
_logger.Operations($"Unloading directly database '{databaseName}', {msg}");
}

Expand Down Expand Up @@ -1395,12 +1394,13 @@ private void NextScheduledActivityCallback(string databaseName, IdleDatabaseActi
if (ex is DatabaseConcurrentLoadTimeoutException e)
{
// database failed to load, retry after 1 min
ForTestingPurposes?.RescheduleDatabaseWakeupMre?.Set();

if (_logger.IsInfoEnabled)
_logger.Info($"Failed to start database '{databaseName}' on timer, will retry the wakeup in '{_dueTimeOnRetry}' ms", e);

nextIdleDatabaseActivity.DateTime = DateTime.UtcNow.AddMilliseconds(_dueTimeOnRetry);
ForTestingPurposes?.RescheduleDatabaseWakeupMre?.Set();

RescheduleNextIdleDatabaseActivity(databaseName, nextIdleDatabaseActivity);
}
}, TaskContinuationOptions.OnlyOnFaulted);
Expand Down Expand Up @@ -1649,14 +1649,24 @@ public sealed class IdleDatabaseActivity
? (int)Math.Min(int.MaxValue, (DateTime.Value - System.DateTime.UtcNow).TotalMilliseconds)
: 0;

public IdleDatabaseActivity(IdleDatabaseActivityType type, DateTime? timeOfActivity, long taskId = 0, long lastEtag = 0)
public IdleDatabaseActivity(IdleDatabaseActivityType type)
{
LastEtag = 0;
Type = type;
TaskId = 0;

// DateTime should be only null in tests
DateTime = null;
}

public IdleDatabaseActivity(IdleDatabaseActivityType type, DateTime timeOfActivity, long taskId = 0, long lastEtag = 0)
{
LastEtag = lastEtag;
Type = type;
TaskId = taskId;

Debug.Assert(timeOfActivity?.Kind != DateTimeKind.Unspecified);
DateTime = timeOfActivity?.ToUniversalTime();
Debug.Assert(timeOfActivity.Kind != DateTimeKind.Unspecified);
DateTime = timeOfActivity.ToUniversalTime();
}
}

Expand Down
9 changes: 9 additions & 0 deletions src/Raven.Server/Documents/DocumentsStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1797,6 +1797,15 @@ public static Tombstone TableValueToTombstone(JsonOperationContext context, ref
newFlags,
nonPersistentFlags).Etag;

// We've to add notification since we're updating last tombstone etag, and we can end up in situation when our indexes will be stale due unprocessed tombstones after replication.
context.Transaction.AddAfterCommitNotification(new DocumentChange
{
Type = DocumentChangeTypes.Delete,
Id = id,
ChangeVector = changeVector,
CollectionName = collectionName.Name,
});

return new DeleteOperationResult
{
Collection = collectionName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ protected override async ValueTask<bool> ScheduleBackupOperationAsync(long taskI
var nodeTag = RequestHandler.Database.PeriodicBackupRunner.WhoseTaskIsIt(taskId);
if (nodeTag == null)
{
// this can happen for a new task that was just created
// we'll wait for the cluster observer to determine the responsible node for the backup
// this can happen if the database was just created or if a new task that was just created
// we'll wait for the cluster observer to give more time for the database stats to become stable,
// and then we'll wait for the cluster observer to determine the responsible node for the backup

var task = Task.Delay(RequestHandler.Database.Configuration.Cluster.StabilizationTime.AsTimeSpan);
var task = Task.Delay(RequestHandler.Database.Configuration.Cluster.StabilizationTime.AsTimeSpan + RequestHandler.Database.Configuration.Cluster.StabilizationTime.AsTimeSpan);

while (true)
{
Expand Down
39 changes: 39 additions & 0 deletions test/FastTests/Client/Subscriptions/SubscriptionsBasic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Threading.Tasks;
using Raven.Client.Documents;
using Raven.Client.Documents.Conventions;
using Raven.Client.Documents.Operations.OngoingTasks;
using Raven.Client.Documents.Queries;
using Raven.Client.Documents.Subscriptions;
using Raven.Client.Exceptions.Documents.Subscriptions;
Expand Down Expand Up @@ -1203,6 +1204,44 @@ public async Task Subscription_WhenProjectWithId_ShouldTranslateToJavascriptIdFu
}
}

[RavenFact(RavenTestCategory.Subscriptions)]
public async Task Subscription_GetOngoingTaskInfoOperation_ShouldReturnCorrentTaskStatus()
{
using var store = GetDocumentStore();

var entity = new User();
using (var session = store.OpenAsyncSession())
{
await session.StoreAsync(entity);
await session.SaveChangesAsync();
}

var name = await store.Subscriptions.CreateAsync(new SubscriptionCreationOptions<User>());

var state = await store.Subscriptions.GetSubscriptionStateAsync(name);
await using (var sub = store.Subscriptions.GetSubscriptionWorker<ProjectionObject>(name))
{
var mre = new AsyncManualResetEvent();
var subscriptionTask = sub.Run(batch =>
{
mre.Set();
});
var timeout = TimeSpan.FromSeconds(30);
Assert.True(await mre.WaitAsync(timeout));
var taskInfoById = store.Maintenance.Send(new GetOngoingTaskInfoOperation(state.SubscriptionId, OngoingTaskType.Subscription));
Assert.NotNull(taskInfoById);
Assert.Equal(OngoingTaskState.Enabled, taskInfoById.TaskState);
Assert.Equal(OngoingTaskType.Subscription, taskInfoById.TaskType);
Assert.Equal(OngoingTaskConnectionStatus.Active, taskInfoById.TaskConnectionStatus);

var taskInfoByName = store.Maintenance.Send(new GetOngoingTaskInfoOperation(state.SubscriptionName, OngoingTaskType.Subscription));
Assert.NotNull(taskInfoByName);
Assert.Equal(taskInfoById.TaskState, taskInfoByName.TaskState);
Assert.Equal(taskInfoById.TaskType, taskInfoByName.TaskType);
Assert.Equal(taskInfoById.TaskConnectionStatus, taskInfoByName.TaskConnectionStatus);
}
}

private class ProjectionObject
{
public string SomeProp { get; set; }
Expand Down
6 changes: 1 addition & 5 deletions test/SlowTests/Issues/RavenDB-18442.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,7 @@ public async Task BackupInfoShouldBeInSyncBetweenClusterNodes()
Backup.WaitForResponsibleNodeUpdateInCluster(store, nodes, result.TaskId);

// Turn Database offline in second server.
Assert.Equal(1, WaitForValue(() => secondServer.ServerStore.IdleDatabases.Count, 1, timeout: 60000, interval: 1000)); //wait for db to be idle
var online = secondServer.ServerStore.DatabasesLandlord.DatabasesCache.TryGetValue(store.Database, out Task<DocumentDatabase> dbTask) &&
dbTask != null &&
dbTask.IsCompleted;
Assert.False(online);
secondServer.ServerStore.DatabasesLandlord.UnloadDirectly(store.Database);

// Backup run in first server
await Backup.RunBackupAsync(firstServer, result.TaskId, store);
Expand Down
82 changes: 82 additions & 0 deletions test/SlowTests/Issues/RavenDB-22040.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using FastTests.Server.Replication;
using Raven.Client.Documents.Indexes;
using Raven.Tests.Core.Utils.Entities;
using Tests.Infrastructure;
using Tests.Infrastructure.Entities;
using Xunit.Abstractions;

namespace SlowTests.Issues;

public class RavenDB_22040 : ReplicationTestBase
{
public RavenDB_22040(ITestOutputHelper output) : base(output)
{
}

[RavenFact(RavenTestCategory.Replication | RavenTestCategory.Indexes)]
public async Task ReplicatedTombstonesWillSendNotificationToIndexes()
{
var order1 = new Order() { Company = "Company1" };
var order2 = new Order() { Company = "Company2" };
var defaultTimeout = TimeSpan.FromSeconds(15);

var cluster = await CreateRaftCluster(2);

var store = GetDocumentStore(new Options { ReplicationFactor = 2, Server = cluster.Leader, });

var database = store.Database;

var node1 = await cluster.Nodes[0].ServerStore.DatabasesLandlord.TryGetOrCreateResourceStore(database);
var node2 = await cluster.Nodes[1].ServerStore.DatabasesLandlord.TryGetOrCreateResourceStore(database);

var t1 = await BreakReplication(cluster.Nodes[0].ServerStore, database);
var t2 = await BreakReplication(cluster.Nodes[1].ServerStore, database);

using (var session = store.OpenAsyncSession())
{
await session.StoreAsync(order1);
await session.SaveChangesAsync();
await new Index().ExecuteAsync(store);
Indexes.WaitForIndexing(store);
}

t1.Mend();
t2.Mend();

await WaitForDocumentInClusterAsync<Order>(cluster.Nodes, database, order1.Id, predicate: null, defaultTimeout);

t1 = await BreakReplication(cluster.Nodes[0].ServerStore, database);
t2 = await BreakReplication(cluster.Nodes[1].ServerStore, database);
var markerDocument = new Employee() { FirstName = "MARKER" };
using (var session = store.OpenAsyncSession())
{
await session.StoreAsync(order2);
await session.SaveChangesAsync();
Indexes.WaitForIndexing(store);

session.Delete(order2.Id);
await session.SaveChangesAsync();

await session.StoreAsync(markerDocument);
await session.SaveChangesAsync();
}

t1.Mend();
t2.Mend();

await WaitForDocumentInClusterAsync<Employee>(cluster.Nodes, database, markerDocument.Id, null, defaultTimeout);
Indexes.WaitForIndexing(store, database, timeout: defaultTimeout, nodeTag: "A");
Indexes.WaitForIndexing(store, database, timeout: defaultTimeout, nodeTag: "B");
}

private class Index : AbstractIndexCreationTask<Order>
{
public Index()
{
Map = orders => orders.Select(x => new { x.Company });
}
}
}
6 changes: 5 additions & 1 deletion test/StressTests/Issues/RavenDB_18420.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,11 @@ public async Task ShouldRescheduleBackupTimerIfDocumentDatabaseFailedToLoad()

var mre = server.ServerStore.DatabasesLandlord.ForTestingPurposesOnly().RescheduleDatabaseWakeupMre = new ManualResetEventSlim();

// enable backup
// we wait here for UpdateResponsibleNodeForTasksCommand, it won't wake up the database since the db task is faulted with DatabaseConcurrentLoadTimeoutException
Backup.WaitForResponsibleNodeUpdate(server.ServerStore, store.Database, putConfiguration.TaskId);
Assert.Equal(1, server.ServerStore.IdleDatabases.Count);

// enable backup and this will set wakeup timer
await store.Maintenance.Server.SendAsync(new PutServerWideBackupConfigurationOperation(putConfiguration));

Assert.True(mre.Wait(TimeSpan.FromSeconds(65)));
Expand Down

0 comments on commit 5fb306f

Please sign in to comment.