From e67609852f3b48d95c49736a9a0baff8c10b2cd4 Mon Sep 17 00:00:00 2001 From: aviv Date: Tue, 18 Jun 2024 15:51:03 +0300 Subject: [PATCH] RavenDB-20325 : handle export and import of timeseries deleted ranges in v6.0 --- .../Documents/Smuggler/DatabaseItemType.cs | 5 +- .../Smuggler/DatabaseSmugglerOptions.cs | 3 +- .../Smuggler/SmugglerProgressBase.cs | 3 + .../Documents/Smuggler/SmugglerResult.cs | 10 + .../AbstractSmugglerHandlerProcessor.cs | 7 + .../Documents/Handlers/TimeSeriesHandler.cs | 41 ++- .../Smuggler/Documents/CsvStreamSource.cs | 5 + .../Documents/Data/ISmugglerDestination.cs | 6 +- .../Documents/Data/ISmugglerSource.cs | 2 + .../Smuggler/Documents/DatabaseDestination.cs | 38 ++- .../Smuggler/Documents/DatabaseSource.cs | 83 ++++++ .../TimeSeriesDeletedRangeIterationState.cs | 26 ++ .../Documents/MultiShardedDestination.cs | 9 + .../Smuggler/Documents/SmugglerBase.cs | 40 ++- .../Documents/SmugglerDocumentType.cs | 25 ++ .../Smuggler/Documents/StreamDestination.cs | 45 ++++ .../Smuggler/Documents/StreamSource.cs | 44 +++- .../PeriodicBackup/PeriodicBackupSlowTests.cs | 238 ++++++++++++++++++ 18 files changed, 612 insertions(+), 18 deletions(-) create mode 100644 src/Raven.Server/Smuggler/Documents/Iteration/TimeSeriesDeletedRangeIterationState.cs diff --git a/src/Raven.Client/Documents/Smuggler/DatabaseItemType.cs b/src/Raven.Client/Documents/Smuggler/DatabaseItemType.cs index afb4baed50c1..3c32ec219900 100644 --- a/src/Raven.Client/Documents/Smuggler/DatabaseItemType.cs +++ b/src/Raven.Client/Documents/Smuggler/DatabaseItemType.cs @@ -19,14 +19,13 @@ public enum DatabaseItemType LegacyAttachmentDeletions = 1 << 10, DatabaseRecord = 1 << 11, Unknown = 1 << 12, - Attachments = 1 << 14, CounterGroups = 1 << 15, Subscriptions = 1 << 16, CompareExchangeTombstones = 1 << 17, TimeSeries = 1 << 18, - - ReplicationHubCertificates = 1 << 19 + ReplicationHubCertificates = 1 << 19, + TimeSeriesDeletedRanges = 1 << 20 } [Flags] diff --git 
a/src/Raven.Client/Documents/Smuggler/DatabaseSmugglerOptions.cs b/src/Raven.Client/Documents/Smuggler/DatabaseSmugglerOptions.cs index ad6c2c923ae2..bce5ff3e321f 100644 --- a/src/Raven.Client/Documents/Smuggler/DatabaseSmugglerOptions.cs +++ b/src/Raven.Client/Documents/Smuggler/DatabaseSmugglerOptions.cs @@ -16,7 +16,8 @@ public class DatabaseSmugglerOptions : IDatabaseSmugglerOptions DatabaseItemType.Attachments | DatabaseItemType.CounterGroups | DatabaseItemType.Subscriptions | - DatabaseItemType.TimeSeries; + DatabaseItemType.TimeSeries | + DatabaseItemType.TimeSeriesDeletedRanges; public const DatabaseRecordItemType DefaultOperateOnDatabaseRecordTypes = DatabaseRecordItemType.Client | DatabaseRecordItemType.ConflictSolverConfig | diff --git a/src/Raven.Client/Documents/Smuggler/SmugglerProgressBase.cs b/src/Raven.Client/Documents/Smuggler/SmugglerProgressBase.cs index 61af125075dc..5729cad0d0f2 100644 --- a/src/Raven.Client/Documents/Smuggler/SmugglerProgressBase.cs +++ b/src/Raven.Client/Documents/Smuggler/SmugglerProgressBase.cs @@ -34,6 +34,8 @@ public abstract class SmugglerProgressBase : IOperationProgress public Counts CompareExchangeTombstones { get; set; } + public CountsWithSkippedCountAndLastEtag TimeSeriesDeletedRanges { get; set; } + public virtual DynamicJsonValue ToJson() { return new DynamicJsonValue(GetType()) @@ -51,6 +53,7 @@ public virtual DynamicJsonValue ToJson() [nameof(CompareExchangeTombstones)] = CompareExchangeTombstones.ToJson(), [nameof(TimeSeries)] = TimeSeries.ToJson(), [nameof(ReplicationHubCertificates)] = ReplicationHubCertificates.ToJson(), + [nameof(TimeSeriesDeletedRanges)] = TimeSeriesDeletedRanges.ToJson() }; } diff --git a/src/Raven.Client/Documents/Smuggler/SmugglerResult.cs b/src/Raven.Client/Documents/Smuggler/SmugglerResult.cs index bfc79cace3ff..8c2c6a78fb34 100644 --- a/src/Raven.Client/Documents/Smuggler/SmugglerResult.cs +++ b/src/Raven.Client/Documents/Smuggler/SmugglerResult.cs @@ -43,6 +43,7 @@ public 
SmugglerResult() Subscriptions = new Counts(); ReplicationHubCertificates = new Counts(); TimeSeries = new CountsWithSkippedCountAndLastEtag(); + TimeSeriesDeletedRanges = new CountsWithSkippedCountAndLastEtag(); _progress = new SmugglerProgress(this); } @@ -86,6 +87,11 @@ void IOperationResult.MergeWith(IOperationResult result) smugglerResult.TimeSeries.LastEtag = Math.Max(smugglerResult.TimeSeries.LastEtag, TimeSeries.LastEtag); + smugglerResult.TimeSeriesDeletedRanges.ReadCount += TimeSeriesDeletedRanges.ReadCount; + smugglerResult.TimeSeriesDeletedRanges.ErroredCount += TimeSeriesDeletedRanges.ErroredCount; + smugglerResult.TimeSeriesDeletedRanges.SizeInBytes += TimeSeriesDeletedRanges.SizeInBytes; + smugglerResult.TimeSeriesDeletedRanges.LastEtag = Math.Max(smugglerResult.TimeSeriesDeletedRanges.LastEtag, TimeSeriesDeletedRanges.LastEtag); + smugglerResult.Identities.ReadCount += Identities.ReadCount; smugglerResult.Identities.ErroredCount += Identities.ErroredCount; @@ -230,6 +236,7 @@ public SmugglerProgress(SmugglerResult result) Subscriptions = _result?.Subscriptions; TimeSeries = _result?.TimeSeries; ReplicationHubCertificates = _result?.ReplicationHubCertificates; + TimeSeriesDeletedRanges = _result?.TimeSeriesDeletedRanges; } internal string Message { get; set; } @@ -261,6 +268,9 @@ public long GetLastEtag() if (TimeSeries.LastEtag > lastEtag) lastEtag = TimeSeries.LastEtag; + if (TimeSeriesDeletedRanges.LastEtag > lastEtag) + lastEtag = TimeSeriesDeletedRanges.LastEtag; + return lastEtag; } diff --git a/src/Raven.Server/Documents/Handlers/Processors/Smuggler/AbstractSmugglerHandlerProcessor.cs b/src/Raven.Server/Documents/Handlers/Processors/Smuggler/AbstractSmugglerHandlerProcessor.cs index b78efb984b7d..9c9f6c8a9b03 100644 --- a/src/Raven.Server/Documents/Handlers/Processors/Smuggler/AbstractSmugglerHandlerProcessor.cs +++ b/src/Raven.Server/Documents/Handlers/Processors/Smuggler/AbstractSmugglerHandlerProcessor.cs @@ -33,6 +33,13 @@ internal void 
ApplyBackwardCompatibility(DatabaseSmugglerOptionsServerSide optio if (RequestRouter.TryGetClientVersion(HttpContext, out var version) == false) return; + if (version.Major == 5 && (version.Minor < 4 || version.Minor == 4 && version.Build < 200) && + options.OperateOnTypes.HasFlag(DatabaseItemType.TimeSeries)) + { + // version is older than 5.4.200 + options.OperateOnTypes |= DatabaseItemType.TimeSeriesDeletedRanges; + } + if (version.Major != RavenVersionAttribute.Instance.MajorVersion) return; diff --git a/src/Raven.Server/Documents/Handlers/TimeSeriesHandler.cs b/src/Raven.Server/Documents/Handlers/TimeSeriesHandler.cs index c31893ff160e..bfb584df9e13 100644 --- a/src/Raven.Server/Documents/Handlers/TimeSeriesHandler.cs +++ b/src/Raven.Server/Documents/Handlers/TimeSeriesHandler.cs @@ -275,6 +275,8 @@ public sealed class SmugglerTimeSeriesBatchCommand : MergedTransactionCommand> _dictionary; + private readonly Dictionary> _deletedRanges; + private readonly DocumentsOperationContext _context; public DocumentsOperationContext Context => _context; @@ -292,7 +294,8 @@ public sealed class SmugglerTimeSeriesBatchCommand : MergedTransactionCommand>(); + _dictionary = new Dictionary>(StringComparer.OrdinalIgnoreCase); + _deletedRanges = new Dictionary>(StringComparer.OrdinalIgnoreCase); _toDispose = new(); _toReturn = new(); _releaseContext = _database.DocumentsStorage.ContextPool.AllocateOperationContext(out _context); @@ -304,6 +307,27 @@ protected override long ExecuteCmd(DocumentsOperationContext context) var changes = 0L; + foreach (var (docId, items) in _deletedRanges) + { + foreach (var item in items) + { + using (item) + { + var deletionRangeRequest = new TimeSeriesStorage.DeletionRangeRequest + { + DocumentId = docId, + Collection = item.Collection, + Name = item.Name, + From = item.From, + To = item.To + }; + tss.DeleteTimestampRange(context, deletionRangeRequest, remoteChangeVector: null, updateMetadata: false); + } + } + + changes += items.Count; + } + 
foreach (var (docId, items) in _dictionary) { var collectionName = _database.DocumentsStorage.ExtractCollectionName(context, items[0].Collection); @@ -346,6 +370,21 @@ public bool AddToDictionary(TimeSeriesItem item) return newItem; } + public bool AddToDeletedRanges(TimeSeriesDeletedRangeItemForSmuggler item) + { + bool newItem = false; + + if (_deletedRanges.TryGetValue(item.DocId, out var deletedRangesList) == false) + { + _deletedRanges[item.DocId] = deletedRangesList = []; + newItem = true; + } + + deletedRangesList.Add(item); + return newItem; + } + + public void AddToDisposal(IDisposable disposable) { _toDispose.Add(disposable); diff --git a/src/Raven.Server/Smuggler/Documents/CsvStreamSource.cs b/src/Raven.Server/Smuggler/Documents/CsvStreamSource.cs index 4d8f46de27ea..7b6561bd6737 100644 --- a/src/Raven.Server/Smuggler/Documents/CsvStreamSource.cs +++ b/src/Raven.Server/Smuggler/Documents/CsvStreamSource.cs @@ -443,6 +443,11 @@ public Stream GetAttachmentStream(LazyStringValue hash, out string tag) throw new NotSupportedException(); } + public IAsyncEnumerable GetTimeSeriesDeletedRangesAsync(ITimeSeriesActions action, List collectionsToExport) + { + return AsyncEnumerable.Empty(); + } + public void Dispose() { } diff --git a/src/Raven.Server/Smuggler/Documents/Data/ISmugglerDestination.cs b/src/Raven.Server/Smuggler/Documents/Data/ISmugglerDestination.cs index 86e7d6ede3a4..b7ef92f76024 100644 --- a/src/Raven.Server/Smuggler/Documents/Data/ISmugglerDestination.cs +++ b/src/Raven.Server/Smuggler/Documents/Data/ISmugglerDestination.cs @@ -47,6 +47,8 @@ public interface ISmugglerDestination ITimeSeriesActions TimeSeries(); + ITimeSeriesActions TimeSeriesDeletedRanges(); + ILegacyActions LegacyDocumentDeletions(); ILegacyActions LegacyAttachmentDeletions(); @@ -126,7 +128,9 @@ public interface IDatabaseRecordActions : IAsyncDisposable public interface ITimeSeriesActions : IAsyncDisposable, INewItemActions { ValueTask WriteTimeSeriesAsync(TimeSeriesItem ts); - 
+ + ValueTask WriteTimeSeriesDeletedRangeAsync(TimeSeriesDeletedRangeItemForSmuggler deletedRange); + void RegisterForDisposal(IDisposable data); void RegisterForReturnToTheContext(AllocatedMemoryData data); diff --git a/src/Raven.Server/Smuggler/Documents/Data/ISmugglerSource.cs b/src/Raven.Server/Smuggler/Documents/Data/ISmugglerSource.cs index 0480c9247f8b..2ea9353b2e44 100644 --- a/src/Raven.Server/Smuggler/Documents/Data/ISmugglerSource.cs +++ b/src/Raven.Server/Smuggler/Documents/Data/ISmugglerSource.cs @@ -58,6 +58,8 @@ public interface ISmugglerSource SmugglerSourceType GetSourceType(); Stream GetAttachmentStream(LazyStringValue hash, out string tag); + + IAsyncEnumerable GetTimeSeriesDeletedRangesAsync(ITimeSeriesActions action, List collectionsToExport); } public enum SmugglerSourceType diff --git a/src/Raven.Server/Smuggler/Documents/DatabaseDestination.cs b/src/Raven.Server/Smuggler/Documents/DatabaseDestination.cs index d37a697c61be..35d4e3162856 100644 --- a/src/Raven.Server/Smuggler/Documents/DatabaseDestination.cs +++ b/src/Raven.Server/Smuggler/Documents/DatabaseDestination.cs @@ -151,6 +151,11 @@ public ITimeSeriesActions TimeSeries() return new TimeSeriesActions(_database); } + public ITimeSeriesActions TimeSeriesDeletedRanges() + { + return new TimeSeriesActions(_database); + } + public ILegacyActions LegacyDocumentDeletions() { // Used only in Stream Destination, needed when we writing from Stream Source to Stream Destination @@ -1299,7 +1304,7 @@ private sealed class TimeSeriesActions : ITimeSeriesActions private TimeSeriesHandler.SmugglerTimeSeriesBatchCommand _cmd; private TimeSeriesHandler.SmugglerTimeSeriesBatchCommand _prevCommand; private Task _prevCommandTask = Task.CompletedTask; - private Size _segmentsSize; + private Size _batchSize; private readonly Size _maxBatchSize; public TimeSeriesActions(DocumentDatabase database) @@ -1313,7 +1318,7 @@ public TimeSeriesActions(DocumentDatabase database) : 16, SizeUnit.Megabytes); - 
_segmentsSize = new Size(); + _batchSize = new Size(); } private void AddToBatch(TimeSeriesItem item) @@ -1324,9 +1329,22 @@ private void AddToBatch(TimeSeriesItem item) // be accounted for that if we look at segment size alone. So we assume that any new item means // updating the whole segment. This is especially important for encrypted databases, where we need // to keep all the modified data in memory in one shot - _segmentsSize.Add(2, SizeUnit.Kilobytes); + _batchSize.Add(2, SizeUnit.Kilobytes); } - _segmentsSize.Add(item.Segment.NumberOfBytes, SizeUnit.Bytes); + _batchSize.Add(item.Segment.NumberOfBytes, SizeUnit.Bytes); + } + + private void AddToBatch(TimeSeriesDeletedRangeItemForSmuggler item) + { + _cmd.AddToDeletedRanges(item); + + var size = item.Name.Size + + item.DocId.Size + + item.Collection.Size + + item.ChangeVector.Size + + 3 * sizeof(long); // From, To, Etag + + _batchSize.Add(size, SizeUnit.Bytes); } public async ValueTask DisposeAsync() @@ -1340,6 +1358,12 @@ public async ValueTask WriteTimeSeriesAsync(TimeSeriesItem ts) await HandleBatchOfTimeSeriesIfNecessaryAsync(); } + public async ValueTask WriteTimeSeriesDeletedRangeAsync(TimeSeriesDeletedRangeItemForSmuggler deletedRange) + { + AddToBatch(deletedRange); + await HandleBatchOfTimeSeriesIfNecessaryAsync(); + } + public void RegisterForDisposal(IDisposable data) { _cmd.AddToDisposal(data); @@ -1352,7 +1376,7 @@ public void RegisterForReturnToTheContext(AllocatedMemoryData data) private async ValueTask HandleBatchOfTimeSeriesIfNecessaryAsync() { - if (_segmentsSize < _maxBatchSize) + if (_batchSize < _maxBatchSize) return; var prevCommand = _prevCommand; @@ -1373,7 +1397,7 @@ private async ValueTask HandleBatchOfTimeSeriesIfNecessaryAsync() _cmd = new TimeSeriesHandler.SmugglerTimeSeriesBatchCommand(_database); - _segmentsSize.Set(0, SizeUnit.Bytes); + _batchSize.Set(0, SizeUnit.Bytes); } private async ValueTask FinishBatchOfTimeSeriesAsync() @@ -1388,7 +1412,7 @@ private async ValueTask 
FinishBatchOfTimeSeriesAsync() _prevCommand = null; } - if (_segmentsSize.GetValue(SizeUnit.Bytes) > 0) + if (_batchSize.GetValue(SizeUnit.Bytes) > 0) { await _database.TxMerger.Enqueue(_cmd); } diff --git a/src/Raven.Server/Smuggler/Documents/DatabaseSource.cs b/src/Raven.Server/Smuggler/Documents/DatabaseSource.cs index 7e253161a568..13d6689d5d2e 100644 --- a/src/Raven.Server/Smuggler/Documents/DatabaseSource.cs +++ b/src/Raven.Server/Smuggler/Documents/DatabaseSource.cs @@ -58,6 +58,7 @@ public class DatabaseSource : ISmugglerSource DatabaseItemType.CompareExchangeTombstones, DatabaseItemType.CounterGroups, DatabaseItemType.Subscriptions, + DatabaseItemType.TimeSeriesDeletedRanges, DatabaseItemType.TimeSeries, DatabaseItemType.ReplicationHubCertificates, DatabaseItemType.None @@ -601,5 +602,87 @@ public SmugglerSourceType GetSourceType() { return _type; } + + public IAsyncEnumerable GetTimeSeriesDeletedRangesAsync(ITimeSeriesActions action, List collectionsToExport) => + GetTimeSeriesDeletedRanges(collectionsToExport).ToAsyncEnumerable(); + + private IEnumerable GetTimeSeriesDeletedRanges(IEnumerable collectionsToExport) + { + Debug.Assert(_context != null); + + var initialState = new TimeSeriesDeletedRangeIterationState(_context, _database.Configuration.Databases.PulseReadTransactionLimit) + { + StartEtag = _startDocumentEtag, + StartEtagByCollection = collectionsToExport.ToDictionary(x => x, x => _startDocumentEtag) + }; + + var enumerator = new PulsedTransactionEnumerator(_context, + state => + { + if (state.StartEtagByCollection.Count != 0) + return GetTimeSeriesDeletedRangesFromCollections(_context, state); + return GetAlTimeSeriesDeletedRanges(_context, state.StartEtag); + }, initialState); + + while (enumerator.MoveNext()) + { + yield return enumerator.Current; + } + } + + private static IEnumerable GetAlTimeSeriesDeletedRanges(DocumentsOperationContext context, long startEtag) + { + var database = context.DocumentDatabase; + foreach (var deletedRange in 
database.DocumentsStorage.TimeSeriesStorage.GetDeletedRangesFrom(context, startEtag)) + { + using (deletedRange) + { + TimeSeriesValuesSegment.ParseTimeSeriesKey(deletedRange.Key, context, out var docId, out var name); + + yield return new TimeSeriesDeletedRangeItemForSmuggler + { + DocId = docId, + Name = name, + From = deletedRange.From, + To = deletedRange.To, + Collection = deletedRange.Collection.Clone(context), + ChangeVector = context.GetLazyString(deletedRange.ChangeVector), + Etag = deletedRange.Etag + }; + } + } + } + + private static IEnumerable GetTimeSeriesDeletedRangesFromCollections(DocumentsOperationContext context, TimeSeriesDeletedRangeIterationState state) + { + var database = context.DocumentDatabase; + var collections = state.StartEtagByCollection.Keys.ToList(); + + foreach (var collection in collections) + { + var etag = state.StartEtagByCollection[collection]; + + state.CurrentCollection = collection; + + foreach (var deletedRange in database.DocumentsStorage.TimeSeriesStorage.GetDeletedRangesFrom(context, collection, etag)) + { + using (deletedRange) + { + TimeSeriesValuesSegment.ParseTimeSeriesKey(deletedRange.Key, context, out var docId, out var name); + + yield return new TimeSeriesDeletedRangeItemForSmuggler + { + DocId = docId, + Name = name, + From = deletedRange.From, + To = deletedRange.To, + Collection = deletedRange.Collection.Clone(context), + ChangeVector = context.GetLazyString(deletedRange.ChangeVector), + Etag = deletedRange.Etag + }; + } + } + } + } } } diff --git a/src/Raven.Server/Smuggler/Documents/Iteration/TimeSeriesDeletedRangeIterationState.cs b/src/Raven.Server/Smuggler/Documents/Iteration/TimeSeriesDeletedRangeIterationState.cs new file mode 100644 index 000000000000..727d018d44ba --- /dev/null +++ b/src/Raven.Server/Smuggler/Documents/Iteration/TimeSeriesDeletedRangeIterationState.cs @@ -0,0 +1,26 @@ +using Raven.Server.ServerWide.Context; +using Sparrow; + +namespace Raven.Server.Smuggler.Documents.Iteration +{ + 
public sealed class TimeSeriesDeletedRangeIterationState : CollectionAwareIterationState + { + public TimeSeriesDeletedRangeIterationState(DocumentsOperationContext context, Size pulseLimit) : base(context, pulseLimit) + { + } + + public override void OnMoveNext(TimeSeriesDeletedRangeItemForSmuggler current) + { + if (StartEtagByCollection.Count != 0) + { + StartEtagByCollection[CurrentCollection] = current.Etag + 1; + } + else + { + StartEtag = current.Etag + 1; + } + + ReadCount++; + } + } +} diff --git a/src/Raven.Server/Smuggler/Documents/MultiShardedDestination.cs b/src/Raven.Server/Smuggler/Documents/MultiShardedDestination.cs index b4f25999898b..d77ebb2ea75f 100644 --- a/src/Raven.Server/Smuggler/Documents/MultiShardedDestination.cs +++ b/src/Raven.Server/Smuggler/Documents/MultiShardedDestination.cs @@ -137,6 +137,9 @@ public ICounterActions Counters(SmugglerResult result) => public ITimeSeriesActions TimeSeries() => new ShardedTimeSeriesActions(_databaseContext, _allocator, _destinations.ToDictionary(x => x.Key, x => x.Value.TimeSeries()), _options); + public ITimeSeriesActions TimeSeriesDeletedRanges() => + new ShardedTimeSeriesActions(_databaseContext, _allocator, _destinations.ToDictionary(x => x.Key, x => x.Value.TimeSeriesDeletedRanges()), _options); + public ILegacyActions LegacyDocumentDeletions() => new ShardedLegacyActions(_databaseContext, _allocator, _destinations.ToDictionary(x => x.Key, x => x.Value.LegacyDocumentDeletions()), _options); @@ -355,6 +358,12 @@ public async ValueTask WriteTimeSeriesAsync(TimeSeriesItem ts) await _actions[shardNumber].WriteTimeSeriesAsync(ts); } + public async ValueTask WriteTimeSeriesDeletedRangeAsync(TimeSeriesDeletedRangeItemForSmuggler deletedRange) + { + var shardNumber = DatabaseContext.GetShardNumberFor(_allocator, deletedRange.DocId); + await _actions[shardNumber].WriteTimeSeriesDeletedRangeAsync(deletedRange); + } + public void RegisterForDisposal(IDisposable data) { // TODO fix this after handling this 
properly in 5.4 diff --git a/src/Raven.Server/Smuggler/Documents/SmugglerBase.cs b/src/Raven.Server/Smuggler/Documents/SmugglerBase.cs index b0f3b5825fb1..b48bb9733ab4 100644 --- a/src/Raven.Server/Smuggler/Documents/SmugglerBase.cs +++ b/src/Raven.Server/Smuggler/Documents/SmugglerBase.cs @@ -18,7 +18,6 @@ using Raven.Server.Documents.Indexes.MapReduce.Auto; using Raven.Server.Documents.PeriodicBackup; using Raven.Server.Documents.Replication; -using Raven.Server.Routing; using Raven.Server.ServerWide.Commands; using Raven.Server.Smuggler.Documents.Data; using Raven.Server.Smuggler.Documents.Processors; @@ -253,6 +252,10 @@ protected virtual async Task ProcessTypeAsync(DatabaseItemType type, SmugglerRes counts = await ProcessTimeSeriesAsync(result); break; + case DatabaseItemType.TimeSeriesDeletedRanges: + counts = await ProcessTimeSeriesDeletedRangesAsync(result); + break; + default: throw new ArgumentOutOfRangeException(nameof(type), type, null); } @@ -347,6 +350,10 @@ internal async Task SkipTypeAsync(DatabaseItemType type, SmugglerResult result, counts = result.ReplicationHubCertificates; break; + case DatabaseItemType.TimeSeriesDeletedRanges: + counts = result.TimeSeriesDeletedRanges; + break; + default: throw new ArgumentOutOfRangeException(nameof(type), type, null); } @@ -958,6 +965,37 @@ static bool ShouldSkip(TimeSeriesItem ts, SmugglerPatcher patcher, bool isFullBa } } + protected virtual async Task ProcessTimeSeriesDeletedRangesAsync(SmugglerResult result) + { + result.TimeSeriesDeletedRanges.Start(); + + await using (var actions = _destination.TimeSeriesDeletedRanges()) + { + await foreach (var deletedRange in _source.GetTimeSeriesDeletedRangesAsync(actions, _options.Collections)) + { + _token.ThrowIfCancellationRequested(); + result.TimeSeriesDeletedRanges.ReadCount++; + + if (result.TimeSeriesDeletedRanges.ReadCount % 1000 == 0) + AddInfoToSmugglerResult(result, $"Time Series deleted ranges entries {result.TimeSeriesDeletedRanges}"); + + if 
(ShouldSkip(deletedRange, _patcher) == false) + await actions.WriteTimeSeriesDeletedRangeAsync(deletedRange); + else + result.TimeSeriesDeletedRanges.SkippedCount++; + + result.TimeSeriesDeletedRanges.LastEtag = deletedRange.Etag; + } + } + + return result.TimeSeriesDeletedRanges; + + static bool ShouldSkip(TimeSeriesDeletedRangeItemForSmuggler deletedRange, SmugglerPatcher patcher) + { + return patcher != null && patcher.ShouldSkip(deletedRange.DocId); + } + } + protected async Task ProcessDatabaseRecordInternalAsync(SmugglerResult result, IDatabaseRecordActions action) { result.DatabaseRecord.Start(); diff --git a/src/Raven.Server/Smuggler/Documents/SmugglerDocumentType.cs b/src/Raven.Server/Smuggler/Documents/SmugglerDocumentType.cs index eeb423b33584..a1afa5d6294c 100644 --- a/src/Raven.Server/Smuggler/Documents/SmugglerDocumentType.cs +++ b/src/Raven.Server/Smuggler/Documents/SmugglerDocumentType.cs @@ -88,4 +88,29 @@ public void Dispose() Collection?.Dispose(); } } + + public sealed class TimeSeriesDeletedRangeItemForSmuggler : IDisposable + { + public LazyStringValue DocId; + + public LazyStringValue Name; + + public LazyStringValue Collection; + + public LazyStringValue ChangeVector; + + public DateTime From; + + public DateTime To; + + public long Etag; + + public void Dispose() + { + DocId?.Dispose(); + Name?.Dispose(); + Collection?.Dispose(); + ChangeVector?.Dispose(); + } + } } diff --git a/src/Raven.Server/Smuggler/Documents/StreamDestination.cs b/src/Raven.Server/Smuggler/Documents/StreamDestination.cs index d75c8cc54531..c776eadaad23 100644 --- a/src/Raven.Server/Smuggler/Documents/StreamDestination.cs +++ b/src/Raven.Server/Smuggler/Documents/StreamDestination.cs @@ -214,6 +214,11 @@ public ITimeSeriesActions TimeSeries() return new StreamTimeSeriesActions(_writer, _context, nameof(DatabaseItemType.TimeSeries)); } + public ITimeSeriesActions TimeSeriesDeletedRanges() + { + return new StreamTimeSeriesActions(_writer, _context, 
nameof(DatabaseItemType.TimeSeriesDeletedRanges)); + } + public IIndexActions Indexes() { return new StreamIndexActions(_writer, _context); @@ -1217,6 +1222,46 @@ public async ValueTask WriteTimeSeriesAsync(TimeSeriesItem item) } } + public async ValueTask WriteTimeSeriesDeletedRangeAsync(TimeSeriesDeletedRangeItemForSmuggler deletedRangeItem) + { + using (deletedRangeItem) + { + if (First == false) + Writer.WriteComma(); + + First = false; + + Writer.WriteStartObject(); + + Writer.WritePropertyName(nameof(TimeSeriesDeletedRangeItemForSmuggler.DocId)); + Writer.WriteString(deletedRangeItem.DocId, skipEscaping: true); + Writer.WriteComma(); + + Writer.WritePropertyName(nameof(TimeSeriesDeletedRangeItemForSmuggler.Name)); + Writer.WriteString(deletedRangeItem.Name, skipEscaping: true); + Writer.WriteComma(); + + Writer.WritePropertyName(nameof(TimeSeriesDeletedRangeItemForSmuggler.Collection)); + Writer.WriteString(deletedRangeItem.Collection); + Writer.WriteComma(); + + Writer.WritePropertyName(nameof(TimeSeriesDeletedRangeItemForSmuggler.ChangeVector)); + Writer.WriteString(deletedRangeItem.ChangeVector); + Writer.WriteComma(); + + Writer.WritePropertyName(nameof(TimeSeriesDeletedRangeItemForSmuggler.From)); + Writer.WriteDateTime(deletedRangeItem.From, isUtc: true); + Writer.WriteComma(); + + Writer.WritePropertyName(nameof(TimeSeriesDeletedRangeItemForSmuggler.To)); + Writer.WriteDateTime(deletedRangeItem.To, isUtc: true); + + Writer.WriteEndObject(); + + await Writer.MaybeFlushAsync(); + } + } + public void RegisterForDisposal(IDisposable data) { throw new NotSupportedException($"{nameof(RegisterForDisposal)} is never used in {nameof(StreamTimeSeriesActions)}. 
Shouldn't happen."); diff --git a/src/Raven.Server/Smuggler/Documents/StreamSource.cs b/src/Raven.Server/Smuggler/Documents/StreamSource.cs index 18ea23fbdb09..dca53f960de6 100644 --- a/src/Raven.Server/Smuggler/Documents/StreamSource.cs +++ b/src/Raven.Server/Smuggler/Documents/StreamSource.cs @@ -28,7 +28,6 @@ using Raven.Server.Documents.Handlers; using Raven.Server.Documents.TimeSeries; using Raven.Server.Json; -using Raven.Server.Routing; using Raven.Server.ServerWide; using Raven.Server.ServerWide.Commands; using Raven.Server.Smuggler.Documents.Data; @@ -1042,6 +1041,8 @@ public async Task SkipTypeAsync(DatabaseItemType type, Action onSkip case DatabaseItemType.LegacyDocumentDeletions: case DatabaseItemType.LegacyAttachmentDeletions: case DatabaseItemType.CounterGroups: + case DatabaseItemType.TimeSeriesDeletedRanges: + case DatabaseItemType.ReplicationHubCertificates: return await SkipArrayAsync(onSkipped, null, token); case DatabaseItemType.TimeSeries: @@ -1050,9 +1051,6 @@ public async Task SkipTypeAsync(DatabaseItemType type, Action onSkip case DatabaseItemType.DatabaseRecord: return await SkipObjectAsync(onSkipped); - case DatabaseItemType.ReplicationHubCertificates: - return await SkipArrayAsync(onSkipped, null, token); - default: throw new ArgumentOutOfRangeException(nameof(type), type, null); } @@ -1063,6 +1061,41 @@ public SmugglerSourceType GetSourceType() return SmugglerSourceType.Import; } + public async IAsyncEnumerable GetTimeSeriesDeletedRangesAsync(ITimeSeriesActions action, List collectionsToExport) + { + var collectionsHashSet = new HashSet(collectionsToExport, StringComparer.OrdinalIgnoreCase); + + await foreach (var reader in ReadArrayAsync(action)) + { + if (reader.TryGet(nameof(TimeSeriesDeletedRangeItemForSmuggler.Collection), out LazyStringValue collection) == false || + reader.TryGet(nameof(TimeSeriesDeletedRangeItemForSmuggler.DocId), out LazyStringValue docId) == false || + 
reader.TryGet(nameof(TimeSeriesDeletedRangeItemForSmuggler.Name), out LazyStringValue name) == false || + reader.TryGet(nameof(TimeSeriesDeletedRangeItemForSmuggler.ChangeVector), out LazyStringValue cv) == false || + reader.TryGet(nameof(TimeSeriesDeletedRangeItemForSmuggler.From), out DateTime from) == false || + reader.TryGet(nameof(TimeSeriesDeletedRangeItemForSmuggler.To), out DateTime to) == false) + { + _result.TimeSeriesDeletedRanges.ErroredCount++; + _result.AddWarning("Could not read timeseries deleted range entry."); + continue; + } + + if (collectionsHashSet.Count > 0 && collectionsHashSet.Contains(collection) == false) + continue; + + action.RegisterForDisposal(reader); + + yield return new TimeSeriesDeletedRangeItemForSmuggler + { + DocId = docId, + Name = name, + Collection = collection, + ChangeVector = cv, + From = from, + To = to + }; + } + } + public IAsyncEnumerable GetDocumentsAsync(List collectionsToOperate, INewDocumentActions actions) { return ReadDocumentsAsync(collectionsToOperate, actions); @@ -2001,6 +2034,9 @@ private DatabaseItemType GetType(string type) if (type.Equals("AttachmentsDeletions", StringComparison.OrdinalIgnoreCase)) return DatabaseItemType.LegacyAttachmentDeletions; + if (type.Equals(nameof(DatabaseItemType.TimeSeriesDeletedRanges), StringComparison.OrdinalIgnoreCase)) + return DatabaseItemType.TimeSeriesDeletedRanges; + return DatabaseItemType.Unknown; } diff --git a/test/SlowTests/Server/Documents/PeriodicBackup/PeriodicBackupSlowTests.cs b/test/SlowTests/Server/Documents/PeriodicBackup/PeriodicBackupSlowTests.cs index 6d674b21cbe2..6fd3939e31eb 100644 --- a/test/SlowTests/Server/Documents/PeriodicBackup/PeriodicBackupSlowTests.cs +++ b/test/SlowTests/Server/Documents/PeriodicBackup/PeriodicBackupSlowTests.cs @@ -3975,6 +3975,244 @@ await store.Subscriptions.CreateAsync(new SubscriptionCreationOptions } } + [RavenTheory(RavenTestCategory.Smuggler | RavenTestCategory.BackupExportImport | RavenTestCategory.TimeSeries)] + 
[RavenData(DatabaseMode = RavenDatabaseMode.All)] + public async Task can_backup_and_restore_with_deleted_timeseries_ranges(Options options) + { + var backupPath = NewDataPath(suffix: "BackupFolder"); + const string id = "users/1"; + + using (var store = GetDocumentStore(options)) + { + var baseline = DateTime.UtcNow; + using (var session = store.OpenAsyncSession()) + { + await session.StoreAsync(new User { Name = "fitzchak" }, id); + + var tsf = session.TimeSeriesFor(id, "heartrate"); + for (int i = 0; i < 10; i++) + { + tsf.Append(baseline.AddHours(i), i); + } + + await session.SaveChangesAsync(); + } + + var config = Backup.CreateBackupConfiguration(backupPath); + var backupTaskId = await Backup.UpdateConfigAndRunBackupAsync(Server, config, store); + + using (var session = store.OpenAsyncSession()) + { + session.Delete(id); + await session.SaveChangesAsync(); + } + + using (var session = store.OpenAsyncSession()) + { + await session.StoreAsync(new User { Name = "aviv" }, id); + await session.SaveChangesAsync(); + } + + using (var session = store.OpenAsyncSession()) + { + var ts = await session.TimeSeriesFor(id, "heartrate").GetAsync(); + Assert.Null(ts); + } + + await Backup.RunBackupAsync(Server, backupTaskId, store, isFullBackup: false); + + Assert.True(WaitForValue(() => + { + var dir = Directory.GetDirectories(backupPath).First(); + var files = Directory.GetFiles(dir); + return files.Length == 2; + }, expectedVal: true)); + } + + using (var store = GetDocumentStore(options)) + { + await store.Smuggler.ImportIncrementalAsync(new DatabaseSmugglerImportOptions(), + Directory.GetDirectories(backupPath).First()); + + using (var session = store.OpenAsyncSession()) + { + var user = await session.LoadAsync(id); + Assert.Equal("aviv", user.Name); + + var ts = await session.TimeSeriesFor(id, "heartrate").GetAsync(); + Assert.Null(ts); // used to fail (we got the 10 original entries) before deleted ranges were exported and imported + } + } + } + + [RavenTheory(RavenTestCategory.Smuggler | RavenTestCategory.BackupExportImport | 
RavenTestCategory.TimeSeries)] + [RavenData(DatabaseMode = RavenDatabaseMode.All)] + public async Task deleted_ranges_should_be_processed_before_timeseries(Options options) + { + var backupPath = NewDataPath(suffix: "BackupFolder"); + const string id = "users/1"; + + using (var store = GetDocumentStore(options)) + { + var baseline = DateTime.UtcNow; + using (var session = store.OpenAsyncSession()) + { + await session.StoreAsync(new User { Name = "fitzchak" }, id); + + var tsf = session.TimeSeriesFor(id, "heartrate"); + for (int i = 0; i < 10; i++) + { + tsf.Append(baseline.AddHours(i), i); + } + + await session.SaveChangesAsync(); + } + + var config = Backup.CreateBackupConfiguration(backupPath); + var backupTaskId = await Backup.UpdateConfigAndRunBackupAsync(Server, config, store); + + using (var session = store.OpenAsyncSession()) + { + // delete the document to create a timeseries deleted range + session.Delete(id); + await session.SaveChangesAsync(); + } + + using (var session = store.OpenAsyncSession()) + { + // recreate the document and time series, and append a new entry to the series + // importing the deleted range should not delete this new entry + + await session.StoreAsync(new User { Name = "aviv" }, id); + session.TimeSeriesFor(id, "heartrate").Append(baseline.AddYears(1), 100); + + await session.SaveChangesAsync(); + } + + using (var session = store.OpenAsyncSession()) + { + var ts = await session.TimeSeriesFor(id, "heartrate").GetAsync(); + Assert.Equal(1, ts.Length); + } + + await Backup.RunBackupAsync(Server, backupTaskId, store, isFullBackup: false); + + Assert.True(WaitForValue(() => + { + var dir = Directory.GetDirectories(backupPath).First(); + var files = Directory.GetFiles(dir); + return files.Length == 2; + }, expectedVal: true)); + } + + using (var store = GetDocumentStore(options)) + { + await store.Smuggler.ImportIncrementalAsync(new DatabaseSmugglerImportOptions(), + Directory.GetDirectories(backupPath).First()); + + using (var session = 
store.OpenAsyncSession()) + { + var user = await session.LoadAsync<User>(id); + Assert.Equal("aviv", user.Name); + + var ts = await session.TimeSeriesFor(id, "heartrate").GetAsync(); + Assert.Equal(1, ts.Length); + Assert.Equal(100, ts[0].Value); + } + } + } + + [RavenTheory(RavenTestCategory.Smuggler | RavenTestCategory.BackupExportImport | RavenTestCategory.TimeSeries)] + [RavenData(DatabaseMode = RavenDatabaseMode.All)] + public async Task deleted_ranges_should_be_processed_before_timeseries2(Options options) + { + var backupPath = NewDataPath(suffix: "BackupFolder"); + const string id = "users/1"; + + using (var store = GetDocumentStore(options)) + { + var baseline = DateTime.UtcNow; + using (var session = store.OpenAsyncSession()) + { + await session.StoreAsync(new User { Name = "fitzchak" }, id); + + var tsf = session.TimeSeriesFor(id, "heartrate"); + for (int i = 0; i < 10; i++) + { + tsf.Append(baseline.AddHours(i), i); + } + + await session.SaveChangesAsync(); + } + + var config = Backup.CreateBackupConfiguration(backupPath); + var backupTaskId = await Backup.UpdateConfigAndRunBackupAsync(Server, config, store); + + using (var session = store.OpenAsyncSession()) + { + // delete the document to create a timeseries deleted range + session.Delete(id); + await session.SaveChangesAsync(); + } + + using (var session = store.OpenAsyncSession()) + { + // recreate the document and time series, and append a new entry to the series + + await session.StoreAsync(new User { Name = "aviv" }, id); + session.TimeSeriesFor(id, "heartrate").Append(baseline.AddYears(1), 100); + + await session.SaveChangesAsync(); + } + + using (var session = store.OpenAsyncSession()) + { + // delete the document once again to create another deleted range + // after importing this deleted range we should end up without timeseries + + session.Delete(id); + await session.SaveChangesAsync(); + } + + using (var session = store.OpenAsyncSession()) + { + // recreate the document so that we won't have a
tombstone to backup + await session.StoreAsync(new User { Name = "egor" }, id); + await session.SaveChangesAsync(); + } + + using (var session = store.OpenAsyncSession()) + { + var ts = await session.TimeSeriesFor(id, "heartrate").GetAsync(); + Assert.Null(ts); + } + + await Backup.RunBackupAsync(Server, backupTaskId, store, isFullBackup: false); + + Assert.True(WaitForValue(() => + { + var dir = Directory.GetDirectories(backupPath).First(); + var files = Directory.GetFiles(dir); + return files.Length == 2; + }, expectedVal: true)); + } + + using (var store = GetDocumentStore(options)) + { + await store.Smuggler.ImportIncrementalAsync(new DatabaseSmugglerImportOptions(), + Directory.GetDirectories(backupPath).First()); + + using (var session = store.OpenAsyncSession()) + { + var user = await session.LoadAsync<User>(id); + Assert.Equal("egor", user.Name); + + var ts = await session.TimeSeriesFor(id, "heartrate").GetAsync(); + Assert.Null(ts); + } + } + } + private static IDisposable ReadOnly(string path) { var files = Directory.GetFiles(path);