diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5429c4b..b0e87fe 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,13 @@
## Changelog
+### 0.9.90 - 15/10/2024
+* Reworked serialization of Tuple attributes. The result more efficient (in space and time) but limited in the number and types of elements that can be stored in a Tuple.
+* Reworked the internals of the DatomStore to support more types of specialized transactions such as Excision
+* Implemented a basic form of schema migration.
+ * Indexes can be added and removed
+ * Attributes can be added and removed (removed attributes are ignored)
+ * Attribute types can be changed (in a very limited fashion, to be expanded in the future)
+
### 0.9.89 - 09/10/2024
* Fixed a bug with case-insensitive string comparisons in the database. This would cause an "Invalid UTF-8" exception to be thrown
diff --git a/src/NexusMods.MnemonicDB.Abstractions/AttributeCache.cs b/src/NexusMods.MnemonicDB.Abstractions/AttributeCache.cs
index 9d68298..3d58208 100644
--- a/src/NexusMods.MnemonicDB.Abstractions/AttributeCache.cs
+++ b/src/NexusMods.MnemonicDB.Abstractions/AttributeCache.cs
@@ -19,6 +19,7 @@ public sealed class AttributeCache
private BitArray _isReference;
private BitArray _isIndexed;
private Symbol[] _symbols;
+ private ValueTag[] _valueTags;
private BitArray _isNoHistory;
public AttributeCache()
@@ -29,12 +30,14 @@ public AttributeCache()
_isIndexed = new BitArray(maxId);
_isNoHistory = new BitArray(maxId);
_symbols = new Symbol[maxId];
+ _valueTags = new ValueTag[maxId];
foreach (var kv in AttributeDefinition.HardcodedIds)
{
_attributeIdsBySymbol[kv.Key.Id] = AttributeId.From(kv.Value);
_isIndexed[kv.Value] = kv.Key.IsIndexed;
_symbols[kv.Value] = kv.Key.Id;
+ _valueTags[kv.Value] = kv.Key.LowLevelType;
}
}
@@ -104,7 +107,16 @@ public void Reset(IDb db)
newIsCardinalityMany[(int)id] = AttributeDefinition.Cardinality.ReadValue(datom.ValueSpan, datom.Prefix.ValueTag, null!) == Cardinality.Many;
}
_isCardinalityMany = newIsCardinalityMany;
-
+
+ var valueTags = db.Datoms(AttributeDefinition.ValueType);
+ var newValueTags = new ValueTag[maxIndex];
+ foreach (var datom in valueTags)
+ {
+ var id = datom.E.Value;
+ var type = AttributeDefinition.ValueType.ReadValue(datom.ValueSpan, datom.Prefix.ValueTag, null!);
+ newValueTags[id] = type;
+ }
+ _valueTags = newValueTags;
}
///
@@ -155,4 +167,28 @@ public Symbol GetSymbol(AttributeId id)
{
return _symbols[id.Value];
}
+
+ ///
+ /// Returns true if the attribute is defined in the database.
+ ///
+ public bool Has(Symbol attribute)
+ {
+ return _attributeIdsBySymbol.ContainsKey(attribute);
+ }
+
+ ///
+ /// Try to get the AttributeId for the given attribute name
+ ///
+ public bool TryGetAttributeId(Symbol attribute, out AttributeId id)
+ {
+ return _attributeIdsBySymbol.TryGetValue(attribute, out id);
+ }
+
+ ///
+ /// Return the value tag type for the given attribute id
+ ///
+ public ValueTag GetValueTag(AttributeId aid)
+ {
+ return _valueTags[aid.Value];
+ }
}
diff --git a/src/NexusMods.MnemonicDB.Abstractions/BuiltInEntities/AttributeDefinition.cs b/src/NexusMods.MnemonicDB.Abstractions/BuiltInEntities/AttributeDefinition.cs
index 2b00a43..0619f6d 100644
--- a/src/NexusMods.MnemonicDB.Abstractions/BuiltInEntities/AttributeDefinition.cs
+++ b/src/NexusMods.MnemonicDB.Abstractions/BuiltInEntities/AttributeDefinition.cs
@@ -84,13 +84,7 @@ public static void Insert(ITransaction tx, IAttribute attribute, ushort id = 0)
{ Documentation, 7 },
{ Transaction.Timestamp, 8}
};
-
- ///
- /// All hardcoded attributes as a DbAttribute enumerable
- ///
- public static IEnumerable HardcodedDbAttributes => HardcodedIds.Select(a =>
- new DbAttribute(a.Key.Id, AttributeId.From(a.Value), a.Key.LowLevelType, a.Key));
-
+
///
/// Adds the initial set of attributes to the transaction, these will be created when the transaction is committed.
///
diff --git a/src/NexusMods.MnemonicDB.Abstractions/Datom.cs b/src/NexusMods.MnemonicDB.Abstractions/Datom.cs
index a27f11a..0dea749 100644
--- a/src/NexusMods.MnemonicDB.Abstractions/Datom.cs
+++ b/src/NexusMods.MnemonicDB.Abstractions/Datom.cs
@@ -95,7 +95,7 @@ public Datom Clone()
///
public override string ToString()
{
- return $"[E: {E}, A: {A}, T: {T}, IsRetract: {IsRetract}, Value: {ValueSpan.Length}]";
+ return $"[E: {E}, A: {A}, T: {T}, IsRetract: {IsRetract}, Value: {Prefix.ValueTag.Read(ValueSpan)}]";
}
///
diff --git a/src/NexusMods.MnemonicDB.Abstractions/DbAttribute.cs b/src/NexusMods.MnemonicDB.Abstractions/DbAttribute.cs
deleted file mode 100644
index fde1839..0000000
--- a/src/NexusMods.MnemonicDB.Abstractions/DbAttribute.cs
+++ /dev/null
@@ -1,12 +0,0 @@
-using NexusMods.MnemonicDB.Abstractions.ElementComparers;
-
-namespace NexusMods.MnemonicDB.Abstractions;
-
-///
-/// A record of information that maps a in-code version of an attribute (the symbol name) to the
-/// database attribute entity Id, and the ValueType id
-///
-///
-///
-///
-public record DbAttribute(Symbol UniqueId, AttributeId AttrEntityId, ValueTag LowLevelType, IAttribute Attribute);
diff --git a/src/NexusMods.MnemonicDB.Abstractions/IConnection.cs b/src/NexusMods.MnemonicDB.Abstractions/IConnection.cs
index 0f42d0a..1c56fc0 100644
--- a/src/NexusMods.MnemonicDB.Abstractions/IConnection.cs
+++ b/src/NexusMods.MnemonicDB.Abstractions/IConnection.cs
@@ -6,22 +6,6 @@
namespace NexusMods.MnemonicDB.Abstractions;
-///
-/// A database revision, which includes a datom and the datoms added to it.
-///
-public struct Revision
-{
- ///
- /// The database for the most recent transaction
- ///
- public IDb Database;
-
- ///
- /// The datoms that were added in the most recent transaction
- ///
- public IndexSegment AddedDatoms;
-}
-
///
/// Represents a connection to a database.
///
@@ -87,5 +71,10 @@ public interface IConnection
/// Deletes the entities with the given ids, also deleting them from any historical indexes. Returns the total number
/// of datoms that were excised.
///
- public Task Excise(EntityId[] entityIds);
+ public Task Excise(EntityId[] entityIds);
+
+ ///
+ /// Update the database's schema with the given attributes.
+ ///
+ public Task UpdateSchema(params IAttribute[] attribute);
}
diff --git a/src/NexusMods.MnemonicDB.Abstractions/IDatomStore.cs b/src/NexusMods.MnemonicDB.Abstractions/IDatomStore.cs
index 5d54c11..07a208d 100644
--- a/src/NexusMods.MnemonicDB.Abstractions/IDatomStore.cs
+++ b/src/NexusMods.MnemonicDB.Abstractions/IDatomStore.cs
@@ -1,10 +1,7 @@
using System;
-using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
-using NexusMods.MnemonicDB.Abstractions.DatomIterators;
using NexusMods.MnemonicDB.Abstractions.IndexSegments;
-using NexusMods.MnemonicDB.Abstractions.TxFunctions;
namespace NexusMods.MnemonicDB.Abstractions;
@@ -39,31 +36,20 @@ public interface IDatomStore : IDisposable
/// Any existing data will be deleted before importing.
///
public Task ImportAsync(Stream stream);
-
- ///
- /// Transacts (adds) the given datoms into the store.
- ///
- public Task<(StoreResult, IDb)> TransactAsync(IndexSegment datoms, HashSet? txFunctions = null);
-
-
+
///
- /// Transacts (adds) the given datoms into the store.
+ /// Transacts (adds) the given datoms into the store.
///
- public (StoreResult, IDb) Transact(IndexSegment datoms, HashSet? txFunctions = null);
-
+ public (StoreResult, IDb) Transact(IndexSegment segment);
+
+
///
- /// Registers new attributes with the store.
+ /// Transacts (adds) the given datoms into the store.
///
- ///
- void RegisterAttributes(IEnumerable newAttrs);
-
+ public Task<(StoreResult, IDb)> TransactAsync(IndexSegment segment);
+
///
/// Create a snapshot of the current state of the store.
///
ISnapshot GetSnapshot();
-
- ///
- /// Deletes these datoms from any of the indexes they are in.
- ///
- ValueTask Excise(List datomsToRemove);
}
diff --git a/src/NexusMods.MnemonicDB.Abstractions/IInternalTxFunction.cs b/src/NexusMods.MnemonicDB.Abstractions/IInternalTxFunction.cs
new file mode 100644
index 0000000..fa2856a
--- /dev/null
+++ b/src/NexusMods.MnemonicDB.Abstractions/IInternalTxFunction.cs
@@ -0,0 +1,7 @@
+namespace NexusMods.MnemonicDB.Abstractions;
+
+///
+/// A marker attribute for functions that are internal to the MnemonicDB, used mostly for schema updates
+/// and other bulk operations that are started via the TX Queue but are exposed to users through non transaction APIs
+///
+public interface IInternalTxFunction;
diff --git a/src/NexusMods.MnemonicDB.Abstractions/ITransaction.cs b/src/NexusMods.MnemonicDB.Abstractions/ITransaction.cs
index 99e4b2f..1288c6d 100644
--- a/src/NexusMods.MnemonicDB.Abstractions/ITransaction.cs
+++ b/src/NexusMods.MnemonicDB.Abstractions/ITransaction.cs
@@ -37,8 +37,7 @@ public interface ITransaction : IDisposable
/// Adds a new datom to the transaction
///
void Add(EntityId entityId, Attribute attribute, TVal val, bool isRetract = false);
-
-
+
///
/// Adds datoms for adding the given ids to the transaction under the given attribute
///
diff --git a/src/NexusMods.MnemonicDB.Abstractions/Internals/KeyPrefix.cs b/src/NexusMods.MnemonicDB.Abstractions/Internals/KeyPrefix.cs
index 7915700..fb1feac 100644
--- a/src/NexusMods.MnemonicDB.Abstractions/Internals/KeyPrefix.cs
+++ b/src/NexusMods.MnemonicDB.Abstractions/Internals/KeyPrefix.cs
@@ -95,7 +95,7 @@ public bool IsRetract
public ValueTag ValueTag
{
get => (ValueTag)((_lower >> 1) & 0x7F);
- init => _lower = (_lower & 0xFF00000000000001) | ((ulong)value << 1);
+ init => _lower = (_lower & 0xFFFFFFFFFFFFFF01) | ((ulong)value << 1);
}
///
diff --git a/src/NexusMods.MnemonicDB.Abstractions/Query/SliceDescriptor.cs b/src/NexusMods.MnemonicDB.Abstractions/Query/SliceDescriptor.cs
index 9f22c9b..31ec975 100644
--- a/src/NexusMods.MnemonicDB.Abstractions/Query/SliceDescriptor.cs
+++ b/src/NexusMods.MnemonicDB.Abstractions/Query/SliceDescriptor.cs
@@ -110,6 +110,10 @@ public static SliceDescriptor Create(TxId tx)
///
public static SliceDescriptor Create(Attribute attr, THighLevel value, AttributeCache attributeCache)
{
+ var id = attributeCache.GetAttributeId(attr.Id);
+ if (attributeCache.GetValueTag(id) != ValueTag.Reference && !attributeCache.IsIndexed(id))
+ throw new InvalidOperationException($"Attribute {attr.Id} must be indexed or a reference");
+
return new SliceDescriptor
{
Index = attr.IsReference ? IndexType.VAETCurrent : IndexType.AVETCurrent,
@@ -162,11 +166,11 @@ public static SliceDescriptor Create(AttributeId referenceAttribute, EntityId po
/// Creates a slice descriptor for the given attribute from the current AEVT index
/// reverse lookup.
///
- public static SliceDescriptor Create(AttributeId referenceAttribute)
+ public static SliceDescriptor Create(AttributeId referenceAttribute, IndexType indexType = IndexType.AEVTCurrent)
{
return new SliceDescriptor
{
- Index = IndexType.AEVTCurrent,
+ Index = indexType,
From = Datom(EntityId.MinValueNoPartition, referenceAttribute, TxId.MinValue, false),
To = Datom(EntityId.MaxValueNoPartition, referenceAttribute, TxId.MaxValue, false)
};
diff --git a/src/NexusMods.MnemonicDB.Abstractions/Serializer.cs b/src/NexusMods.MnemonicDB.Abstractions/Serializer.cs
index 81840a0..ce331bb 100644
--- a/src/NexusMods.MnemonicDB.Abstractions/Serializer.cs
+++ b/src/NexusMods.MnemonicDB.Abstractions/Serializer.cs
@@ -1,5 +1,6 @@
using System;
using System.Buffers;
+using System.Collections.Generic;
using System.IO.Hashing;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
@@ -189,8 +190,8 @@ private static void WriteAscii(string value, TWriter writer)
{
var length = (uint)value.Length;
var span = writer.GetSpan(sizeof(uint) + value.Length);
- MemoryMarshal.Write(span, ref length);
- ASCII.GetBytes(value, span.Slice(sizeof(uint)));
+ MemoryMarshal.Write(span, length);
+ ASCII.GetBytes(value, span.SliceFast(sizeof(uint)));
writer.Advance(sizeof(uint) + value.Length);
}
@@ -199,8 +200,8 @@ private static void WriteUtf8(string value, TWriter writer)
{
var length = (uint)value.Length;
var span = writer.GetSpan(sizeof(uint) + value.Length);
- MemoryMarshal.Write(span, ref length);
- UTF8.GetBytes(value, span.Slice(sizeof(uint)));
+ MemoryMarshal.Write(span, length);
+ UTF8.GetBytes(value, span.SliceFast(sizeof(uint)));
writer.Advance(sizeof(uint) + value.Length);
}
@@ -209,8 +210,8 @@ private static void WriteBlob(Memory value, TWriter writer)
{
var length = (uint)value.Length;
var span = writer.GetSpan(sizeof(uint) + value.Length);
- MemoryMarshal.Write(span, ref length);
- value.Span.CopyTo(span.Slice(sizeof(uint)));
+ MemoryMarshal.Write(span, length);
+ value.Span.CopyTo(span.SliceFast(sizeof(uint)));
writer.Advance(sizeof(uint) + value.Length);
}
@@ -396,5 +397,37 @@ public static void Remap(this ValueTag tag, Span span, Func(this ValueTag srcTag, ReadOnlySpan srcSpan, ValueTag destTag, TWriter destWriter)
+ where TWriter : IBufferWriter
+ {
+
+ try
+ {
+ switch (srcTag, destTag)
+ {
+ case (ValueTag.UInt8, ValueTag.UInt16):
+ WriteUnmanaged((ushort)MemoryMarshal.Read(srcSpan), destWriter);
+ break;
+ case (ValueTag.Utf8, ValueTag.UInt64):
+ {
+ var val = srcTag.Read(srcSpan);
+ WriteUnmanaged(Convert.ToUInt64(val), destWriter);
+ break;
+ }
+
+ default:
+ throw new NotSupportedException("Conversion not supported from " + srcTag + " to " + destTag);
+ }
+ }
+ catch (Exception e)
+ {
+ throw new InvalidOperationException($"Failed to convert ({srcTag.Read(srcSpan)}) value from " + srcTag + " to " + destTag, e);
+ }
+ }
+
+ #endregion
+
}
diff --git a/src/NexusMods.MnemonicDB/Connection.cs b/src/NexusMods.MnemonicDB/Connection.cs
index 40293f1..c305e36 100644
--- a/src/NexusMods.MnemonicDB/Connection.cs
+++ b/src/NexusMods.MnemonicDB/Connection.cs
@@ -7,12 +7,10 @@
using Microsoft.Extensions.Logging;
using NexusMods.MnemonicDB.Abstractions;
using NexusMods.MnemonicDB.Abstractions.BuiltInEntities;
-using NexusMods.MnemonicDB.Abstractions.DatomIterators;
using NexusMods.MnemonicDB.Abstractions.ElementComparers;
using NexusMods.MnemonicDB.Abstractions.IndexSegments;
-using NexusMods.MnemonicDB.Abstractions.Query;
-using NexusMods.MnemonicDB.Abstractions.TxFunctions;
-using ObservableExtensions = R3.ObservableExtensions;
+using NexusMods.MnemonicDB.InternalTxFunctions;
+using NexusMods.MnemonicDB.Storage;
namespace NexusMods.MnemonicDB;
@@ -21,7 +19,7 @@ namespace NexusMods.MnemonicDB;
///
public class Connection : IConnection
{
- private readonly IDatomStore _store;
+ private readonly DatomStore _store;
private readonly ILogger _logger;
private DbStream _dbStream;
@@ -37,7 +35,7 @@ public Connection(ILogger logger, IDatomStore store, IServiceProvide
AttributeCache = store.AttributeCache;
AttributeResolver = new AttributeResolver(provider, AttributeCache);
_logger = logger;
- _store = store;
+ _store = (DatomStore)store;
_dbStream = new DbStream();
_analyzers = analyzers.ToArray();
Bootstrap();
@@ -137,94 +135,28 @@ public ITransaction BeginTransaction()
///
- public async Task Excise(EntityId[] entityIds)
+ public async Task Excise(EntityId[] entityIds)
{
- // Retract all datoms for the given entity ids
- var contextDb = Db;
-
- List datomsToRemove = new();
- foreach (var entityId in entityIds)
- {
- var segment = contextDb.Datoms(entityId);
- datomsToRemove.AddRange(segment);
- }
-
- {
- using var tx = BeginTransaction();
- foreach (var datom in datomsToRemove)
- {
- tx.Add(datom.Retract());
- }
- var results = await tx.Commit();
- contextDb = results.Db;
- }
-
- // Now delete all the datoms from all indexes
- datomsToRemove.Clear();
-
- foreach (var entityId in entityIds)
- {
- var segment = contextDb.Datoms(SliceDescriptor.Create(IndexType.EAVTHistory, entityId));
- datomsToRemove.AddRange(segment);
- }
-
- await _store.Excise(datomsToRemove);
-
- {
- using var tx = BeginTransaction();
- tx.Add((EntityId)tx.ThisTxId.Value, Abstractions.BuiltInEntities.Transaction.ExcisedDatoms, (ulong)datomsToRemove.Count);
- await tx.Commit();
- }
-
-
- return (ulong)datomsToRemove.Count;
+ var tx = new Transaction(this);
+ tx.Set(new Excise(entityIds));
+ return await tx.Commit();
}
///
- public IObservable Revisions => _dbStream;
-
- private void AddMissingAttributes()
+ public Task UpdateSchema(params IAttribute[] attribute)
{
- var declaredAttributes = AttributeResolver.DefinedAttributes;
- var existing = AttributeCache.AllAttributeIds.ToHashSet();
-
- if (existing.Count == 0)
- throw new AggregateException(
- "No attributes found in the database, something went wrong, as it should have been bootstrapped by now");
-
- var missing = declaredAttributes.Where(a => !existing.Contains(a.Id)).ToArray();
- if (missing.Length == 0)
- {
- // No changes to make to the schema, we can return early
- return;
- }
-
- var attrId = existing.Select(sym => AttributeCache.GetAttributeId(sym)).Max().Value;
- using var builder = new IndexSegmentBuilder(AttributeCache);
- foreach (var attr in missing.OrderBy(e => e.Id.Id))
- {
- var id = EntityId.From(++attrId);
- builder.Add(id, AttributeDefinition.UniqueId, attr.Id);
- builder.Add(id, AttributeDefinition.ValueType, attr.LowLevelType);
- if (attr.IsIndexed)
- builder.Add(id, AttributeDefinition.Indexed, Null.Instance);
- builder.Add(id, AttributeDefinition.Cardinality, attr.Cardinalty);
- if (attr.NoHistory)
- builder.Add(id, AttributeDefinition.NoHistory, Null.Instance);
- if (attr.DeclaredOptional)
- builder.Add(id, AttributeDefinition.Optional, Null.Instance);
- }
-
- var (_, db) = _store.Transact(builder.Build());
- AttributeCache.Reset(db);
+ return Transact(new SchemaMigration(attribute));
}
- internal async Task Transact(IndexSegment datoms, HashSet? txFunctions)
+ ///
+ public IObservable Revisions => _dbStream;
+
+ internal async Task Transact(IInternalTxFunction fn)
{
StoreResult newTx;
IDb newDb;
- (newTx, newDb) = await _store.TransactAsync(datoms, txFunctions);
+ (newTx, newDb) = await _store.TransactAsync(fn);
((Db)newDb).Connection = this;
var result = new CommitResult(newDb, newTx.Remaps);
return result;
@@ -241,8 +173,9 @@ private void Bootstrap()
};
AttributeCache.Reset(initialDb);
- AddMissingAttributes();
-
+ var declaredAttributes = AttributeResolver.DefinedAttributes.OrderBy(a => a.Id.Id).ToArray();
+ _store.Transact(new SchemaMigration(declaredAttributes));
+
_dbStreamDisposable = ProcessUpdates(_store.TxLog)
.Subscribe(itm => _dbStream.OnNext(itm));
}
diff --git a/src/NexusMods.MnemonicDB/IInternalTxFunctionImpl.cs b/src/NexusMods.MnemonicDB/IInternalTxFunctionImpl.cs
new file mode 100644
index 0000000..78b7b1a
--- /dev/null
+++ b/src/NexusMods.MnemonicDB/IInternalTxFunctionImpl.cs
@@ -0,0 +1,38 @@
+using System;
+using System.Threading.Tasks;
+using NexusMods.MnemonicDB.Abstractions;
+using NexusMods.MnemonicDB.Abstractions.TxFunctions;
+using NexusMods.MnemonicDB.Storage;
+
+namespace NexusMods.MnemonicDB;
+
+///
+/// A TxFunction that is internal to the MnemonicDB, used mostly for schema updates
+/// and other operations that are started via the TX Queue but are exposed to
+/// users through non transaction APIs
+///
+public interface IInternalTxFunctionImpl : IInternalTxFunction
+{
+ ///
+ /// Executes the function giving the function full access to the store
+ ///
+ public void Execute(DatomStore store);
+
+ ///
+ /// A task that will complete when the transaction is committed
+ ///
+ public Task<(StoreResult, IDb)> Task { get; }
+
+ ///
+ /// Set the result of the transaction
+ ///
+ ///
+ ///
+ public void Complete(StoreResult result, IDb db);
+
+ ///
+ /// Sets the state of the transaction to an exception
+ ///
+ ///
+ public void SetException(Exception exception);
+}
diff --git a/src/NexusMods.MnemonicDB/InternalTxFunctions/AInternalFn.cs b/src/NexusMods.MnemonicDB/InternalTxFunctions/AInternalFn.cs
new file mode 100644
index 0000000..0342962
--- /dev/null
+++ b/src/NexusMods.MnemonicDB/InternalTxFunctions/AInternalFn.cs
@@ -0,0 +1,29 @@
+using System;
+using System.Threading.Tasks;
+using NexusMods.MnemonicDB.Abstractions;
+using NexusMods.MnemonicDB.Storage;
+
+namespace NexusMods.MnemonicDB.InternalTxFunctions;
+
+internal abstract class AInternalFn : IInternalTxFunctionImpl
+{
+ private readonly TaskCompletionSource<(StoreResult, IDb)> _source = new();
+
+ ///
+ /// Execute the function on the store
+ ///
+ ///
+ public abstract void Execute(DatomStore store);
+
+ public Task<(StoreResult, IDb)> Task => _source.Task;
+
+ public void Complete(StoreResult result, IDb db)
+ {
+ _source.SetResult((result, db));
+ }
+
+ public void SetException(Exception exception)
+ {
+ _source.SetException(exception);
+ }
+}
diff --git a/src/NexusMods.MnemonicDB/InternalTxFunctions/CompoundTransaction.cs b/src/NexusMods.MnemonicDB/InternalTxFunctions/CompoundTransaction.cs
new file mode 100644
index 0000000..cfba1d0
--- /dev/null
+++ b/src/NexusMods.MnemonicDB/InternalTxFunctions/CompoundTransaction.cs
@@ -0,0 +1,59 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Microsoft.Extensions.Logging;
+using NexusMods.MnemonicDB.Abstractions;
+using NexusMods.MnemonicDB.Abstractions.IndexSegments;
+using NexusMods.MnemonicDB.Abstractions.TxFunctions;
+using NexusMods.MnemonicDB.Storage;
+
+namespace NexusMods.MnemonicDB.InternalTxFunctions;
+
+///
+/// A transaction that contains one or more user defined functions as well as an index segment
+///
+internal sealed class CompoundTransaction : AInternalFn
+{
+ private readonly IndexSegment _segment;
+ private readonly HashSet _userFunctions;
+
+ ///
+ /// A transaction that contains one or more user defined functions as well as an index segment
+ ///
+ public CompoundTransaction(IndexSegment segment, HashSet userFunctions)
+ {
+ _segment = segment;
+ _userFunctions = userFunctions;
+ }
+
+ ///
+ /// The connection to use for the transaction
+ ///
+ public required IConnection Connection { get; init; }
+
+ ///
+ public override void Execute(DatomStore store)
+ {
+ var secondaryBuilder = new IndexSegmentBuilder(store.AttributeCache);
+ try
+ {
+ var db = new Db(store.CurrentSnapshot, store.AsOfTxId, store.AttributeCache);
+ db.Connection = Connection;
+ var tx = new InternalTransaction(db, secondaryBuilder);
+
+ foreach (var fn in _userFunctions)
+ {
+ fn.Apply(tx, db);
+ }
+ }
+ catch (Exception ex)
+ {
+ store.Logger.LogError(ex, "Failed to apply transaction functions");
+ throw;
+ }
+ var secondaryData = secondaryBuilder.Build();
+ var datoms = _segment.Concat(secondaryData);
+
+ store.LogDatoms(datoms, enableStats: store.Logger.IsEnabled(LogLevel.Debug));
+ }
+}
diff --git a/src/NexusMods.MnemonicDB/InternalTxFunctions/Excise.cs b/src/NexusMods.MnemonicDB/InternalTxFunctions/Excise.cs
new file mode 100644
index 0000000..10a37e5
--- /dev/null
+++ b/src/NexusMods.MnemonicDB/InternalTxFunctions/Excise.cs
@@ -0,0 +1,67 @@
+using NexusMods.MnemonicDB.Abstractions;
+using NexusMods.MnemonicDB.Abstractions.IndexSegments;
+using NexusMods.MnemonicDB.Abstractions.Query;
+using NexusMods.MnemonicDB.Storage;
+
+namespace NexusMods.MnemonicDB.InternalTxFunctions;
+
+///
+/// Deletes the entities and all associated datoms from the database including
+/// the datoms in the history and tx log index.
+///
+///
+internal class Excise(EntityId[] ids) : AInternalFn
+{
+ public override void Execute(DatomStore store)
+ {
+ // Find all datoms for the given entity ids
+ var snapshot = store.CurrentSnapshot;
+ using var currentDatomsBuilder = new IndexSegmentBuilder(store.AttributeCache);
+ using var historyDatomsBuilder = new IndexSegmentBuilder(store.AttributeCache);
+ foreach (var entityId in ids)
+ {
+ // All Current datoms
+ var segment = snapshot.Datoms(SliceDescriptor.Create(entityId));
+ currentDatomsBuilder.Add(segment);
+
+ // All History datoms
+ segment = snapshot.Datoms(SliceDescriptor.Create(IndexType.EAVTHistory, entityId));
+ historyDatomsBuilder.Add(segment);
+ }
+
+ // Build the datoms
+ var currentDatoms = currentDatomsBuilder.Build();
+ var historyDatoms = historyDatomsBuilder.Build();
+
+ // Start the batch
+ var batch = store.Backend.CreateBatch();
+
+ // Delete all datoms in the history and current segments
+ foreach (var datom in historyDatoms)
+ {
+ store.EAVTHistory.Delete(batch, datom);
+ store.AEVTHistory.Delete(batch, datom);
+ store.VAETHistory.Delete(batch, datom);
+ store.AVETHistory.Delete(batch, datom);
+ store.TxLogIndex.Delete(batch, datom);
+ }
+
+ foreach (var datom in currentDatoms)
+ {
+ store.EAVTCurrent.Delete(batch, datom);
+ store.AEVTCurrent.Delete(batch, datom);
+ store.VAETCurrent.Delete(batch, datom);
+ store.AVETCurrent.Delete(batch, datom);
+ store.TxLogIndex.Delete(batch, datom);
+ }
+ batch.Commit();
+
+ // Push through a marker transaction to make sure all indexes are updated
+ {
+ using var builder = new IndexSegmentBuilder(store.AttributeCache);
+ var txId = EntityId.From(PartitionId.Temp.MakeEntityId(0).Value);
+ builder.Add(txId, Abstractions.BuiltInEntities.Transaction.ExcisedDatoms, (ulong)currentDatoms.Count);
+ store.LogDatoms(builder.Build());
+ }
+ }
+}
diff --git a/src/NexusMods.MnemonicDB/InternalTxFunctions/IndexSegmentTransaction.cs b/src/NexusMods.MnemonicDB/InternalTxFunctions/IndexSegmentTransaction.cs
new file mode 100644
index 0000000..45764f9
--- /dev/null
+++ b/src/NexusMods.MnemonicDB/InternalTxFunctions/IndexSegmentTransaction.cs
@@ -0,0 +1,26 @@
+using NexusMods.MnemonicDB.Abstractions.IndexSegments;
+using NexusMods.MnemonicDB.Storage;
+
+namespace NexusMods.MnemonicDB.InternalTxFunctions;
+
+///
+/// A standard transaction that simply processes an index segment
+///
+internal sealed class IndexSegmentTransaction : AInternalFn
+{
+ private readonly IndexSegment _indexSegment;
+
+ ///
+ /// A standard transaction that simply processes an index segment
+ ///
+ public IndexSegmentTransaction(IndexSegment indexSegment)
+ {
+ _indexSegment = indexSegment;
+ }
+
+ ///
+ public override void Execute(DatomStore store)
+ {
+ store.LogDatoms(_indexSegment);
+ }
+}
diff --git a/src/NexusMods.MnemonicDB/InternalTxFunctions/Migrations/AddIndex.cs b/src/NexusMods.MnemonicDB/InternalTxFunctions/Migrations/AddIndex.cs
new file mode 100644
index 0000000..c72d43c
--- /dev/null
+++ b/src/NexusMods.MnemonicDB/InternalTxFunctions/Migrations/AddIndex.cs
@@ -0,0 +1,13 @@
+using NexusMods.MnemonicDB.Abstractions;
+using NexusMods.MnemonicDB.Abstractions.ElementComparers;
+using NexusMods.MnemonicDB.Abstractions.IndexSegments;
+using NexusMods.MnemonicDB.Abstractions.Query;
+using NexusMods.MnemonicDB.Storage;
+using NexusMods.MnemonicDB.Storage.Abstractions;
+
+namespace NexusMods.MnemonicDB.InternalTxFunctions.Migrations;
+
+internal static class AddIndex
+{
+
+}
diff --git a/src/NexusMods.MnemonicDB/InternalTxFunctions/SchemaMigration.cs b/src/NexusMods.MnemonicDB/InternalTxFunctions/SchemaMigration.cs
new file mode 100644
index 0000000..a31c475
--- /dev/null
+++ b/src/NexusMods.MnemonicDB/InternalTxFunctions/SchemaMigration.cs
@@ -0,0 +1,210 @@
+using System.Threading;
+using Microsoft.Extensions.Logging;
+using NexusMods.MnemonicDB.Abstractions;
+using NexusMods.MnemonicDB.Abstractions.BuiltInEntities;
+using NexusMods.MnemonicDB.Abstractions.DatomIterators;
+using NexusMods.MnemonicDB.Abstractions.ElementComparers;
+using NexusMods.MnemonicDB.Abstractions.IndexSegments;
+using NexusMods.MnemonicDB.Abstractions.Query;
+using NexusMods.MnemonicDB.Storage;
+using NexusMods.MnemonicDB.Storage.Abstractions;
+
+namespace NexusMods.MnemonicDB.InternalTxFunctions;
+
+internal class SchemaMigration : AInternalFn
+{
+ private readonly IAttribute[] _declaredAttributes;
+ private ulong _tempId = PartitionId.Temp.MakeEntityId(1).Value;
+
+ public SchemaMigration(IAttribute[] attributes)
+ {
+ _declaredAttributes = attributes;
+ }
+
+
+ ///
+ public EntityId TempId(PartitionId entityPartition)
+ {
+ var tempId = Interlocked.Increment(ref _tempId);
+ // Add the partition to the id
+ var actualId = ((ulong)entityPartition << 40) | tempId;
+ return EntityId.From(actualId);
+ }
+
+ public override void Execute(DatomStore store)
+ {
+ var batch = store.Backend.CreateBatch();
+ var cache = store.AttributeCache;
+ using var builder = new IndexSegmentBuilder(cache);
+ var madeChanges = false;
+ foreach (var attribute in _declaredAttributes)
+ {
+ if (!cache.TryGetAttributeId(attribute.Id, out var aid))
+ {
+ madeChanges = true;
+ AddAttribute(attribute, builder);
+ continue;
+ }
+
+ if (cache.IsIndexed(aid) != attribute.IsIndexed)
+ {
+ if (attribute.IsIndexed)
+ AddIndex(store, aid, batch);
+ else
+ RemoveIndex(store, aid, batch);
+ madeChanges = true;
+ }
+
+ if (cache.GetValueTag(aid) != attribute.LowLevelType)
+ {
+ store.Logger.LogInformation("Converting values for attribute {0} from {1} to {2}", attribute.Id, cache.GetValueTag(aid), attribute.ValueType);
+ ConvertValuesTo(store, attribute.IsIndexed, aid, batch, attribute.LowLevelType);
+ madeChanges = true;
+ }
+ }
+
+ if (!madeChanges)
+ return;
+
+ var built = builder.Build();
+ store.LogDatoms(batch, built, advanceTx: true);
+ store.AttributeCache.Reset(new Db(store.CurrentSnapshot, store.AsOfTxId, store.AttributeCache));
+ }
+
+
+ private void AddAttribute(IAttribute definition, in IndexSegmentBuilder builder)
+ {
+ var id = TempId(PartitionId.Attribute);
+ builder.Add(id, AttributeDefinition.UniqueId, definition.Id);
+ builder.Add(id, AttributeDefinition.ValueType, definition.LowLevelType);
+ builder.Add(id, AttributeDefinition.Cardinality, definition.Cardinalty);
+
+ if (definition.IsIndexed)
+ builder.Add(id, AttributeDefinition.Indexed, Null.Instance);
+
+ if (definition.DeclaredOptional)
+ builder.Add(id, AttributeDefinition.Optional, Null.Instance);
+
+ if (definition.NoHistory)
+ builder.Add(id, AttributeDefinition.NoHistory, Null.Instance);
+ }
+
+ ///
+ /// Remove add indexed datoms for a specific attribute
+ ///
+ internal static void AddIndex(DatomStore store, AttributeId id, IWriteBatch batch)
+ {
+ foreach (var datom in store.CurrentSnapshot.Datoms(SliceDescriptor.Create(id, IndexType.AEVTCurrent)))
+ {
+ store.AVETCurrent.Put(batch, datom);
+ }
+
+ foreach (var datom in store.CurrentSnapshot.Datoms(SliceDescriptor.Create(id, IndexType.AVETCurrent)))
+ {
+ store.AVETHistory.Put(batch, datom);
+ }
+
+ using var builder = new IndexSegmentBuilder(store.AttributeCache);
+ builder.Add(EntityId.From(id.Value), AttributeDefinition.Indexed, Null.Instance);
+ var built = builder.Build();
+
+ store.LogDatoms(batch, built);
+ }
+
+ ///
+ /// Remove the indexed datoms for a specific attribute
+ ///
+ internal static void RemoveIndex(DatomStore store, AttributeId id, IWriteBatch batch)
+ {
+ foreach (var datom in store.CurrentSnapshot.Datoms(SliceDescriptor.Create(id, IndexType.AEVTCurrent)))
+ {
+ store.AVETCurrent.Delete(batch, datom);
+ }
+
+ foreach (var datom in store.CurrentSnapshot.Datoms(SliceDescriptor.Create(id, IndexType.AVETCurrent)))
+ {
+ store.AVETHistory.Delete(batch, datom);
+ }
+
+ using var builder = new IndexSegmentBuilder(store.AttributeCache);
+ builder.Add(EntityId.From(id.Value), AttributeDefinition.Indexed, Null.Instance, true);
+ var built = builder.Build();
+
+ store.LogDatoms(batch, built);
+ }
+
+ internal static void ConvertValuesTo(DatomStore store, bool isIndexed, AttributeId id, IWriteBatch batch, ValueTag newTagType)
+ {
+ using var writer = new PooledMemoryBufferWriter();
+ foreach (var datom in store.CurrentSnapshot.Datoms(SliceDescriptor.Create(id, IndexType.AEVTCurrent)))
+ {
+ store.EAVTCurrent.Delete(batch, datom);
+ store.AEVTCurrent.Delete(batch, datom);
+ store.TxLogIndex.Delete(batch, datom);
+
+ var currentTag = datom.Prefix.ValueTag;
+
+ // if it's a reference, delete it from the backref index
+ if (currentTag == ValueTag.Reference)
+ store.VAETCurrent.Delete(batch, datom);
+
+ // Delete it from the Value index if it's not a reference
+ if (isIndexed && currentTag != ValueTag.Reference)
+ store.AVETCurrent.Delete(batch, datom);
+
+ // Convert the value to the new type
+ var newDatom = ConvertValue(datom, writer, newTagType);
+
+ // Put the converted datom back into the indexes
+ store.EAVTCurrent.Put(batch, newDatom);
+ store.AEVTCurrent.Put(batch, newDatom);
+ store.TxLogIndex.Put(batch, newDatom);
+
+ if (newTagType == ValueTag.Reference)
+ store.VAETCurrent.Put(batch, newDatom);
+
+ if (isIndexed && newTagType != ValueTag.Reference)
+ store.AVETCurrent.Put(batch, newDatom);
+ }
+
+ foreach (var datom in store.CurrentSnapshot.Datoms(SliceDescriptor.Create(id, IndexType.AEVTHistory)))
+ {
+ store.EAVTHistory.Delete(batch, datom);
+ store.AEVTHistory.Delete(batch, datom);
+ store.TxLogIndex.Delete(batch, datom);
+
+ var currentTag = datom.Prefix.ValueTag;
+
+ // if it's a reference, delete it from the backref index
+ if (currentTag == ValueTag.Reference)
+ store.VAETHistory.Delete(batch, datom);
+
+ // Delete it from the Value index if it's not a reference
+ if (isIndexed && currentTag != ValueTag.Reference)
+ store.VAETHistory.Delete(batch, datom);
+
+ // Convert the value to the new type
+ var newDatom = ConvertValue(datom, writer, newTagType);
+
+ // Put the converted datom back into the indexes
+ store.EAVTHistory.Put(batch, newDatom);
+ store.AEVTHistory.Put(batch, newDatom);
+ store.TxLogIndex.Put(batch, newDatom);
+
+ if (newTagType == ValueTag.Reference)
+ store.VAETHistory.Put(batch, newDatom);
+
+ if (isIndexed && newTagType != ValueTag.Reference)
+ store.AVETHistory.Put(batch, newDatom);
+ }
+ }
+
+ private static Datom ConvertValue(Datom datom, PooledMemoryBufferWriter writer, ValueTag newTagType)
+ {
+ writer.Reset();
+ datom.Prefix.ValueTag.ConvertValue(datom.ValueSpan, newTagType, writer);
+ var prefix = datom.Prefix with { ValueTag = newTagType };
+ var newDatom = new Datom(prefix, writer.WrittenMemory);
+ return newDatom;
+ }
+}
diff --git a/src/NexusMods.MnemonicDB/Storage/DatomStorageStructures/PendingTransaction.cs b/src/NexusMods.MnemonicDB/Storage/DatomStorageStructures/PendingTransaction.cs
deleted file mode 100644
index e61eb0d..0000000
--- a/src/NexusMods.MnemonicDB/Storage/DatomStorageStructures/PendingTransaction.cs
+++ /dev/null
@@ -1,36 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Threading.Tasks;
-using NexusMods.MnemonicDB.Abstractions;
-using NexusMods.MnemonicDB.Abstractions.IndexSegments;
-using NexusMods.MnemonicDB.Abstractions.TxFunctions;
-
-namespace NexusMods.MnemonicDB.Storage.DatomStorageStructures;
-
-///
-/// Information about a pending transaction, and a way to signal its completion.
-///
-internal class PendingTransaction
-{
- ///
- /// A completion source for the transaction, resolves when the transaction is commited to the
- /// transaction log and available to readers.
- ///
- public TaskCompletionSource<(StoreResult, IDb)> CompletionSource { get; } = new();
-
- ///
- /// The data to be commited
- ///
- public required IndexSegment Data { get; set; }
-
- ///
- /// Tx functions to be applied to the transaction, if any
- ///
- public required HashSet? TxFunctions { get; init; }
-
- public void Complete(StoreResult result, IDb db)
- {
- Data = new IndexSegment();
- Task.Run(() => CompletionSource.SetResult((result, db)));
- }
-}
diff --git a/src/NexusMods.MnemonicDB/Storage/DatomStore.cs b/src/NexusMods.MnemonicDB/Storage/DatomStore.cs
index f98e1d2..f30d0d4 100644
--- a/src/NexusMods.MnemonicDB/Storage/DatomStore.cs
+++ b/src/NexusMods.MnemonicDB/Storage/DatomStore.cs
@@ -3,7 +3,6 @@
using System.Collections.Frozen;
using System.Collections.Generic;
using System.Diagnostics;
-using System.IO;
using System.Linq;
using System.Runtime.InteropServices;
using System.Threading;
@@ -16,33 +15,33 @@
using NexusMods.MnemonicDB.Abstractions.IndexSegments;
using NexusMods.MnemonicDB.Abstractions.Internals;
using NexusMods.MnemonicDB.Abstractions.Query;
-using NexusMods.MnemonicDB.Abstractions.TxFunctions;
+using NexusMods.MnemonicDB.InternalTxFunctions;
using NexusMods.MnemonicDB.Storage.Abstractions;
-using NexusMods.MnemonicDB.Storage.DatomStorageStructures;
using Reloaded.Memory.Extensions;
namespace NexusMods.MnemonicDB.Storage;
-public partial class DatomStore : IDatomStore
+public sealed partial class DatomStore : IDatomStore
{
- private readonly IIndex _aevtCurrent;
- private readonly IIndex _aevtHistory;
- private readonly IIndex _avetCurrent;
- private readonly IIndex _avetHistory;
- private readonly IStoreBackend _backend;
- private ISnapshot? _currentSnapshot;
- private readonly IIndex _eavtCurrent;
- private readonly IIndex _eavtHistory;
- private readonly ILogger _logger;
+ internal readonly IIndex AEVTCurrent;
+ internal readonly IIndex AEVTHistory;
+ internal readonly IIndex AVETCurrent;
+ internal readonly IIndex AVETHistory;
+ internal readonly IIndex EAVTCurrent;
+ internal readonly IIndex EAVTHistory;
+ internal readonly IIndex VAETCurrent;
+ internal readonly IIndex VAETHistory;
+ internal readonly IIndex TxLogIndex;
+ internal readonly IStoreBackend Backend;
+ internal ISnapshot CurrentSnapshot;
+
+ internal readonly ILogger Logger;
private readonly PooledMemoryBufferWriter _retractWriter;
private readonly AttributeCache _attributeCache;
private readonly DatomStoreSettings _settings;
- private readonly BlockingCollection _pendingTransactions;
- private readonly IIndex _txLog;
+ private readonly BlockingCollection _pendingTransactions;
private DbStream _dbStream;
- private readonly IIndex _vaetCurrent;
- private readonly IIndex _vaetHistory;
private readonly PooledMemoryBufferWriter _writer;
private readonly PooledMemoryBufferWriter _prevWriter;
private TxId _asOfTx = TxId.MinValue;
@@ -75,48 +74,55 @@ public partial class DatomStore : IDatomStore
private CancellationTokenSource _shutdownToken = new();
private TxId _thisTx;
+
+ ///
+ /// Scratch spaced to create new datoms while processing transactions
+ ///
+ private readonly Memory _txScratchSpace;
///
/// DI constructor
///
public DatomStore(ILogger logger, DatomStoreSettings settings, IStoreBackend backend, bool bootstrap = true)
{
+ CurrentSnapshot = default!;
+ _txScratchSpace = new Memory(new byte[1024]);
_remapFunc = Remap;
_dbStream = new DbStream();
_attributeCache = backend.AttributeCache;
- _pendingTransactions = new BlockingCollection(new ConcurrentQueue());
+ _pendingTransactions = new BlockingCollection(new ConcurrentQueue());
- _backend = backend;
+ Backend = backend;
_writer = new PooledMemoryBufferWriter();
_retractWriter = new PooledMemoryBufferWriter();
_prevWriter = new PooledMemoryBufferWriter();
- _logger = logger;
+ Logger = logger;
_settings = settings;
-
- _backend.DeclareEAVT(IndexType.EAVTCurrent);
- _backend.DeclareEAVT(IndexType.EAVTHistory);
- _backend.DeclareAEVT(IndexType.AEVTCurrent);
- _backend.DeclareAEVT(IndexType.AEVTHistory);
- _backend.DeclareVAET(IndexType.VAETCurrent);
- _backend.DeclareVAET(IndexType.VAETHistory);
- _backend.DeclareAVET(IndexType.AVETCurrent);
- _backend.DeclareAVET(IndexType.AVETHistory);
- _backend.DeclareTxLog(IndexType.TxLog);
-
- _backend.Init(settings.Path);
-
- _txLog = _backend.GetIndex(IndexType.TxLog);
- _eavtCurrent = _backend.GetIndex(IndexType.EAVTCurrent);
- _eavtHistory = _backend.GetIndex(IndexType.EAVTHistory);
- _aevtCurrent = _backend.GetIndex(IndexType.AEVTCurrent);
- _aevtHistory = _backend.GetIndex(IndexType.AEVTHistory);
- _vaetCurrent = _backend.GetIndex(IndexType.VAETCurrent);
- _vaetHistory = _backend.GetIndex(IndexType.VAETHistory);
- _avetCurrent = _backend.GetIndex(IndexType.AVETCurrent);
- _avetHistory = _backend.GetIndex(IndexType.AVETHistory);
+ Backend.DeclareEAVT(IndexType.EAVTCurrent);
+ Backend.DeclareEAVT(IndexType.EAVTHistory);
+ Backend.DeclareAEVT(IndexType.AEVTCurrent);
+ Backend.DeclareAEVT(IndexType.AEVTHistory);
+ Backend.DeclareVAET(IndexType.VAETCurrent);
+ Backend.DeclareVAET(IndexType.VAETHistory);
+ Backend.DeclareAVET(IndexType.AVETCurrent);
+ Backend.DeclareAVET(IndexType.AVETHistory);
+ Backend.DeclareTxLog(IndexType.TxLog);
+
+ Backend.Init(settings.Path);
+
+ TxLogIndex = Backend.GetIndex(IndexType.TxLog);
+ EAVTCurrent = Backend.GetIndex(IndexType.EAVTCurrent);
+ EAVTHistory = Backend.GetIndex(IndexType.EAVTHistory);
+ AEVTCurrent = Backend.GetIndex(IndexType.AEVTCurrent);
+ AEVTHistory = Backend.GetIndex(IndexType.AEVTHistory);
+ VAETCurrent = Backend.GetIndex(IndexType.VAETCurrent);
+ VAETHistory = Backend.GetIndex(IndexType.VAETHistory);
+ AVETCurrent = Backend.GetIndex(IndexType.AVETCurrent);
+ AVETHistory = Backend.GetIndex(IndexType.AVETHistory);
+
if (bootstrap)
Bootstrap();
}
@@ -128,93 +134,58 @@ public DatomStore(ILogger logger, DatomStoreSettings settings, IStor
public AttributeCache AttributeCache => _attributeCache;
///
- public async Task<(StoreResult, IDb)> TransactAsync(IndexSegment datoms, HashSet? txFunctions = null)
+ public async Task<(StoreResult, IDb)> TransactAsync(IInternalTxFunction fn)
{
+ var casted = (IInternalTxFunctionImpl)fn;
+ _pendingTransactions.Add(casted);
- var pending = new PendingTransaction
- {
- Data = datoms,
- TxFunctions = txFunctions
- };
- _pendingTransactions.Add(pending);
-
- var task = pending.CompletionSource.Task;
+ var task = casted.Task;
if (await Task.WhenAny(task, Task.Delay(TransactionTimeout)) == task)
{
return await task;
}
- _logger.LogError("Transaction didn't complete after {Timeout}", TransactionTimeout);
+ Logger.LogError("Transaction didn't complete after {Timeout}", TransactionTimeout);
throw new TimeoutException($"Transaction didn't complete after {TransactionTimeout}");
}
///
- public (StoreResult, IDb) Transact(IndexSegment datoms, HashSet? txFunctions = null)
+ public (StoreResult, IDb) Transact(IInternalTxFunction fn)
{
+ var casted = (IInternalTxFunctionImpl)fn;
+ _pendingTransactions.Add(casted);
- var pending = new PendingTransaction
- {
- Data = datoms,
- TxFunctions = txFunctions
- };
- _pendingTransactions.Add(pending);
-
- var task = pending.CompletionSource.Task;
+ var task = casted.Task;
if (Task.WhenAny(task, Task.Delay(TransactionTimeout)).Result == task)
{
return task.Result;
}
- _logger.LogError("Transaction didn't complete after {Timeout}", TransactionTimeout);
+ Logger.LogError("Transaction didn't complete after {Timeout}", TransactionTimeout);
throw new TimeoutException($"Transaction didn't complete after {TransactionTimeout}");
}
///
- public IObservable TxLog
- {
- get
- {
- return _dbStream;
- }
- }
+ public IObservable TxLog => _dbStream;
///
- public void RegisterAttributes(IEnumerable newAttrs)
+ public (StoreResult, IDb) Transact(IndexSegment segment)
{
- var datoms = new IndexSegmentBuilder(_attributeCache);
- var newAttrsArray = newAttrs.ToArray();
-
- var internalTx = new InternalTransaction(null!, datoms);
- foreach (var attribute in newAttrsArray)
- AttributeDefinition.Insert(internalTx, attribute.Attribute, attribute.AttrEntityId.Value);
- internalTx.ProcessTemporaryEntities();
- var (result, idb) = Transact(datoms.Build());
-
- _attributeCache.Reset(idb);
+ return Transact(new IndexSegmentTransaction(segment));
}
///
- public ISnapshot GetSnapshot()
+ public Task<(StoreResult, IDb)> TransactAsync(IndexSegment segment)
{
- Debug.Assert(_currentSnapshot != null, "Current snapshot should not be null, this should never happen");
- return _currentSnapshot!;
+ return TransactAsync(new IndexSegmentTransaction(segment));
}
- public async ValueTask Excise(List datomsToRemove)
+ ///
+ public ISnapshot GetSnapshot()
{
- var batch = _backend.CreateBatch();
- foreach (var datom in datomsToRemove)
- {
- _eavtHistory.Delete(batch, datom);
- _aevtHistory.Delete(batch, datom);
- _vaetHistory.Delete(batch, datom);
- _avetHistory.Delete(batch, datom);
- _txLog.Delete(batch, datom);
- }
- batch.Commit();
-
+ return CurrentSnapshot!;
}
-
+
///
public void Dispose()
{
@@ -228,63 +199,50 @@ public void Dispose()
private void ConsumeTransactions()
{
- var debugEnabled = _logger.IsEnabled(LogLevel.Debug);
+ var debugEnabled = Logger.IsEnabled(LogLevel.Debug);
try
{
while (!_pendingTransactions.IsCompleted && !_shutdownToken.Token.IsCancellationRequested)
{
- if (!_pendingTransactions.TryTake(out var pendingTransaction, -1))
+ if (!_pendingTransactions.TryTake(out var txFn, -1))
continue;
try
{
- // Sync transactions have no data, and are used to verify that the store is up to date.
- if (!pendingTransaction.Data.Valid && pendingTransaction.TxFunctions == null)
- {
- var storeResult = new StoreResult
- {
- Remaps = new Dictionary().ToFrozenDictionary(),
- AssignedTxId = _nextIdCache.AsOfTxId,
- Snapshot = _backend.GetSnapshot(),
- };
- pendingTransaction.Complete(storeResult, _currentDb!);
- continue;
- }
-
- Log(pendingTransaction, out var result);
+ var result = Log(txFn);
if (debugEnabled)
{
var sw = Stopwatch.StartNew();
- FinishTransaction(result, pendingTransaction);
- _logger.LogDebug("Transaction {TxId} post-processed in {Elapsed}ms", result.AssignedTxId, sw.ElapsedMilliseconds);
+ FinishTransaction(result, txFn);
+ Logger.LogDebug("Transaction {TxId} post-processed in {Elapsed}ms", result.AssignedTxId, sw.ElapsedMilliseconds);
}
else
{
- FinishTransaction(result, pendingTransaction);
+ FinishTransaction(result, txFn);
}
}
catch (Exception ex)
{
- _logger.LogError(ex, "While commiting transaction");
- pendingTransaction.CompletionSource.TrySetException(ex);
+ Logger.LogError(ex, "While commiting transaction");
+ txFn.SetException(ex);
}
}
}
catch (Exception ex)
{
- _logger.LogError(ex, "Transaction consumer crashed");
+ Logger.LogError(ex, "Transaction consumer crashed");
}
}
///
/// Given the new store result, process the new database state, complete the transaction and notify the observers
///
- private void FinishTransaction(StoreResult result, PendingTransaction pendingTransaction)
+ private void FinishTransaction(StoreResult result, IInternalTxFunctionImpl pendingTransaction)
{
_currentDb = ((Db)_currentDb!).WithNext(result, result.AssignedTxId);
_dbStream.OnNext(_currentDb);
- pendingTransaction.Complete(result, _currentDb);
+ Task.Run(() => pendingTransaction.Complete(result, _currentDb));
}
///
@@ -294,40 +252,34 @@ private void Bootstrap()
{
try
{
- var snapshot = _backend.GetSnapshot();
- var lastTx = TxId.From(_nextIdCache.LastEntityInPartition(snapshot, PartitionId.Transactions).Value);
+ CurrentSnapshot = Backend.GetSnapshot();
+ var lastTx = TxId.From(_nextIdCache.LastEntityInPartition(CurrentSnapshot, PartitionId.Transactions).Value);
if (lastTx.Value == TxId.MinValue)
{
- _logger.LogInformation("Bootstrapping the datom store no existing state found");
+ Logger.LogInformation("Bootstrapping the datom store no existing state found");
using var builder = new IndexSegmentBuilder(_attributeCache);
var internalTx = new InternalTransaction(null!, builder);
AttributeDefinition.AddInitial(internalTx);
internalTx.ProcessTemporaryEntities();
- var pending = new PendingTransaction
- {
- Data = builder.Build(),
- TxFunctions = null
- };
// Call directly into `Log` as the transaction channel is not yet set up
- Log(pending, out _);
- _currentSnapshot = _backend.GetSnapshot();
+ Log(new IndexSegmentTransaction(builder.Build()));
+ CurrentSnapshot = Backend.GetSnapshot();
}
else
{
- _logger.LogInformation("Bootstrapping the datom store, existing state found, last tx: {LastTx}",
+ Logger.LogInformation("Bootstrapping the datom store, existing state found, last tx: {LastTx}",
lastTx.Value.ToString("x"));
_asOfTx = TxId.From(lastTx.Value);
- _currentSnapshot = snapshot;
}
}
catch (Exception ex)
{
- _logger.LogError(ex, "Failed to bootstrap the datom store");
+ Logger.LogError(ex, "Failed to bootstrap the datom store");
throw;
}
- _currentDb = new Db(_currentSnapshot, _asOfTx, _attributeCache);
+ _currentDb = new Db(CurrentSnapshot, _asOfTx, _attributeCache);
_dbStream.OnNext(_currentDb);
_loggerThread = new Thread(ConsumeTransactions)
{
@@ -354,7 +306,7 @@ private EntityId Remap(EntityId id)
else
{
var partitionId = PartitionId.From((byte)(id.Value >> 40 & 0xFF));
- var assignedId = _nextIdCache.NextId(_currentSnapshot!, partitionId);
+ var assignedId = _nextIdCache.NextId(CurrentSnapshot!, partitionId);
_remaps.Add(id, assignedId);
return assignedId;
}
@@ -367,115 +319,155 @@ private EntityId Remap(EntityId id)
}
- private void Log(PendingTransaction pendingTransaction, out StoreResult result)
+ private StoreResult Log(IInternalTxFunctionImpl pendingTransaction)
{
- var currentSnapshot = _currentSnapshot ?? _backend.GetSnapshot();
- _remaps.Clear();
- _thisTx = TxId.From(_nextIdCache.NextId(currentSnapshot, PartitionId.Transactions).Value);
+ _thisTx = TxId.From(_nextIdCache.NextId(CurrentSnapshot, PartitionId.Transactions).Value);
- using var batch = _backend.CreateBatch();
-
- var swPrepare = Stopwatch.StartNew();
-
_remaps = new Dictionary();
- var secondaryBuilder = new IndexSegmentBuilder(_attributeCache);
- var txId = EntityId.From(_thisTx.Value);
- secondaryBuilder.Add(txId, MnemonicDB.Abstractions.BuiltInEntities.Transaction.Timestamp, DateTime.UtcNow);
+ pendingTransaction.Execute(this);
+
+ return new StoreResult
+ {
+ AssignedTxId = _thisTx,
+ Remaps = _remaps.ToFrozenDictionary(),
+ Snapshot = CurrentSnapshot
+ };
+ }
- if (pendingTransaction.TxFunctions != null)
+ internal void LogDatoms(TSource datoms, bool advanceTx = true, bool enableStats = false)
+ where TSource : IEnumerable
+ {
+ var swPrepare = Stopwatch.StartNew();
+ _remaps.Clear();
+ using var batch = Backend.CreateBatch();
+
+ var datomCount = 0;
+ var dataSize = 0;
+ foreach (var datom in datoms)
{
- try
- {
- var db = _currentDb!;
- var tx = new InternalTransaction(db, secondaryBuilder);
- foreach (var fn in pendingTransaction.TxFunctions)
- {
- fn.Apply(tx, db);
- }
- }
- catch (Exception ex)
+ if (enableStats)
{
- _logger.LogError(ex, "Failed to apply transaction functions");
- throw;
+ datomCount++;
+ dataSize += datom.ValueSpan.Length + KeyPrefix.Size;
}
+ LogDatom(in datom, batch);
}
- var secondaryData = secondaryBuilder.Build();
- var datoms = pendingTransaction.Data.Concat(secondaryData);
-
- foreach (var datom in datoms)
+ if (advanceTx)
+ LogTx(batch);
+
+ batch.Commit();
+ var swWrite = Stopwatch.StartNew();
+
+ // Print statistics if requested
+ if (enableStats)
{
- _writer.Reset();
+ Logger.LogDebug("{TxId} ({Count} datoms, {Size}) prepared in {Elapsed}ms, written in {WriteElapsed}ms",
+ _thisTx,
+ datomCount,
+ dataSize,
+ swPrepare.ElapsedMilliseconds - swWrite.ElapsedMilliseconds,
+ swWrite.ElapsedMilliseconds);
+ }
- var isRemapped = datom.E.InPartition(PartitionId.Temp);
+ // Advance the TX counter, if requested (default)
+ if (advanceTx)
+ _asOfTx = _thisTx;
+
+ // Update the snapshot
+ CurrentSnapshot = Backend.GetSnapshot();
+ }
- var currentPrefix = datom.Prefix;
- var attrId = currentPrefix.A;
+ ///
+ /// Log a collection of datoms to the store using the given batch. If advanceTx is true, the transaction will be advanced
+ /// and this specific transaction will be considered as committed, use this in combination with other log methods
+ /// to build up a single write batch and finish off with this method.
+ ///
+ internal void LogDatoms(IWriteBatch batch, TSource datoms, bool advanceTx = false)
+ where TSource : IEnumerable
+ {
+ foreach (var datom in datoms)
+ LogDatom(in datom, batch);
+
+ if (advanceTx)
+ LogTx(batch);
+
+ batch.Commit();
+
+ // Advance the TX counter, if requested (not default)
+ if (advanceTx)
+ _asOfTx = _thisTx;
+
+ // Update the snapshot
+ CurrentSnapshot = Backend.GetSnapshot();
+ }
+
- var newE = isRemapped ? Remap(currentPrefix.E) : currentPrefix.E;
- var keyPrefix = currentPrefix with {E = newE, T = _thisTx};
+ ///
+ /// Logs the transaction entity to the batch
+ ///
+ ///
+ ///
+ private void LogTx(IWriteBatch batch)
+ {
+ MemoryMarshal.Write(_txScratchSpace.Span, DateTime.UtcNow.ToFileTimeUtc());
+ var id = EntityId.From(_thisTx.Value);
+ var keyPrefix = new KeyPrefix(id, AttributeCache.GetAttributeId(MnemonicDB.Abstractions.BuiltInEntities.Transaction.Timestamp.Id), _thisTx, false, ValueTag.Int64);
+ var datom = new Datom(keyPrefix, _txScratchSpace[..sizeof(long)]);
+ LogDatom(in datom, batch);
+ }
- {
- _writer.WriteMarshal(keyPrefix);
- var valueSpan = datom.ValueSpan;
- var span = _writer.GetSpan(valueSpan.Length);
- valueSpan.CopyTo(span);
- keyPrefix.ValueTag.Remap(span, _remapFunc);
- _writer.Advance(valueSpan.Length);
- }
+ ///
+ /// Log a single datom, this is the inner loop of the transaction processing
+ ///
+ internal void LogDatom(in Datom datom, IWriteBatch batch)
+ {
+ _writer.Reset();
- var newSpan = _writer.GetWrittenSpan();
+ var isRemapped = datom.E.InPartition(PartitionId.Temp);
- if (keyPrefix.IsRetract)
- {
- ProcessRetract(batch, attrId, newSpan, currentSnapshot);
- continue;
- }
+ var currentPrefix = datom.Prefix;
+ var attrId = currentPrefix.A;
- switch (GetPreviousState(isRemapped, attrId, currentSnapshot, newSpan))
- {
- case PrevState.Duplicate:
- continue;
- case PrevState.NotExists:
- ProcessAssert(batch, attrId, newSpan);
- break;
- case PrevState.Exists:
- SwitchPrevToRetraction();
- ProcessRetract(batch, attrId, _retractWriter.GetWrittenSpan(), currentSnapshot);
- ProcessAssert(batch, attrId, newSpan);
- break;
- }
+ var newE = isRemapped ? Remap(currentPrefix.E) : currentPrefix.E;
+ var keyPrefix = currentPrefix with {E = newE, T = _thisTx};
+ {
+ _writer.WriteMarshal(keyPrefix);
+ var valueSpan = datom.ValueSpan;
+ var span = _writer.GetSpan(valueSpan.Length);
+ valueSpan.CopyTo(span);
+ keyPrefix.ValueTag.Remap(span, _remapFunc);
+ _writer.Advance(valueSpan.Length);
}
- var swWrite = Stopwatch.StartNew();
- batch.Commit();
+ var newSpan = _writer.GetWrittenSpan();
- if (_logger.IsEnabled(LogLevel.Debug))
- _logger.LogDebug("{TxId} ({Count} datoms, {Size}) prepared in {Elapsed}ms, written in {WriteElapsed}ms",
- _thisTx,
- pendingTransaction.Data.Count + secondaryData.Count,
- pendingTransaction.Data.DataSize + secondaryData.DataSize,
- swPrepare.ElapsedMilliseconds - swWrite.ElapsedMilliseconds,
- swWrite.ElapsedMilliseconds);
-
- _asOfTx = _thisTx;
+ if (keyPrefix.IsRetract)
+ {
+ ProcessRetract(batch, attrId, newSpan, CurrentSnapshot!);
+ return;
+ }
- _currentSnapshot = _backend.GetSnapshot();
- result = new StoreResult
+ switch (GetPreviousState(isRemapped, attrId, CurrentSnapshot!, newSpan))
{
- AssignedTxId = _thisTx,
- Remaps = _remaps.ToFrozenDictionary(),
- Snapshot = _currentSnapshot
- };
+ case PrevState.Duplicate:
+ return;
+ case PrevState.NotExists:
+ ProcessAssert(batch, attrId, newSpan);
+ break;
+ case PrevState.Exists:
+ SwitchPrevToRetraction();
+ ProcessRetract(batch, attrId, _retractWriter.GetWrittenSpan(), CurrentSnapshot!);
+ ProcessAssert(batch, attrId, newSpan);
+ break;
+ }
}
-
+
///
/// Updates the data in _prevWriter to be a retraction of the data in that write.
///
- ///
- ///
private void SwitchPrevToRetraction()
{
var prevKey = MemoryMarshal.Read(_retractWriter.GetWrittenSpan());
@@ -510,68 +502,79 @@ private void ProcessRetract(IWriteBatch batch, AttributeId attrId, ReadOnlySpan<
.FirstOrDefault();
#if DEBUG
- unsafe
- {
- Debug.Assert(prevDatom.Valid, "Previous datom should exist");
- var debugKey = prevDatom.Prefix;
-
- var otherPrefix = MemoryMarshal.Read(datom);
- Debug.Assert(debugKey.E == otherPrefix.E, "Entity should match");
- Debug.Assert(debugKey.A == otherPrefix.A, "Attribute should match");
-
- fixed (byte* aTmp = prevDatom.ValueSpan)
- fixed (byte* bTmp = datom.SliceFast(sizeof(KeyPrefix)))
- {
- var valueTag = prevDatom.Prefix.ValueTag;
- var cmp = Serializer.Compare(prevDatom.Prefix.ValueTag, aTmp, prevDatom.ValueSpan.Length, otherPrefix.ValueTag, bTmp, datom.Length - sizeof(KeyPrefix));
- Debug.Assert(cmp == 0, "Values should match");
- }
- }
+ // In debug mode, we perform additional checks to make sure the retraction found a datom to retract
+ // The only time this fails is if the datom lookup functions `.Datoms()` are broken
+ RetractionSantiyCheck(datom, prevDatom);
#endif
- _eavtCurrent.Delete(batch, prevDatom);
- _aevtCurrent.Delete(batch, prevDatom);
+ // Delete the datom from the current indexes
+ EAVTCurrent.Delete(batch, prevDatom);
+ AEVTCurrent.Delete(batch, prevDatom);
if (_attributeCache.IsReference(attrId))
- _vaetCurrent.Delete(batch, prevDatom);
+ VAETCurrent.Delete(batch, prevDatom);
if (_attributeCache.IsIndexed(attrId))
- _avetCurrent.Delete(batch, prevDatom);
-
- _txLog.Put(batch, datom);
+ AVETCurrent.Delete(batch, prevDatom);
+
+ // Put the retraction in the log
+ TxLogIndex.Put(batch, datom);
+
+ // If the attribute is a no history attribute, we don't need to put the retraction in the history indexes
+ // so we can skip the rest of the processing
if (_attributeCache.IsNoHistory(attrId))
return;
// Move the datom to the history index and also record the retraction
- _eavtHistory.Put(batch, prevDatom);
- _eavtHistory.Put(batch, datom);
+ EAVTHistory.Put(batch, prevDatom);
+ EAVTHistory.Put(batch, datom);
// Move the datom to the history index and also record the retraction
- _aevtHistory.Put(batch, prevDatom);
- _aevtHistory.Put(batch, datom);
+ AEVTHistory.Put(batch, prevDatom);
+ AEVTHistory.Put(batch, datom);
if (_attributeCache.IsReference(attrId))
{
- _vaetHistory.Put(batch, prevDatom);
- _vaetHistory.Put(batch, datom);
+ VAETHistory.Put(batch, prevDatom);
+ VAETHistory.Put(batch, datom);
}
if (_attributeCache.IsIndexed(attrId))
{
- _avetHistory.Put(batch, prevDatom);
- _avetHistory.Put(batch, datom);
+ AVETHistory.Put(batch, prevDatom);
+ AVETHistory.Put(batch, datom);
+ }
+ }
+
+ private static unsafe void RetractionSantiyCheck(ReadOnlySpan datom, Datom prevDatom)
+ {
+ Debug.Assert(prevDatom.Valid, "Previous datom should exist");
+ var debugKey = prevDatom.Prefix;
+
+ var otherPrefix = MemoryMarshal.Read(datom);
+ Debug.Assert(debugKey.E == otherPrefix.E, "Entity should match");
+ Debug.Assert(debugKey.A == otherPrefix.A, "Attribute should match");
+
+ fixed (byte* aTmp = prevDatom.ValueSpan)
+ fixed (byte* bTmp = datom.SliceFast(sizeof(KeyPrefix)))
+ {
+ var cmp = Serializer.Compare(prevDatom.Prefix.ValueTag, aTmp, prevDatom.ValueSpan.Length, otherPrefix.ValueTag, bTmp, datom.Length - sizeof(KeyPrefix));
+ Debug.Assert(cmp == 0, "Values should match");
}
}
private void ProcessAssert(IWriteBatch batch, AttributeId attributeId, ReadOnlySpan datom)
{
- _txLog.Put(batch, datom);
- _eavtCurrent.Put(batch, datom);
- _aevtCurrent.Put(batch, datom);
+ TxLogIndex.Put(batch, datom);
+ EAVTCurrent.Put(batch, datom);
+ AEVTCurrent.Put(batch, datom);
if (_attributeCache.IsReference(attributeId))
- _vaetCurrent.Put(batch, datom);
+ VAETCurrent.Put(batch, datom);
if (_attributeCache.IsIndexed(attributeId))
- _avetCurrent.Put(batch, datom);
+ AVETCurrent.Put(batch, datom);
}
+ ///
+ /// Used to communicate the state of a given datom
+ ///
enum PrevState
{
Exists,
diff --git a/src/NexusMods.MnemonicDB/Storage/ImportExport.cs b/src/NexusMods.MnemonicDB/Storage/ImportExport.cs
index e0c0d5d..f8a8fa4 100644
--- a/src/NexusMods.MnemonicDB/Storage/ImportExport.cs
+++ b/src/NexusMods.MnemonicDB/Storage/ImportExport.cs
@@ -6,6 +6,7 @@
using NexusMods.MnemonicDB.Abstractions;
using NexusMods.MnemonicDB.Abstractions.IndexSegments;
using NexusMods.MnemonicDB.Abstractions.Query;
+using NexusMods.MnemonicDB.Storage.Abstractions;
namespace NexusMods.MnemonicDB.Storage;
@@ -35,7 +36,7 @@ public async Task ExportAsync(Stream stream)
binaryWriter.Write(FourCC);
binaryWriter.Write((ushort)1);
- var snapshot = _backend.GetSnapshot();
+ var snapshot = CurrentSnapshot;
foreach (var indexType in Enum.GetValues())
{
@@ -52,7 +53,7 @@ public async Task ExportAsync(Stream stream)
exportedDatoms += chunk.Count;
}
}
- _logger.LogInformation("Exported {0} datoms", exportedDatoms);
+ Logger.LogInformation("Exported {0} datoms", exportedDatoms);
}
public async Task ImportAsync(Stream stream)
@@ -68,45 +69,52 @@ public async Task ImportAsync(Stream stream)
if (version != 1)
throw new InvalidDataException("Invalid file version");
- while (stream.Position < stream.Length)
+ try
{
- var indexType = (IndexType)binaryReader.ReadByte();
- var datomCount = binaryReader.ReadUInt32();
- var chunkSize = binaryReader.ReadUInt32();
- var data = binaryReader.ReadBytes((int)chunkSize);
- var segment = new IndexSegment((int)datomCount, data.AsMemory(), _backend.AttributeCache);
-
- using var batch = _backend.CreateBatch();
- var index = _backend.GetIndex(indexType);
-
- foreach (var datom in segment)
- index.Put(batch, datom);
-
- batch.Commit();
- importedCount += (int)datomCount;
+ while (true)
+ {
+ var indexType = (IndexType)binaryReader.ReadByte();
+ var datomCount = binaryReader.ReadUInt32();
+ var chunkSize = binaryReader.ReadUInt32();
+ var data = binaryReader.ReadBytes((int)chunkSize);
+ var segment = new IndexSegment((int)datomCount, data.AsMemory(), AttributeCache);
+
+ using var batch = Backend.CreateBatch();
+ var index = Backend.GetIndex(indexType);
+
+ foreach (var datom in segment)
+ index.Put(batch, datom);
+
+ batch.Commit();
+ importedCount += (int)datomCount;
+ }
}
-
- _logger.LogInformation("Imported {0} datoms", importedCount);
+ catch (EndOfStreamException)
+ {
+ // End of stream
+ }
+
+ Logger.LogInformation("Imported {0} datoms", importedCount);
_nextIdCache.ResetCaches();
Bootstrap();
}
private void CleanStore()
{
- int datomCount = 0;
- var snapshot = _backend.GetSnapshot();
- using var batch = _backend.CreateBatch();
+ var datomCount = 0;
+ var snapshot = Backend.GetSnapshot();
+ using var batch = Backend.CreateBatch();
foreach (var index in Enum.GetValues())
{
var slice = SliceDescriptor.Create(index);
var datoms = snapshot.Datoms(slice);
foreach (var datom in datoms)
{
- _backend.GetIndex(index).Delete(batch, datom);
+ Backend.GetIndex(index).Delete(batch, datom);
datomCount++;
}
}
batch.Commit();
- _logger.LogInformation("Cleaned {0} datoms", datomCount);
+ Logger.LogInformation("Cleaned {0} datoms", datomCount);
}
}
diff --git a/src/NexusMods.MnemonicDB/Transaction.cs b/src/NexusMods.MnemonicDB/Transaction.cs
index 29725ea..e058ed5 100644
--- a/src/NexusMods.MnemonicDB/Transaction.cs
+++ b/src/NexusMods.MnemonicDB/Transaction.cs
@@ -10,6 +10,7 @@
using NexusMods.MnemonicDB.Abstractions.Internals;
using NexusMods.MnemonicDB.Abstractions.Models;
using NexusMods.MnemonicDB.Abstractions.TxFunctions;
+using NexusMods.MnemonicDB.InternalTxFunctions;
namespace NexusMods.MnemonicDB;
@@ -22,6 +23,7 @@ internal class Transaction(Connection connection) : ITransaction
private ulong _tempId = PartitionId.Temp.MakeEntityId(1).Value;
private bool _committed;
private readonly object _lock = new();
+ private IInternalTxFunction? _internalTxFunction;
///
public EntityId TempId(PartitionId entityPartition)
@@ -84,6 +86,14 @@ public void Add(ITxFunction fn)
}
}
+ ///
+ /// Sets the internal transaction function to the given function.
+ ///
+ public void Set(IInternalTxFunction fn)
+ {
+ _internalTxFunction = fn;
+ }
+
public void Attach(ITemporaryEntity entity)
{
lock (_lock)
@@ -133,7 +143,15 @@ public async Task Commit()
// Build the datoms block here, so that future calls to add won't modify this while we're building
built = _datoms.Build();
}
- return await connection.Transact(built, _txFunctions);
+
+ if (_internalTxFunction is not null)
+ return await connection.Transact(_internalTxFunction);
+
+ if (_txFunctions is not null)
+ return await connection.Transact(new CompoundTransaction(built, _txFunctions!) { Connection = connection });
+
+ return await connection.Transact(new IndexSegmentTransaction(built));
+
}
///
diff --git a/tests/NexusMods.MnemonicDB.Storage.Tests/NullConnection.cs b/tests/NexusMods.MnemonicDB.Storage.Tests/NullConnection.cs
index b7edfc8..9ba113d 100644
--- a/tests/NexusMods.MnemonicDB.Storage.Tests/NullConnection.cs
+++ b/tests/NexusMods.MnemonicDB.Storage.Tests/NullConnection.cs
@@ -20,7 +20,7 @@ public IDb AsOf(TxId txId)
public IDb History()
{
- throw new NotImplementedException();
+ throw new NotSupportedException();
}
public ITransaction BeginTransaction()
@@ -29,8 +29,13 @@ public ITransaction BeginTransaction()
}
public IAnalyzer[] Analyzers => throw new NotSupportedException();
- public Task Excise(EntityId[] entityIds)
+ public Task Excise(EntityId[] entityIds)
+ {
+ throw new NotSupportedException();
+ }
+
+ public Task UpdateSchema(params IAttribute[] attribute)
{
- throw new NotImplementedException();
+ throw new NotSupportedException();
}
}
diff --git a/tests/NexusMods.MnemonicDB.Tests/AMnemonicDBTest.cs b/tests/NexusMods.MnemonicDB.Tests/AMnemonicDBTest.cs
index 3c61a76..5df7971 100644
--- a/tests/NexusMods.MnemonicDB.Tests/AMnemonicDBTest.cs
+++ b/tests/NexusMods.MnemonicDB.Tests/AMnemonicDBTest.cs
@@ -1,4 +1,5 @@
-using System.IO.Hashing;
+using System.IO.Compression;
+using System.IO.Hashing;
using System.Text;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
@@ -55,6 +56,15 @@ protected AMnemonicDBTest(IServiceProvider provider)
Logger = provider.GetRequiredService>();
}
+ protected async Task LoadDatamodel(RelativePath name)
+ {
+ var fullPath = FileSystem.Shared.GetKnownPath(KnownPath.EntryDirectory).Combine("Resources").Combine(name);
+
+ await using var stream = fullPath.Read();
+ await using var decompressStream = new DeflateStream(stream, CompressionMode.Decompress);
+ await _store.ImportAsync(decompressStream);
+ }
+
protected DatomStoreSettings Config { get; set; }
protected SettingsTask VerifyModel(T model)
diff --git a/tests/NexusMods.MnemonicDB.Tests/MigrationTests.cs b/tests/NexusMods.MnemonicDB.Tests/MigrationTests.cs
new file mode 100644
index 0000000..b6e2c78
--- /dev/null
+++ b/tests/NexusMods.MnemonicDB.Tests/MigrationTests.cs
@@ -0,0 +1,128 @@
+using System.Net;
+using NexusMods.Hashing.xxHash64;
+using NexusMods.MnemonicDB.Abstractions;
+using NexusMods.MnemonicDB.Abstractions.Attributes;
+using NexusMods.MnemonicDB.Abstractions.IndexSegments;
+using NexusMods.MnemonicDB.Abstractions.Query;
+using NexusMods.MnemonicDB.TestModel;
+using NexusMods.MnemonicDB.TestModel.Attributes;
+using NexusMods.Paths;
+using File = NexusMods.MnemonicDB.TestModel.File;
+
+namespace NexusMods.MnemonicDB.Tests;
+
+public class MigrationTests : AMnemonicDBTest
+{
+ public MigrationTests(IServiceProvider provider) : base(provider)
+ {
+
+ }
+
+ private async Task AddData()
+ {
+ using var tx = Connection.BeginTransaction();
+ for (var l = 0; l < 10; l++)
+ {
+ var loadout = new Loadout.New(tx)
+ {
+ Name = $"Loadout {l}"
+ };
+
+ for (var m = 0; m < 10; m++)
+ {
+ var mod = new Mod.New(tx)
+ {
+ Name = $"Mod {m}",
+ Description = $"{m}",
+ Source = new Uri($"http://mod{m}.com"),
+ LoadoutId = loadout
+ };
+
+ for (var f = 0; f < 10; f++)
+ {
+ var file = new File.New(tx)
+ {
+ Path = $"File {f}",
+ ModId = mod,
+ Size = Size.FromLong(f * m * l),
+ Hash = Hash.FromLong(f * m * l)
+ };
+
+ }
+ }
+ }
+ await tx.Commit();
+ }
+
+ [Fact]
+ public async Task CanAddIndex()
+ {
+ await AddData();
+
+ var cache = Connection.AttributeCache;
+ var aid = cache.GetAttributeId(Mod.Description.Id);
+ cache.IsIndexed(aid).Should().BeFalse();
+
+ var withIndex = new StringAttribute(Mod.Description.Id.Namespace, Mod.Description.Id.Name) { IsIndexed = true, IsOptional = true };
+ var prevTxId = Connection.Db.BasisTxId;
+
+ await Connection.UpdateSchema(withIndex);
+
+ Connection.Db.BasisTxId.Value.Should().Be(prevTxId.Value + 1);
+ cache.IsIndexed(cache.GetAttributeId(Mod.Description.Id)).Should().BeTrue();
+
+ var foundByDocs = Connection.Db.Datoms(Mod.Description, "0").ToArray();
+ foundByDocs.Length.Should().Be(10);
+ }
+
+ [Fact]
+ public async Task CanRemoveIndex()
+ {
+ await AddData();
+
+ var cache = Connection.AttributeCache;
+ var aid = cache.GetAttributeId(Mod.Source.Id);
+ cache.IsIndexed(aid).Should().BeTrue();
+
+ var withIndex = new UriAttribute(Mod.Source.Id.Namespace, Mod.Source.Id.Name) { IsIndexed = false };
+ var prevTxId = Connection.Db.BasisTxId;
+
+ await Connection.UpdateSchema(withIndex);
+
+ Connection.Db.BasisTxId.Value.Should().Be(prevTxId.Value + 1);
+ cache.IsIndexed(cache.GetAttributeId(Mod.Source.Id)).Should().BeFalse();
+
+ Action act = () => Connection.Db.Datoms(Mod.Source, new Uri("http://mod0.com")).ToArray();
+ act.Should().Throw();
+ }
+
+ [Fact]
+ public async Task CanConvertValues()
+ {
+ await AddData();
+
+ var cache = Connection.AttributeCache;
+ var aid = cache.GetAttributeId(Mod.Description.Id);
+ cache.IsIndexed(aid).Should().BeFalse();
+
+ var withIndex = new ULongAttribute(Mod.Description.Id.Namespace, Mod.Description.Id.Name) { IsIndexed = true, IsOptional = false };
+ var prevTxId = Connection.Db.BasisTxId;
+
+ await Connection.UpdateSchema(withIndex);
+
+ Connection.Db.BasisTxId.Value.Should().Be(prevTxId.Value + 1);
+ cache.IsIndexed(cache.GetAttributeId(Mod.Description.Id)).Should().BeTrue();
+
+ var foundByDocs = Connection.Db.Datoms(withIndex, 0UL).ToArray();
+ foundByDocs.Length.Should().Be(10);
+ }
+
+ [Fact]
+ public async Task ConvertingValuesIncorrectlyFails()
+ {
+ await AddData();
+ var withIndex = new ULongAttribute(Mod.Source.Id.Namespace, Mod.Source.Id.Name) { IsIndexed = true, IsOptional = false };
+ var act = async () => await Connection.UpdateSchema(withIndex);
+ await act.Should().ThrowAsync("Converting values for attribute Mod.Source from String to ULong where the source is a URI should fail");
+ }
+}