Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dedicated interface for event deserialization #46

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
[submodule "dependencies/NEventStore"]
path = dependencies/NEventStore
url = https://github.com/NEventStore/NEventStore.git
url = https://github.com/SignPath/NEventStore.git
branch = feature/AddDedicatedEventDeserialization
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using NEventStore.Persistence.Sql.Tests;
using Microsoft.Data.SqlClient;
using NEventStore.Persistence.Sql.Tests;

namespace NEventStore.Persistence.AcceptanceTests
{
Expand All @@ -13,17 +14,23 @@ public partial class PersistenceEngineFixture
/// this mimic the current NEventStore default values which is run outside any transaction (creates a scope that
/// suppresses any transaction)
/// </summary>
public TransactionScopeOption? ScopeOption { get; set; } = null; // the old default: TransactionScopeOption.Suppress;
public TransactionScopeOption? ScopeOption { get; set; } =
null; // the old default: TransactionScopeOption.Suppress;

public PersistenceEngineFixture()
{
_createPersistence = pageSize =>
new SqlPersistenceFactory(new EnviromentConnectionFactory("MsSql", Microsoft.Data.SqlClient.SqlClientFactory.Instance),
new BinarySerializer(),
{
var serializer = new BinarySerializer();
return new SqlPersistenceFactory(
new EnviromentConnectionFactory("MsSql", SqlClientFactory.Instance),
serializer,
new DefaultEventSerializer(serializer),
new MsSqlDialect(),
pageSize: pageSize,
scopeOption: ScopeOption
).Build();
).Build();
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,13 @@ public IsolationLevelPersistenceEngineFixture()
_recorder = new IsolationLevelRecorder();
_connectionFactory = new EnviromentConnectionFactory("MsSql", Microsoft.Data.SqlClient.SqlClientFactory.Instance);
_createPersistence = () =>
new SqlPersistenceFactory(_connectionFactory,
new BinarySerializer(),
{
var serializer = new BinarySerializer();
return new SqlPersistenceFactory(_connectionFactory,
serializer,
new DefaultEventSerializer(serializer),
new IsolationLevelRecordingSqlDialect(_recorder)).Build();
};
}

public void Initialize()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,28 @@ public PersistenceEngineFixture()
{
#if NET462
_createPersistence = pageSize =>
new SqlPersistenceFactory(
new EnviromentConnectionFactory("MySql", "MySql.Data.MySqlClient"),
new BinarySerializer(),
new MySqlDialect(),
pageSize: pageSize).Build();
{
var serializer = new BinarySerializer();
return new SqlPersistenceFactory(
new EnviromentConnectionFactory("MySql", "MySql.Data.MySqlClient"),
serializer,
new DefaultEventSerializer(serializer),
new MySqlDialect(),
pageSize: pageSize).Build();
};

// Wireup.Init().UsingSqlPersistence("test");
#else
_createPersistence = pageSize =>
new SqlPersistenceFactory(
{
var serializer = new BinarySerializer();
return new SqlPersistenceFactory(
new EnviromentConnectionFactory("MySql", MySqlClientFactory.Instance),
new BinarySerializer(),
serializer,
new DefaultEventSerializer(serializer),
new MySqlDialect(),
pageSize: pageSize).Build();
};

// Wireup.Init().UsingSqlPersistence("test");
#endif
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,26 @@ public PersistenceEngineFixture()

#if NET462
_createPersistence = pageSize =>
new SqlPersistenceFactory(
{
var serializer = new BinarySerializer();
return new SqlPersistenceFactory(
new EnviromentConnectionFactory("PostgreSql", "Npgsql"),
new BinarySerializer(),
serializer,
new DefaultEventSerializer(serializer),
new PostgreNpgsql6Dialect(npgsql6timestamp: true),
pageSize: pageSize).Build();
};
#else
_createPersistence = pageSize =>
new SqlPersistenceFactory(
{
var serializer = new BinarySerializer();
return new SqlPersistenceFactory(
new EnviromentConnectionFactory("PostgreSql", Npgsql.NpgsqlFactory.Instance),
new BinarySerializer(),
serializer,
new DefaultEventSerializer(serializer),
new PostgreNpgsql6Dialect(npgsql6timestamp: true),
pageSize: pageSize).Build();
};
#endif
}
}
Expand Down
29 changes: 20 additions & 9 deletions src/NEventStore.Persistence.Sql/CommitExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,30 @@ public static class CommitExtensions
private const int PayloadIndex = 9;
private static readonly ILogger Logger = LogFactory.BuildLogger(typeof (CommitExtensions));

public static ICommit GetCommit(this IDataRecord record, ISerialize serializer, ISqlDialect sqlDialect)
public static ICommit GetCommit(this IDataRecord record, ISerialize serializer, ISerializeEvents eventSerializer, ISqlDialect sqlDialect)
{
Logger.LogTrace(Messages.DeserializingCommit, serializer.GetType());

var bucketId = record[BucketIdIndex].ToString();
var streamId = record[StreamIdOriginalIndex].ToString();
var streamRevision = record[StreamRevisionIndex].ToInt();
var commitId = record[CommitIdIndex].ToGuid();
var commitSequence = record[CommitSequenceIndex].ToInt();
var commitStamp = sqlDialect.ToDateTime(record[CommitStampIndex]);
var checkpointToken = record.CheckpointNumber();

var headers = serializer.Deserialize<Dictionary<string, object>>(record, HeadersIndex);
var events = serializer.Deserialize<List<EventMessage>>(record, PayloadIndex);

return new Commit(record[BucketIdIndex].ToString(),
record[StreamIdOriginalIndex].ToString(),
record[StreamRevisionIndex].ToInt(),
record[CommitIdIndex].ToGuid(),
record[CommitSequenceIndex].ToInt(),
sqlDialect.ToDateTime(record[CommitStampIndex]),
record[CheckpointIndex].ToLong(),
var events = eventSerializer.DeserializeEventMessages((byte[])record[PayloadIndex], bucketId, streamId,
streamRevision, commitId, commitSequence, commitStamp, checkpointToken);
chrischu marked this conversation as resolved.
Show resolved Hide resolved

return new Commit(bucketId,
streamId,
streamRevision,
commitId,
commitSequence,
commitStamp,
checkpointToken,
headers,
events);
}
Expand Down
20 changes: 12 additions & 8 deletions src/NEventStore.Persistence.Sql/SqlPersistenceEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public class SqlPersistenceEngine : IPersistStreams
private readonly int _pageSize;
private readonly TransactionScopeOption? _scopeOption;
private readonly ISerialize _serializer;
private readonly ISerializeEvents _eventSerializer;
private bool _disposed;
private int _initialized;
private readonly IStreamIdHasher _streamIdHasher;
Expand All @@ -27,15 +28,17 @@ public SqlPersistenceEngine(
IConnectionFactory connectionFactory,
ISqlDialect dialect,
ISerialize serializer,
ISerializeEvents eventSerializer,
int pageSize,
TransactionScopeOption? scopeOption = null)
: this(connectionFactory, dialect, serializer, pageSize, new Sha1StreamIdHasher(), scopeOption)
: this(connectionFactory, dialect, serializer, eventSerializer, pageSize, new Sha1StreamIdHasher(), scopeOption)
{ }

public SqlPersistenceEngine(
IConnectionFactory connectionFactory,
ISqlDialect dialect,
ISerialize serializer,
ISerializeEvents eventSerializer,
int pageSize,
IStreamIdHasher streamIdHasher,
TransactionScopeOption? scopeOption = null)
Expand All @@ -53,6 +56,7 @@ public SqlPersistenceEngine(
_connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
_dialect = dialect ?? throw new ArgumentNullException(nameof(dialect));
_serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
_eventSerializer = eventSerializer ?? throw new ArgumentNullException(nameof(eventSerializer));
_scopeOption = scopeOption;
_pageSize = pageSize;
_streamIdHasher = new StreamIdHasherValidator(streamIdHasher);
Expand Down Expand Up @@ -91,7 +95,7 @@ public virtual IEnumerable<ICommit> GetFrom(string bucketId, string streamId, in
query.AddParameter(_dialect.CommitSequence, 0);
return query
.ExecutePagedQuery(statement, _dialect.NextPageDelegate)
.Select(x => x.GetCommit(_serializer, _dialect));
.Select(x => x.GetCommit(_serializer, _eventSerializer, _dialect));
});
}

Expand All @@ -107,7 +111,7 @@ public virtual IEnumerable<ICommit> GetFrom(string bucketId, DateTime start)
query.AddParameter(_dialect.BucketId, bucketId, DbType.AnsiString);
query.AddParameter(_dialect.CommitStamp, start, _dialect.GetDateTimeDbType());
return query.ExecutePagedQuery(statement, (q, r) => { })
.Select(x => x.GetCommit(_serializer, _dialect));
.Select(x => x.GetCommit(_serializer, _eventSerializer, _dialect));
});
}

Expand All @@ -125,7 +129,7 @@ public virtual IEnumerable<ICommit> GetFromTo(string bucketId, DateTime start, D
query.AddParameter(_dialect.CommitStampStart, start, _dialect.GetDateTimeDbType());
query.AddParameter(_dialect.CommitStampEnd, end, _dialect.GetDateTimeDbType());
return query.ExecutePagedQuery(statement, (q, r) => { })
.Select(x => x.GetCommit(_serializer, _dialect));
.Select(x => x.GetCommit(_serializer, _eventSerializer, _dialect));
});
}

Expand Down Expand Up @@ -243,7 +247,7 @@ public IEnumerable<ICommit> GetFrom(string bucketId, Int64 checkpointToken)
query.AddParameter(_dialect.BucketId, bucketId, DbType.AnsiString);
query.AddParameter(_dialect.CheckpointNumber, checkpointToken);
return query.ExecutePagedQuery(statement, (q, r) => { })
.Select(x => x.GetCommit(_serializer, _dialect));
.Select(x => x.GetCommit(_serializer, _eventSerializer, _dialect));
});
}

Expand All @@ -257,7 +261,7 @@ public IEnumerable<ICommit> GetFromTo(String bucketId, Int64 from, Int64 to)
query.AddParameter(_dialect.FromCheckpointNumber, from);
query.AddParameter(_dialect.ToCheckpointNumber, to);
return query.ExecutePagedQuery(statement, (q, r) => { })
.Select(x => x.GetCommit(_serializer, _dialect));
.Select(x => x.GetCommit(_serializer, _eventSerializer, _dialect));
});
}

Expand All @@ -269,7 +273,7 @@ public IEnumerable<ICommit> GetFrom(Int64 checkpointToken)
string statement = _dialect.GetCommitsFromCheckpoint;
query.AddParameter(_dialect.CheckpointNumber, checkpointToken);
return query.ExecutePagedQuery(statement, (q, r) => { })
.Select(x => x.GetCommit(_serializer, _dialect));
.Select(x => x.GetCommit(_serializer, _eventSerializer, _dialect));
});
}

Expand All @@ -282,7 +286,7 @@ public IEnumerable<ICommit> GetFromTo(Int64 from, Int64 to)
query.AddParameter(_dialect.FromCheckpointNumber, from);
query.AddParameter(_dialect.ToCheckpointNumber, to);
return query.ExecutePagedQuery(statement, (q, r) => { })
.Select(x => x.GetCommit(_serializer, _dialect));
.Select(x => x.GetCommit(_serializer, _eventSerializer, _dialect));
});
}

Expand Down
14 changes: 10 additions & 4 deletions src/NEventStore.Persistence.Sql/SqlPersistenceFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ public class SqlPersistenceFactory : IPersistenceFactory
public SqlPersistenceFactory(
string connectionName,
ISerialize serializer,
ISerializeEvents eventSerializer,
ISqlDialect dialect = null,
TransactionScopeOption? scopeOption = null)
: this(serializer, scopeOption, null, DefaultPageSize)
: this(serializer, eventSerializer, scopeOption, null, DefaultPageSize)
{
ConnectionFactory = new ConfigurationConnectionFactory(connectionName);
Dialect = dialect ?? ResolveDialect(new ConfigurationConnectionFactory(connectionName).Settings);
Expand All @@ -27,19 +28,22 @@ public SqlPersistenceFactory(
public SqlPersistenceFactory(
IConnectionFactory factory,
ISerialize serializer,
ISerializeEvents eventSerializer,
ISqlDialect dialect,
IStreamIdHasher streamIdHasher = null,
TransactionScopeOption? scopeOption = null,
int pageSize = DefaultPageSize)
: this(serializer, scopeOption, streamIdHasher, pageSize)
: this(serializer, eventSerializer, scopeOption, streamIdHasher, pageSize)
{
ConnectionFactory = factory;
Dialect = dialect ?? throw new ArgumentNullException(nameof(dialect));
}

private SqlPersistenceFactory(ISerialize serializer, TransactionScopeOption? scopeOption, IStreamIdHasher streamIdHasher, int pageSize)
private SqlPersistenceFactory(ISerialize serializer, ISerializeEvents eventSerializer,
TransactionScopeOption? scopeOption, IStreamIdHasher streamIdHasher, int pageSize)
{
Serializer = serializer;
EventSerializer = eventSerializer;
_scopeOption = scopeOption;
StreamIdHasher = streamIdHasher ?? new Sha1StreamIdHasher();
PageSize = pageSize;
Expand All @@ -51,13 +55,15 @@ private SqlPersistenceFactory(ISerialize serializer, TransactionScopeOption? sco

protected virtual ISerialize Serializer { get; }

protected virtual ISerializeEvents EventSerializer { get; }

protected virtual IStreamIdHasher StreamIdHasher { get; }

protected int PageSize { get; set; }

public virtual IPersistStreams Build()
{
return new SqlPersistenceEngine(ConnectionFactory, Dialect, Serializer, PageSize, StreamIdHasher, _scopeOption);
return new SqlPersistenceEngine(ConnectionFactory, Dialect, Serializer, EventSerializer, PageSize, StreamIdHasher, _scopeOption);
}

#if NET462
Expand Down
1 change: 1 addition & 0 deletions src/NEventStore.Persistence.Sql/SqlPersistenceWireup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public SqlPersistenceWireup(Wireup wireup, IConnectionFactory connectionFactory)
return new SqlPersistenceFactory(
connectionFactory,
c.Resolve<ISerialize>(),
c.Resolve<ISerializeEvents>(),
c.Resolve<ISqlDialect>(),
c.Resolve<IStreamIdHasher>(),
scopeOptions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,31 @@ public partial class PersistenceEngineFixture
public PersistenceEngineFixture()
{
_createPersistence = pageSize =>
new SqlPersistenceFactory(
new ConfigurationConnectionFactory("NEventStore.Persistence.AcceptanceTests.Properties.Settings.SQLite"),
new BinarySerializer(),
{
var serializer = new BinarySerializer();
return new SqlPersistenceFactory(
new ConfigurationConnectionFactory(
"NEventStore.Persistence.AcceptanceTests.Properties.Settings.SQLite"),
serializer,
new DefaultEventSerializer(serializer),
new SqliteDialect(),
pageSize: pageSize).Build();
};
}
#else
#else
public PersistenceEngineFixture()
{
_createPersistence = pageSize =>
new SqlPersistenceFactory(
new NetStandardConnectionFactory(System.Data.SQLite.SQLiteFactory.Instance, "Data Source=NEventStore.db;"),
new BinarySerializer(),
{
var serializer = new BinarySerializer();
return new SqlPersistenceFactory(
new NetStandardConnectionFactory(System.Data.SQLite.SQLiteFactory.Instance,
"Data Source=NEventStore.db;"),
serializer,
new DefaultEventSerializer(serializer),
new SqliteDialect(),
pageSize: pageSize).Build();
};

/*
* There are issues with Microsoft.Data.Sqlite > 2.2.6
Expand Down