Skip to content

Commit

Permalink
RavenDB-20325 : handle export and import of timeseries deleted ranges…
Browse files Browse the repository at this point in the history
… in v6.0
  • Loading branch information
aviv committed Jun 19, 2024
1 parent bb183cb commit e676098
Show file tree
Hide file tree
Showing 18 changed files with 612 additions and 18 deletions.
5 changes: 2 additions & 3 deletions src/Raven.Client/Documents/Smuggler/DatabaseItemType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@ public enum DatabaseItemType
LegacyAttachmentDeletions = 1 << 10,
DatabaseRecord = 1 << 11,
Unknown = 1 << 12,

Attachments = 1 << 14,
CounterGroups = 1 << 15,
Subscriptions = 1 << 16,
CompareExchangeTombstones = 1 << 17,
TimeSeries = 1 << 18,

ReplicationHubCertificates = 1 << 19
ReplicationHubCertificates = 1 << 19,
TimeSeriesDeletedRanges = 1 << 20
}

[Flags]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ public class DatabaseSmugglerOptions : IDatabaseSmugglerOptions
DatabaseItemType.Attachments |
DatabaseItemType.CounterGroups |
DatabaseItemType.Subscriptions |
DatabaseItemType.TimeSeries;
DatabaseItemType.TimeSeries |
DatabaseItemType.TimeSeriesDeletedRanges;

public const DatabaseRecordItemType DefaultOperateOnDatabaseRecordTypes = DatabaseRecordItemType.Client |
DatabaseRecordItemType.ConflictSolverConfig |
Expand Down
3 changes: 3 additions & 0 deletions src/Raven.Client/Documents/Smuggler/SmugglerProgressBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public abstract class SmugglerProgressBase : IOperationProgress

public Counts CompareExchangeTombstones { get; set; }

public CountsWithSkippedCountAndLastEtag TimeSeriesDeletedRanges { get; set; }

public virtual DynamicJsonValue ToJson()
{
return new DynamicJsonValue(GetType())
Expand All @@ -51,6 +53,7 @@ public virtual DynamicJsonValue ToJson()
[nameof(CompareExchangeTombstones)] = CompareExchangeTombstones.ToJson(),
[nameof(TimeSeries)] = TimeSeries.ToJson(),
[nameof(ReplicationHubCertificates)] = ReplicationHubCertificates.ToJson(),
[nameof(TimeSeriesDeletedRanges)] = TimeSeriesDeletedRanges.ToJson()
};
}

Expand Down
10 changes: 10 additions & 0 deletions src/Raven.Client/Documents/Smuggler/SmugglerResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public SmugglerResult()
Subscriptions = new Counts();
ReplicationHubCertificates = new Counts();
TimeSeries = new CountsWithSkippedCountAndLastEtag();
TimeSeriesDeletedRanges = new CountsWithSkippedCountAndLastEtag();
_progress = new SmugglerProgress(this);
}

Expand Down Expand Up @@ -86,6 +87,11 @@ void IOperationResult.MergeWith(IOperationResult result)

smugglerResult.TimeSeries.LastEtag = Math.Max(smugglerResult.TimeSeries.LastEtag, TimeSeries.LastEtag);

smugglerResult.TimeSeriesDeletedRanges.ReadCount += TimeSeriesDeletedRanges.ReadCount;
smugglerResult.TimeSeriesDeletedRanges.ErroredCount += TimeSeriesDeletedRanges.ErroredCount;
smugglerResult.TimeSeriesDeletedRanges.SizeInBytes += TimeSeriesDeletedRanges.SizeInBytes;
smugglerResult.TimeSeriesDeletedRanges.LastEtag = Math.Max(smugglerResult.TimeSeriesDeletedRanges.LastEtag, TimeSeriesDeletedRanges.LastEtag);

smugglerResult.Identities.ReadCount += Identities.ReadCount;
smugglerResult.Identities.ErroredCount += Identities.ErroredCount;

Expand Down Expand Up @@ -230,6 +236,7 @@ public SmugglerProgress(SmugglerResult result)
Subscriptions = _result?.Subscriptions;
TimeSeries = _result?.TimeSeries;
ReplicationHubCertificates = _result?.ReplicationHubCertificates;
TimeSeriesDeletedRanges = _result?.TimeSeriesDeletedRanges;
}

internal string Message { get; set; }
Expand Down Expand Up @@ -261,6 +268,9 @@ public long GetLastEtag()
if (TimeSeries.LastEtag > lastEtag)
lastEtag = TimeSeries.LastEtag;

if (TimeSeriesDeletedRanges.LastEtag > lastEtag)
lastEtag = TimeSeriesDeletedRanges.LastEtag;

return lastEtag;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ internal void ApplyBackwardCompatibility(DatabaseSmugglerOptionsServerSide optio
if (RequestRouter.TryGetClientVersion(HttpContext, out var version) == false)
return;

if (version.Major == 5 && (version.Minor < 4 || version.Minor == 4 && version.Build < 200) &&
options.OperateOnTypes.HasFlag(DatabaseItemType.TimeSeries))
{
// version is older than 5.4.200
options.OperateOnTypes |= DatabaseItemType.TimeSeriesDeletedRanges;
}

if (version.Major != RavenVersionAttribute.Instance.MajorVersion)
return;

Expand Down
41 changes: 40 additions & 1 deletion src/Raven.Server/Documents/Handlers/TimeSeriesHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ public sealed class SmugglerTimeSeriesBatchCommand : MergedTransactionCommand<Do

private readonly Dictionary<string, List<TimeSeriesItem>> _dictionary;

private readonly Dictionary<string, List<TimeSeriesDeletedRangeItemForSmuggler>> _deletedRanges;

private readonly DocumentsOperationContext _context;

public DocumentsOperationContext Context => _context;
Expand All @@ -292,7 +294,8 @@ public sealed class SmugglerTimeSeriesBatchCommand : MergedTransactionCommand<Do
public SmugglerTimeSeriesBatchCommand(DocumentDatabase database)
{
_database = database;
_dictionary = new Dictionary<string, List<TimeSeriesItem>>();
_dictionary = new Dictionary<string, List<TimeSeriesItem>>(StringComparer.OrdinalIgnoreCase);
_deletedRanges = new Dictionary<string, List<TimeSeriesDeletedRangeItemForSmuggler>>(StringComparer.OrdinalIgnoreCase);
_toDispose = new();
_toReturn = new();
_releaseContext = _database.DocumentsStorage.ContextPool.AllocateOperationContext(out _context);
Expand All @@ -304,6 +307,27 @@ protected override long ExecuteCmd(DocumentsOperationContext context)

var changes = 0L;

foreach (var (docId, items) in _deletedRanges)
{
foreach (var item in items)
{
using (item)
{
var deletionRangeRequest = new TimeSeriesStorage.DeletionRangeRequest
{
DocumentId = docId,
Collection = item.Collection,
Name = item.Name,
From = item.From,
To = item.To
};
tss.DeleteTimestampRange(context, deletionRangeRequest, remoteChangeVector: null, updateMetadata: false);
}
}

changes += items.Count;
}

foreach (var (docId, items) in _dictionary)
{
var collectionName = _database.DocumentsStorage.ExtractCollectionName(context, items[0].Collection);
Expand Down Expand Up @@ -346,6 +370,21 @@ public bool AddToDictionary(TimeSeriesItem item)
return newItem;
}

/// <summary>
/// Queues a deleted-range item under its document id so it is processed when the command executes.
/// </summary>
/// <param name="item">The deleted range to queue; grouped by <c>item.DocId</c>.</param>
/// <returns>
/// <c>true</c> when this is the first deleted range queued for that document id in this command,
/// <c>false</c> when the document id already had a list.
/// </returns>
public bool AddToDeletedRanges(TimeSeriesDeletedRangeItemForSmuggler item)
{
    var isFirstForDocument = _deletedRanges.TryGetValue(item.DocId, out var rangesForDocument) == false;

    if (isFirstForDocument)
    {
        // no list for this document yet - create it and register it under the document id
        rangesForDocument = [];
        _deletedRanges[item.DocId] = rangesForDocument;
    }

    rangesForDocument.Add(item);

    return isFirstForDocument;
}


public void AddToDisposal(IDisposable disposable)
{
_toDispose.Add(disposable);
Expand Down
5 changes: 5 additions & 0 deletions src/Raven.Server/Smuggler/Documents/CsvStreamSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,11 @@ public Stream GetAttachmentStream(LazyStringValue hash, out string tag)
throw new NotSupportedException();
}

/// <summary>
/// This source does not supply time-series deleted ranges: both arguments are ignored
/// and an empty sequence is always returned.
/// </summary>
public IAsyncEnumerable<TimeSeriesDeletedRangeItemForSmuggler> GetTimeSeriesDeletedRangesAsync(ITimeSeriesActions action, List<string> collectionsToExport) =>
    AsyncEnumerable.Empty<TimeSeriesDeletedRangeItemForSmuggler>();

public void Dispose()
{
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public interface ISmugglerDestination

ITimeSeriesActions TimeSeries();

ITimeSeriesActions TimeSeriesDeletedRanges();

ILegacyActions LegacyDocumentDeletions();

ILegacyActions LegacyAttachmentDeletions();
Expand Down Expand Up @@ -126,7 +128,9 @@ public interface IDatabaseRecordActions : IAsyncDisposable
public interface ITimeSeriesActions : IAsyncDisposable, INewItemActions
{
ValueTask WriteTimeSeriesAsync(TimeSeriesItem ts);


ValueTask WriteTimeSeriesDeletedRangeAsync(TimeSeriesDeletedRangeItemForSmuggler deletedRange);

void RegisterForDisposal(IDisposable data);

void RegisterForReturnToTheContext(AllocatedMemoryData data);
Expand Down
2 changes: 2 additions & 0 deletions src/Raven.Server/Smuggler/Documents/Data/ISmugglerSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public interface ISmugglerSource
SmugglerSourceType GetSourceType();

Stream GetAttachmentStream(LazyStringValue hash, out string tag);

IAsyncEnumerable<TimeSeriesDeletedRangeItemForSmuggler> GetTimeSeriesDeletedRangesAsync(ITimeSeriesActions action, List<string> collectionsToExport);
}

public enum SmugglerSourceType
Expand Down
38 changes: 31 additions & 7 deletions src/Raven.Server/Smuggler/Documents/DatabaseDestination.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ public ITimeSeriesActions TimeSeries()
return new TimeSeriesActions(_database);
}

/// <summary>
/// Creates the actions object used to write time-series deleted ranges into this database.
/// A new <c>TimeSeriesActions</c> instance is returned on every call.
/// </summary>
public ITimeSeriesActions TimeSeriesDeletedRanges() => new TimeSeriesActions(_database);

public ILegacyActions LegacyDocumentDeletions()
{
// Used only in Stream Destination; needed when we are writing from Stream Source to Stream Destination
Expand Down Expand Up @@ -1299,7 +1304,7 @@ private sealed class TimeSeriesActions : ITimeSeriesActions
private TimeSeriesHandler.SmugglerTimeSeriesBatchCommand _cmd;
private TimeSeriesHandler.SmugglerTimeSeriesBatchCommand _prevCommand;
private Task _prevCommandTask = Task.CompletedTask;
private Size _segmentsSize;
private Size _batchSize;
private readonly Size _maxBatchSize;

public TimeSeriesActions(DocumentDatabase database)
Expand All @@ -1313,7 +1318,7 @@ public TimeSeriesActions(DocumentDatabase database)
: 16,
SizeUnit.Megabytes);

_segmentsSize = new Size();
_batchSize = new Size();
}

private void AddToBatch(TimeSeriesItem item)
Expand All @@ -1324,9 +1329,22 @@ private void AddToBatch(TimeSeriesItem item)
// be accounted for that if we look at segment size alone. So we assume that any new item means
// updating the whole segment. This is especially important for encrypted databases, where we need
// to keep all the modified data in memory in one shot
_segmentsSize.Add(2, SizeUnit.Kilobytes);
_batchSize.Add(2, SizeUnit.Kilobytes);
}
_segmentsSize.Add(item.Segment.NumberOfBytes, SizeUnit.Bytes);
_batchSize.Add(item.Segment.NumberOfBytes, SizeUnit.Bytes);
}

/// <summary>
/// Queues a deleted range on the current batch command and adds an estimate of its size
/// to the running batch size.
/// </summary>
/// <param name="item">The deleted range to add to the batch.</param>
private void AddToBatch(TimeSeriesDeletedRangeItemForSmuggler item)
{
    _cmd.AddToDeletedRanges(item);

    // variable-length parts of the item
    var variablePartSize = item.Name.Size + item.DocId.Size + item.Collection.Size + item.ChangeVector.Size;

    // fixed-length parts of the item: From, To, Etag
    const int fixedPartSize = 3 * sizeof(long);

    _batchSize.Add(variablePartSize + fixedPartSize, SizeUnit.Bytes);
}

public async ValueTask DisposeAsync()
Expand All @@ -1340,6 +1358,12 @@ public async ValueTask WriteTimeSeriesAsync(TimeSeriesItem ts)
await HandleBatchOfTimeSeriesIfNecessaryAsync();
}

/// <summary>
/// Adds the deleted range to the current batch and then runs the batch-size check,
/// which returns immediately while the accumulated batch size is below the maximum batch size.
/// </summary>
/// <param name="deletedRange">The deleted range to write.</param>
public async ValueTask WriteTimeSeriesDeletedRangeAsync(TimeSeriesDeletedRangeItemForSmuggler deletedRange)
{
AddToBatch(deletedRange);
await HandleBatchOfTimeSeriesIfNecessaryAsync();
}

public void RegisterForDisposal(IDisposable data)
{
_cmd.AddToDisposal(data);
Expand All @@ -1352,7 +1376,7 @@ public void RegisterForReturnToTheContext(AllocatedMemoryData data)

private async ValueTask HandleBatchOfTimeSeriesIfNecessaryAsync()
{
if (_segmentsSize < _maxBatchSize)
if (_batchSize < _maxBatchSize)
return;

var prevCommand = _prevCommand;
Expand All @@ -1373,7 +1397,7 @@ private async ValueTask HandleBatchOfTimeSeriesIfNecessaryAsync()

_cmd = new TimeSeriesHandler.SmugglerTimeSeriesBatchCommand(_database);

_segmentsSize.Set(0, SizeUnit.Bytes);
_batchSize.Set(0, SizeUnit.Bytes);
}

private async ValueTask FinishBatchOfTimeSeriesAsync()
Expand All @@ -1388,7 +1412,7 @@ private async ValueTask FinishBatchOfTimeSeriesAsync()
_prevCommand = null;
}

if (_segmentsSize.GetValue(SizeUnit.Bytes) > 0)
if (_batchSize.GetValue(SizeUnit.Bytes) > 0)
{
await _database.TxMerger.Enqueue(_cmd);
}
Expand Down
83 changes: 83 additions & 0 deletions src/Raven.Server/Smuggler/Documents/DatabaseSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class DatabaseSource : ISmugglerSource
DatabaseItemType.CompareExchangeTombstones,
DatabaseItemType.CounterGroups,
DatabaseItemType.Subscriptions,
DatabaseItemType.TimeSeriesDeletedRanges,
DatabaseItemType.TimeSeries,
DatabaseItemType.ReplicationHubCertificates,
DatabaseItemType.None
Expand Down Expand Up @@ -601,5 +602,87 @@ public SmugglerSourceType GetSourceType()
{
return _type;
}

/// <summary>
/// Exposes the deleted ranges of this database as an async sequence by wrapping the
/// synchronous enumeration. The <paramref name="action"/> argument is not used here.
/// </summary>
/// <param name="action">Unused by this implementation.</param>
/// <param name="collectionsToExport">Collections to read deleted ranges from; passed through to the synchronous enumeration.</param>
public IAsyncEnumerable<TimeSeriesDeletedRangeItemForSmuggler> GetTimeSeriesDeletedRangesAsync(ITimeSeriesActions action, List<string> collectionsToExport)
{
    var deletedRanges = GetTimeSeriesDeletedRanges(collectionsToExport);
    return deletedRanges.ToAsyncEnumerable();
}

/// <summary>
/// Lazily enumerates time-series deleted ranges through a <c>PulsedTransactionEnumerator</c>.
/// When the iteration state holds at least one collection, ranges are read per collection;
/// otherwise all ranges are read starting from the state's start etag.
/// </summary>
/// <param name="collectionsToExport">
/// Collections to restrict the enumeration to; each one starts from <c>_startDocumentEtag</c>.
/// An empty sequence means all deleted ranges are enumerated.
/// </param>
private IEnumerable<TimeSeriesDeletedRangeItemForSmuggler> GetTimeSeriesDeletedRanges(IEnumerable<string> collectionsToExport)
{
    Debug.Assert(_context != null);

    var pulseLimit = _database.Configuration.Databases.PulseReadTransactionLimit;

    var iterationState = new TimeSeriesDeletedRangeIterationState(_context, pulseLimit)
    {
        StartEtag = _startDocumentEtag,
        StartEtagByCollection = collectionsToExport.ToDictionary(collection => collection, _ => _startDocumentEtag)
    };

    var pulsedEnumerator = new PulsedTransactionEnumerator<TimeSeriesDeletedRangeItemForSmuggler, TimeSeriesDeletedRangeIterationState>(
        _context,
        state => state.StartEtagByCollection.Count != 0
            ? GetTimeSeriesDeletedRangesFromCollections(_context, state)
            : GetAlTimeSeriesDeletedRanges(_context, state.StartEtag),
        iterationState);

    while (pulsedEnumerator.MoveNext())
    {
        yield return pulsedEnumerator.Current;
    }
}

/// <summary>
/// Lazily enumerates the deleted ranges returned by the time-series storage for
/// <paramref name="startEtag"/>, regardless of collection, and converts each one into a smuggler item.
/// </summary>
/// <param name="context">Context used to read from storage and to allocate the copied strings.</param>
/// <param name="startEtag">Etag passed to the storage's <c>GetDeletedRangesFrom</c>.</param>
// NOTE(review): the method name looks like a typo of "GetAll..."; it is kept as-is because the caller lives outside this block.
private static IEnumerable<TimeSeriesDeletedRangeItemForSmuggler> GetAlTimeSeriesDeletedRanges(DocumentsOperationContext context, long startEtag)
{
    var timeSeriesStorage = context.DocumentDatabase.DocumentsStorage.TimeSeriesStorage;

    foreach (var range in timeSeriesStorage.GetDeletedRangesFrom(context, startEtag))
    {
        // the storage item is disposed once the consumer resumes the iteration (or disposes the iterator)
        using (range)
        {
            // the storage key holds both the document id and the time-series name
            TimeSeriesValuesSegment.ParseTimeSeriesKey(range.Key, context, out var documentId, out var seriesName);

            var smugglerItem = new TimeSeriesDeletedRangeItemForSmuggler
            {
                DocId = documentId,
                Name = seriesName,
                From = range.From,
                To = range.To,
                Collection = range.Collection.Clone(context),
                ChangeVector = context.GetLazyString(range.ChangeVector),
                Etag = range.Etag
            };

            yield return smugglerItem;
        }
    }
}

/// <summary>
/// Lazily enumerates deleted ranges collection by collection. For every collection in
/// <c>state.StartEtagByCollection</c>, reading starts from the etag stored for that collection,
/// and <c>state.CurrentCollection</c> is set before its ranges are read.
/// </summary>
/// <param name="context">Context used to read from storage and to allocate the copied strings.</param>
/// <param name="state">Iteration state holding the per-collection start etags.</param>
private static IEnumerable<TimeSeriesDeletedRangeItemForSmuggler> GetTimeSeriesDeletedRangesFromCollections(DocumentsOperationContext context, TimeSeriesDeletedRangeIterationState state)
{
    var documentDatabase = context.DocumentDatabase;

    // iterate over a snapshot of the keys rather than the live dictionary
    // (presumably because the state can be updated while we iterate - TODO confirm)
    foreach (var collectionName in state.StartEtagByCollection.Keys.ToList())
    {
        var fromEtag = state.StartEtagByCollection[collectionName];

        state.CurrentCollection = collectionName;

        var timeSeriesStorage = documentDatabase.DocumentsStorage.TimeSeriesStorage;

        foreach (var range in timeSeriesStorage.GetDeletedRangesFrom(context, collectionName, fromEtag))
        {
            // the storage item is disposed once the consumer resumes the iteration (or disposes the iterator)
            using (range)
            {
                // the storage key holds both the document id and the time-series name
                TimeSeriesValuesSegment.ParseTimeSeriesKey(range.Key, context, out var documentId, out var seriesName);

                var smugglerItem = new TimeSeriesDeletedRangeItemForSmuggler
                {
                    DocId = documentId,
                    Name = seriesName,
                    From = range.From,
                    To = range.To,
                    Collection = range.Collection.Clone(context),
                    ChangeVector = context.GetLazyString(range.ChangeVector),
                    Etag = range.Etag
                };

                yield return smugglerItem;
            }
        }
    }
}
}
}
Loading

0 comments on commit e676098

Please sign in to comment.