diff --git a/.gitmodules b/.gitmodules index a9d31ba..d92a876 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,4 @@ [submodule "dependencies/NEventStore"] path = dependencies/NEventStore - url = https://github.com/NEventStore/NEventStore.git + url = https://github.com/SignPath/NEventStore.git + branch = feature/AddDedicatedEventDeserialization diff --git a/dependencies/NEventStore b/dependencies/NEventStore index a7d4848..54572c7 160000 --- a/dependencies/NEventStore +++ b/dependencies/NEventStore @@ -1 +1 @@ -Subproject commit a7d4848ac5ec6b5d6f1e7bbc19a2381ca5d0e721 +Subproject commit 54572c7591400630bb5f7c67003e7759aedad5b0 diff --git a/src/NEventStore.Persistence.MsSql.Tests/PersistenceEngineFixture.cs b/src/NEventStore.Persistence.MsSql.Tests/PersistenceEngineFixture.cs index d3f0743..c7658fa 100644 --- a/src/NEventStore.Persistence.MsSql.Tests/PersistenceEngineFixture.cs +++ b/src/NEventStore.Persistence.MsSql.Tests/PersistenceEngineFixture.cs @@ -1,4 +1,5 @@ -using NEventStore.Persistence.Sql.Tests; +using Microsoft.Data.SqlClient; +using NEventStore.Persistence.Sql.Tests; namespace NEventStore.Persistence.AcceptanceTests { @@ -13,17 +14,23 @@ public partial class PersistenceEngineFixture /// this mimic the current NEventStore default values which is run outside any transaction (creates a scope that /// suppresses any transaction) /// - public TransactionScopeOption? ScopeOption { get; set; } = null; // the old default: TransactionScopeOption.Suppress; + public TransactionScopeOption? ScopeOption { get; set; } = + null; // the old default: TransactionScopeOption.Suppress; public PersistenceEngineFixture() { _createPersistence = pageSize => - new SqlPersistenceFactory(new EnviromentConnectionFactory("MsSql", Microsoft.Data.SqlClient.SqlClientFactory.Instance), - new BinarySerializer(), + { + var serializer = new BinarySerializer(); + return new SqlPersistenceFactory( + new EnviromentConnectionFactory("MsSql", SqlClientFactory.Instance), + serializer, + new DefaultEventSerializer(serializer), new MsSqlDialect(), pageSize: pageSize, scopeOption: ScopeOption - ).Build(); + ).Build(); + }; } } } \ No newline at end of file diff --git a/src/NEventStore.Persistence.MsSql.Tests/TransactionIsolationLevelTests.cs b/src/NEventStore.Persistence.MsSql.Tests/TransactionIsolationLevelTests.cs index 0355678..7b9dc01 100644 --- a/src/NEventStore.Persistence.MsSql.Tests/TransactionIsolationLevelTests.cs +++ b/src/NEventStore.Persistence.MsSql.Tests/TransactionIsolationLevelTests.cs @@ -124,9 +124,13 @@ public IsolationLevelPersistenceEngineFixture() _recorder = new IsolationLevelRecorder(); _connectionFactory = new EnviromentConnectionFactory("MsSql", Microsoft.Data.SqlClient.SqlClientFactory.Instance); _createPersistence = () => - new SqlPersistenceFactory(_connectionFactory, - new BinarySerializer(), + { + var serializer = new BinarySerializer(); + return new SqlPersistenceFactory(_connectionFactory, + serializer, + new DefaultEventSerializer(serializer), new IsolationLevelRecordingSqlDialect(_recorder)).Build(); + }; } public void Initialize() diff --git a/src/NEventStore.Persistence.MySql.Tests/PersistenceEngineFixture.cs b/src/NEventStore.Persistence.MySql.Tests/PersistenceEngineFixture.cs index 505ff6c..7e32b3d 100644 --- a/src/NEventStore.Persistence.MySql.Tests/PersistenceEngineFixture.cs +++ b/src/NEventStore.Persistence.MySql.Tests/PersistenceEngineFixture.cs @@ -13,20 +13,28 @@ public PersistenceEngineFixture() { #if NET462 _createPersistence = pageSize => - new SqlPersistenceFactory( - new EnviromentConnectionFactory("MySql", "MySql.Data.MySqlClient"), - new BinarySerializer(), - new MySqlDialect(), - pageSize: pageSize).Build(); + { + var serializer = new BinarySerializer(); + return new SqlPersistenceFactory( + new EnviromentConnectionFactory("MySql", "MySql.Data.MySqlClient"), + serializer, + new DefaultEventSerializer(serializer), + new MySqlDialect(), + pageSize: pageSize).Build(); + }; // Wireup.Init().UsingSqlPersistence("test"); #else _createPersistence = pageSize => - new SqlPersistenceFactory( + { + var serializer = new BinarySerializer(); + return new SqlPersistenceFactory( new EnviromentConnectionFactory("MySql", MySqlClientFactory.Instance), - new BinarySerializer(), + serializer, + new DefaultEventSerializer(serializer), new MySqlDialect(), pageSize: pageSize).Build(); + }; // Wireup.Init().UsingSqlPersistence("test"); #endif diff --git a/src/NEventStore.Persistence.PostgreSql.Tests/PersistenceEngineFixture.cs b/src/NEventStore.Persistence.PostgreSql.Tests/PersistenceEngineFixture.cs index 89ac0c5..2086e9c 100644 --- a/src/NEventStore.Persistence.PostgreSql.Tests/PersistenceEngineFixture.cs +++ b/src/NEventStore.Persistence.PostgreSql.Tests/PersistenceEngineFixture.cs @@ -16,18 +16,26 @@ public PersistenceEngineFixture() #if NET462 _createPersistence = pageSize => - new SqlPersistenceFactory( + { + var serializer = new BinarySerializer(); + return new SqlPersistenceFactory( new EnviromentConnectionFactory("PostgreSql", "Npgsql"), - new BinarySerializer(), + serializer, + new DefaultEventSerializer(serializer), new PostgreNpgsql6Dialect(npgsql6timestamp: true), pageSize: pageSize).Build(); + }; #else _createPersistence = pageSize => - new SqlPersistenceFactory( + { + var serializer = new BinarySerializer(); + return new SqlPersistenceFactory( new EnviromentConnectionFactory("PostgreSql", Npgsql.NpgsqlFactory.Instance), - new BinarySerializer(), + serializer, + new DefaultEventSerializer(serializer), new PostgreNpgsql6Dialect(npgsql6timestamp: true), pageSize: pageSize).Build(); + }; #endif } } diff --git a/src/NEventStore.Persistence.Sql/CommitExtensions.cs b/src/NEventStore.Persistence.Sql/CommitExtensions.cs index c748054..ed58cd5 100644 --- a/src/NEventStore.Persistence.Sql/CommitExtensions.cs +++ b/src/NEventStore.Persistence.Sql/CommitExtensions.cs @@ -21,19 +21,30 @@ public static class CommitExtensions private const int PayloadIndex = 9; private static readonly ILogger Logger = LogFactory.BuildLogger(typeof (CommitExtensions)); - public static ICommit GetCommit(this IDataRecord record, ISerialize serializer, ISqlDialect sqlDialect) + public static ICommit GetCommit(this IDataRecord record, ISerialize serializer, ISerializeEvents eventSerializer, ISqlDialect sqlDialect) { Logger.LogTrace(Messages.DeserializingCommit, serializer.GetType()); + + var bucketId = record[BucketIdIndex].ToString(); + var streamId = record[StreamIdOriginalIndex].ToString(); + var streamRevision = record[StreamRevisionIndex].ToInt(); + var commitId = record[CommitIdIndex].ToGuid(); + var commitSequence = record[CommitSequenceIndex].ToInt(); + var commitStamp = sqlDialect.ToDateTime(record[CommitStampIndex]); + var checkpointToken = record.CheckpointNumber(); + var headers = serializer.Deserialize>(record, HeadersIndex); - var events = serializer.Deserialize>(record, PayloadIndex); - return new Commit(record[BucketIdIndex].ToString(), - record[StreamIdOriginalIndex].ToString(), - record[StreamRevisionIndex].ToInt(), - record[CommitIdIndex].ToGuid(), - record[CommitSequenceIndex].ToInt(), - sqlDialect.ToDateTime(record[CommitStampIndex]), - record[CheckpointIndex].ToLong(), + var events = eventSerializer.DeserializeEventMessages((byte[])record[PayloadIndex], bucketId, streamId, + streamRevision, commitId, commitSequence, commitStamp, checkpointToken, headers); + + return new Commit(bucketId, + streamId, + streamRevision, + commitId, + commitSequence, + commitStamp, + checkpointToken, headers, events); } diff --git a/src/NEventStore.Persistence.Sql/SqlPersistenceEngine.cs b/src/NEventStore.Persistence.Sql/SqlPersistenceEngine.cs index a7b0d5d..af71c34 100644 --- a/src/NEventStore.Persistence.Sql/SqlPersistenceEngine.cs +++ b/src/NEventStore.Persistence.Sql/SqlPersistenceEngine.cs @@ -19,6 +19,7 @@ public class SqlPersistenceEngine : IPersistStreams private readonly int _pageSize; private readonly TransactionScopeOption? _scopeOption; private readonly ISerialize _serializer; + private readonly ISerializeEvents _eventSerializer; private bool _disposed; private int _initialized; private readonly IStreamIdHasher _streamIdHasher; @@ -27,15 +28,17 @@ public SqlPersistenceEngine( IConnectionFactory connectionFactory, ISqlDialect dialect, ISerialize serializer, + ISerializeEvents eventSerializer, int pageSize, TransactionScopeOption? scopeOption = null) - : this(connectionFactory, dialect, serializer, pageSize, new Sha1StreamIdHasher(), scopeOption) + : this(connectionFactory, dialect, serializer, eventSerializer, pageSize, new Sha1StreamIdHasher(), scopeOption) { } public SqlPersistenceEngine( IConnectionFactory connectionFactory, ISqlDialect dialect, ISerialize serializer, + ISerializeEvents eventSerializer, int pageSize, IStreamIdHasher streamIdHasher, TransactionScopeOption? scopeOption = null) @@ -53,6 +56,7 @@ public SqlPersistenceEngine( _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory)); _dialect = dialect ?? throw new ArgumentNullException(nameof(dialect)); _serializer = serializer ?? throw new ArgumentNullException(nameof(serializer)); + _eventSerializer = eventSerializer ?? throw new ArgumentNullException(nameof(eventSerializer)); _scopeOption = scopeOption; _pageSize = pageSize; _streamIdHasher = new StreamIdHasherValidator(streamIdHasher); @@ -91,7 +95,7 @@ public virtual IEnumerable GetFrom(string bucketId, string streamId, in query.AddParameter(_dialect.CommitSequence, 0); return query .ExecutePagedQuery(statement, _dialect.NextPageDelegate) - .Select(x => x.GetCommit(_serializer, _dialect)); + .Select(x => x.GetCommit(_serializer, _eventSerializer, _dialect)); }); } @@ -107,7 +111,7 @@ public virtual IEnumerable GetFrom(string bucketId, DateTime start) query.AddParameter(_dialect.BucketId, bucketId, DbType.AnsiString); query.AddParameter(_dialect.CommitStamp, start, _dialect.GetDateTimeDbType()); return query.ExecutePagedQuery(statement, (q, r) => { }) - .Select(x => x.GetCommit(_serializer, _dialect)); + .Select(x => x.GetCommit(_serializer, _eventSerializer, _dialect)); }); } @@ -125,7 +129,7 @@ public virtual IEnumerable GetFromTo(string bucketId, DateTime start, D query.AddParameter(_dialect.CommitStampStart, start, _dialect.GetDateTimeDbType()); query.AddParameter(_dialect.CommitStampEnd, end, _dialect.GetDateTimeDbType()); return query.ExecutePagedQuery(statement, (q, r) => { }) - .Select(x => x.GetCommit(_serializer, _dialect)); + .Select(x => x.GetCommit(_serializer, _eventSerializer, _dialect)); }); } @@ -243,7 +247,7 @@ public IEnumerable GetFrom(string bucketId, Int64 checkpointToken) query.AddParameter(_dialect.BucketId, bucketId, DbType.AnsiString); query.AddParameter(_dialect.CheckpointNumber, checkpointToken); return query.ExecutePagedQuery(statement, (q, r) => { }) - .Select(x => x.GetCommit(_serializer, _dialect)); + .Select(x => x.GetCommit(_serializer, _eventSerializer, _dialect)); }); } @@ -257,7 +261,7 @@ public IEnumerable GetFromTo(String bucketId, Int64 from, Int64 to) query.AddParameter(_dialect.FromCheckpointNumber, from); query.AddParameter(_dialect.ToCheckpointNumber, to); return query.ExecutePagedQuery(statement, (q, r) => { }) - .Select(x => x.GetCommit(_serializer, _dialect)); + .Select(x => x.GetCommit(_serializer, _eventSerializer, _dialect)); }); } @@ -269,7 +273,7 @@ public IEnumerable GetFrom(Int64 checkpointToken) string statement = _dialect.GetCommitsFromCheckpoint; query.AddParameter(_dialect.CheckpointNumber, checkpointToken); return query.ExecutePagedQuery(statement, (q, r) => { }) - .Select(x => x.GetCommit(_serializer, _dialect)); + .Select(x => x.GetCommit(_serializer, _eventSerializer, _dialect)); }); } @@ -282,7 +286,7 @@ public IEnumerable GetFromTo(Int64 from, Int64 to) query.AddParameter(_dialect.FromCheckpointNumber, from); query.AddParameter(_dialect.ToCheckpointNumber, to); return query.ExecutePagedQuery(statement, (q, r) => { }) - .Select(x => x.GetCommit(_serializer, _dialect)); + .Select(x => x.GetCommit(_serializer, _eventSerializer, _dialect)); }); } diff --git a/src/NEventStore.Persistence.Sql/SqlPersistenceFactory.cs b/src/NEventStore.Persistence.Sql/SqlPersistenceFactory.cs index de977aa..8288afd 100644 --- a/src/NEventStore.Persistence.Sql/SqlPersistenceFactory.cs +++ b/src/NEventStore.Persistence.Sql/SqlPersistenceFactory.cs @@ -15,9 +15,10 @@ public class SqlPersistenceFactory : IPersistenceFactory public SqlPersistenceFactory( string connectionName, ISerialize serializer, + ISerializeEvents eventSerializer, ISqlDialect dialect = null, TransactionScopeOption? scopeOption = null) - : this(serializer, scopeOption, null, DefaultPageSize) + : this(serializer, eventSerializer, scopeOption, null, DefaultPageSize) { ConnectionFactory = new ConfigurationConnectionFactory(connectionName); Dialect = dialect ?? ResolveDialect(new ConfigurationConnectionFactory(connectionName).Settings); @@ -27,19 +28,22 @@ public SqlPersistenceFactory( public SqlPersistenceFactory( IConnectionFactory factory, ISerialize serializer, + ISerializeEvents eventSerializer, ISqlDialect dialect, IStreamIdHasher streamIdHasher = null, TransactionScopeOption? scopeOption = null, int pageSize = DefaultPageSize) - : this(serializer, scopeOption, streamIdHasher, pageSize) + : this(serializer, eventSerializer, scopeOption, streamIdHasher, pageSize) { ConnectionFactory = factory; Dialect = dialect ?? throw new ArgumentNullException(nameof(dialect)); } - private SqlPersistenceFactory(ISerialize serializer, TransactionScopeOption? scopeOption, IStreamIdHasher streamIdHasher, int pageSize) + private SqlPersistenceFactory(ISerialize serializer, ISerializeEvents eventSerializer, + TransactionScopeOption? scopeOption, IStreamIdHasher streamIdHasher, int pageSize) { Serializer = serializer; + EventSerializer = eventSerializer; _scopeOption = scopeOption; StreamIdHasher = streamIdHasher ?? new Sha1StreamIdHasher(); PageSize = pageSize; @@ -51,13 +55,15 @@ private SqlPersistenceFactory(ISerialize serializer, TransactionScopeOption? sco protected virtual ISerialize Serializer { get; } + protected virtual ISerializeEvents EventSerializer { get; } + protected virtual IStreamIdHasher StreamIdHasher { get; } protected int PageSize { get; set; } public virtual IPersistStreams Build() { - return new SqlPersistenceEngine(ConnectionFactory, Dialect, Serializer, PageSize, StreamIdHasher, _scopeOption); + return new SqlPersistenceEngine(ConnectionFactory, Dialect, Serializer, EventSerializer, PageSize, StreamIdHasher, _scopeOption); } #if NET462 diff --git a/src/NEventStore.Persistence.Sql/SqlPersistenceWireup.cs b/src/NEventStore.Persistence.Sql/SqlPersistenceWireup.cs index 31e8570..df1e276 100644 --- a/src/NEventStore.Persistence.Sql/SqlPersistenceWireup.cs +++ b/src/NEventStore.Persistence.Sql/SqlPersistenceWireup.cs @@ -35,6 +35,7 @@ public SqlPersistenceWireup(Wireup wireup, IConnectionFactory connectionFactory) return new SqlPersistenceFactory( connectionFactory, c.Resolve(), + c.Resolve(), c.Resolve(), c.Resolve(), scopeOptions, diff --git a/src/NEventStore.Persistence.Sqlite.Tests/PersistenceEngineFixture.cs b/src/NEventStore.Persistence.Sqlite.Tests/PersistenceEngineFixture.cs index d954f14..733b2f9 100644 --- a/src/NEventStore.Persistence.Sqlite.Tests/PersistenceEngineFixture.cs +++ b/src/NEventStore.Persistence.Sqlite.Tests/PersistenceEngineFixture.cs @@ -10,21 +10,31 @@ public partial class PersistenceEngineFixture public PersistenceEngineFixture() { _createPersistence = pageSize => - new SqlPersistenceFactory( - new ConfigurationConnectionFactory("NEventStore.Persistence.AcceptanceTests.Properties.Settings.SQLite"), - new BinarySerializer(), + { + var serializer = new BinarySerializer(); + return new SqlPersistenceFactory( + new ConfigurationConnectionFactory( + "NEventStore.Persistence.AcceptanceTests.Properties.Settings.SQLite"), + serializer, + new DefaultEventSerializer(serializer), new SqliteDialect(), pageSize: pageSize).Build(); + }; } -#else +#else public PersistenceEngineFixture() { _createPersistence = pageSize => - new SqlPersistenceFactory( - new NetStandardConnectionFactory(System.Data.SQLite.SQLiteFactory.Instance, "Data Source=NEventStore.db;"), - new BinarySerializer(), + { + var serializer = new BinarySerializer(); + return new SqlPersistenceFactory( + new NetStandardConnectionFactory(System.Data.SQLite.SQLiteFactory.Instance, + "Data Source=NEventStore.db;"), + serializer, + new DefaultEventSerializer(serializer), new SqliteDialect(), pageSize: pageSize).Build(); + }; /* * There are issues with Microsoft.Data.Sqlite > 2.2.6