From e3abe092db6298971d40909fe4f64fe11972e1ad Mon Sep 17 00:00:00 2001 From: Jarle Hjortland Date: Thu, 9 Jun 2022 15:50:05 +0200 Subject: [PATCH] Added DbBatch support --- .../PostgreSqlDbBatchJournalPerfSpec.cs | 49 +++++++ .../PostgreSqlDbBatchJournalSpec.cs | 94 +++++++++++++ .../Akka.Persistence.PostgreSql.csproj | 2 +- .../Journal/PostgreSqlDbBatchJournal.cs | 67 ++++++++++ .../Journal/PostgreSqlDbBatchQueryExecutor.cs | 124 ++++++++++++++++++ .../Journal/PostgreSqlQueryExecutor.cs | 13 +- src/common.props | 3 +- 7 files changed, 344 insertions(+), 8 deletions(-) create mode 100644 src/Akka.Persistence.PostgreSql.Tests/Performance/PostgreSqlDbBatchJournalPerfSpec.cs create mode 100644 src/Akka.Persistence.PostgreSql.Tests/PostgreSqlDbBatchJournalSpec.cs create mode 100644 src/Akka.Persistence.PostgreSql/Journal/PostgreSqlDbBatchJournal.cs create mode 100644 src/Akka.Persistence.PostgreSql/Journal/PostgreSqlDbBatchQueryExecutor.cs diff --git a/src/Akka.Persistence.PostgreSql.Tests/Performance/PostgreSqlDbBatchJournalPerfSpec.cs b/src/Akka.Persistence.PostgreSql.Tests/Performance/PostgreSqlDbBatchJournalPerfSpec.cs new file mode 100644 index 0000000..1f751d0 --- /dev/null +++ b/src/Akka.Persistence.PostgreSql.Tests/Performance/PostgreSqlDbBatchJournalPerfSpec.cs @@ -0,0 +1,49 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2021 Lightbend Inc. +// Copyright (C) 2013-2021 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using Akka.Configuration; +using Akka.Persistence.TestKit.Performance; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.PostgreSql.Tests.Performance +{ + [Collection("PostgreSqlSpec")] + public class PostgreSqlDbBatchJournalPerfSpec : JournalPerfSpec + { + public PostgreSqlDbBatchJournalPerfSpec(ITestOutputHelper output, PostgresFixture fixture) + : base(CreateSpecConfig(fixture), "PostgreSqlJournalPerfSpec", output) + { + EventsCount = 5 * 1000; + ExpectDuration = TimeSpan.FromSeconds(60); + } + + private static Config CreateSpecConfig(PostgresFixture fixture) + { + //need to make sure db is created before the tests start + DbUtils.Initialize(fixture); + + return ConfigurationFactory.ParseString(@" + akka.persistence.journal.plugin = ""akka.persistence.journal.postgresql"" + akka.persistence.journal.postgresql { + class = ""Akka.Persistence.PostgreSql.Journal.PostgreDbBatchSqlJournal, Akka.Persistence.PostgreSql"" + auto-initialize = on + connection-string = """ + DbUtils.ConnectionString + @""" + } + akka.test.single-expect-default = 10s") + .WithFallback(PostgreSqlPersistence.DefaultConfiguration()) + .WithFallback(Persistence.DefaultConfig()); + } + + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + DbUtils.Clean(); + } + } +} diff --git a/src/Akka.Persistence.PostgreSql.Tests/PostgreSqlDbBatchJournalSpec.cs b/src/Akka.Persistence.PostgreSql.Tests/PostgreSqlDbBatchJournalSpec.cs new file mode 100644 index 0000000..6c3f366 --- /dev/null +++ b/src/Akka.Persistence.PostgreSql.Tests/PostgreSqlDbBatchJournalSpec.cs @@ -0,0 +1,94 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2016 Lightbend Inc. +// Copyright (C) 2013-2016 Akka.NET project +// +//----------------------------------------------------------------------- + +using System; +using System.Reflection; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Configuration; +using Akka.Persistence.PostgreSql.Journal; +using Akka.Persistence.TCK.Journal; +using FluentAssertions; +using Npgsql; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.PostgreSql.Tests +{ + [Collection("PostgreSqlSpec")] + public class PostgreSqlDbBatchJournalSpec : JournalSpec + { + private static Config Initialize(PostgresFixture fixture) + { + //need to make sure db is created before the tests start + DbUtils.Initialize(fixture); + + var config = @" + akka.persistence { + publish-plugin-commands = on + journal { + plugin = ""akka.persistence.journal.postgresql"" + postgresql { + class = ""Akka.Persistence.PostgreSql.Journal.PostgreDbBatchSqlJournal, Akka.Persistence.PostgreSql"" + plugin-dispatcher = ""akka.actor.default-dispatcher"" + table-name = event_journal + schema-name = public + auto-initialize = on + connection-string = """ + DbUtils.ConnectionString + @""" + } + } + } + akka.test.single-expect-default = 10s"; + + return ConfigurationFactory.ParseString(config); + } + + // TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811 + protected override bool SupportsSerialization => false; + + public PostgreSqlDbBatchJournalSpec(ITestOutputHelper output, PostgresFixture fixture) + : base(Initialize(fixture), "PostgreSqlJournalSpec", output: output) + { + Initialize(); + } + + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + DbUtils.Clean(); + } + + [Fact] + public async Task BigSerial_Journal_ordering_column_data_type_should_be_BigSerial() + { + using (var conn = new NpgsqlConnection(DbUtils.ConnectionString)) + { + conn.Open(); + + var sql = $@" + SELECT column_name, column_default, data_type, is_identity, identity_generation + FROM information_schema.columns + WHERE table_schema = 'public' + AND table_name = 'event_journal' + AND ordinal_position = 1"; + + using (var cmd = new NpgsqlCommand(sql, conn)) + { + var reader = await cmd.ExecuteReaderAsync(); + await reader.ReadAsync(); + + // these are the "fingerprint" of BIGSERIAL + reader.GetString(0).Should().Be("ordering"); + reader.GetString(1).Should().Be("nextval('event_journal_ordering_seq'::regclass)"); + reader.GetString(2).Should().Be("bigint"); + reader.GetString(3).Should().Be("NO"); + reader[4].Should().BeOfType(); + } + } + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.PostgreSql/Akka.Persistence.PostgreSql.csproj b/src/Akka.Persistence.PostgreSql/Akka.Persistence.PostgreSql.csproj index ad093d7..0d4515b 100644 --- a/src/Akka.Persistence.PostgreSql/Akka.Persistence.PostgreSql.csproj +++ b/src/Akka.Persistence.PostgreSql/Akka.Persistence.PostgreSql.csproj @@ -4,7 +4,7 @@ Akka.Persistence.PostgreSql Akka Persistence journal and snapshot store backed by PostgreSql database. - $(NetStandardLibVersion) + $(NetStandardLibVersion);$(NetVersion) true diff --git a/src/Akka.Persistence.PostgreSql/Journal/PostgreSqlDbBatchJournal.cs b/src/Akka.Persistence.PostgreSql/Journal/PostgreSqlDbBatchJournal.cs new file mode 100644 index 0000000..b1c67e6 --- /dev/null +++ b/src/Akka.Persistence.PostgreSql/Journal/PostgreSqlDbBatchJournal.cs @@ -0,0 +1,67 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2016 Lightbend Inc. +// Copyright (C) 2013-2016 Akka.NET project +// +//----------------------------------------------------------------------- + +using Akka.Configuration; +using Akka.Persistence.Sql.Common.Journal; +using Npgsql; +using System; +using System.Data.Common; +using Akka.Event; + +namespace Akka.Persistence.PostgreSql.Journal +{ + /// + /// Persistent journal actor using PostgreSQL as persistence layer. It processes write requests + /// one by one in synchronous manner, while reading results asynchronously. + /// + + public class PostgreDbBatchSqlJournal : PostgreSqlJournal + { + public readonly PostgreSqlPersistence Extension = PostgreSqlPersistence.Get(Context.System); + public PostgreSqlJournalSettings JournalSettings { get; } + public PostgreDbBatchSqlJournal(Config journalConfig) : base(journalConfig) + { + var config = journalConfig.WithFallback(Extension.DefaultJournalConfig); + StoredAsType storedAs; + var storedAsString = config.GetString("stored-as"); + if (!Enum.TryParse(storedAsString, true, out storedAs)) + { + throw new ConfigurationException($"Value [{storedAsString}] of the 'stored-as' HOCON config key is not valid. Valid values: bytea, json, jsonb."); + } + + QueryExecutor = new PostgreSqlDbBatchQueryExecutor(new PostgreSqlQueryConfiguration( + schemaName: config.GetString("schema-name"), + journalEventsTableName: config.GetString("table-name"), + metaTableName: config.GetString("metadata-table-name"), + persistenceIdColumnName: "persistence_id", + sequenceNrColumnName: "sequence_nr", + payloadColumnName: "payload", + manifestColumnName: "manifest", + timestampColumnName: "created_at", + isDeletedColumnName: "is_deleted", + tagsColumnName: "tags", + orderingColumn: "ordering", + serializerIdColumnName: "serializer_id", + timeout: config.GetTimeSpan("connection-timeout"), + storedAs: storedAs, + defaultSerializer: config.GetString("serializer"), + useSequentialAccess: config.GetBoolean("sequential-access"), + useBigIntPrimaryKey: config.GetBoolean("use-bigint-identity-for-ordering-column")), + Context.System.Serialization, + GetTimestampProvider(config.GetString("timestamp-provider")), Context.GetLogger()); + + JournalSettings = new PostgreSqlJournalSettings(config); + } + + public override IJournalQueryExecutor QueryExecutor { get; } + protected override string JournalConfigPath => PostgreSqlJournalSettings.JournalConfigPath; + protected override DbConnection CreateDbConnection(string connectionString) + { + return new NpgsqlConnection(connectionString); + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.PostgreSql/Journal/PostgreSqlDbBatchQueryExecutor.cs b/src/Akka.Persistence.PostgreSql/Journal/PostgreSqlDbBatchQueryExecutor.cs new file mode 100644 index 0000000..98362df --- /dev/null +++ b/src/Akka.Persistence.PostgreSql/Journal/PostgreSqlDbBatchQueryExecutor.cs @@ -0,0 +1,124 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2016 Lightbend Inc. +// Copyright (C) 2013-2016 Akka.NET project +// +//----------------------------------------------------------------------- + +using Akka.Actor; +using Akka.Persistence.Sql.Common.Journal; +using Akka.Serialization; +using Akka.Util; +using Newtonsoft.Json; +using Npgsql; +using NpgsqlTypes; +using System; +using System.Collections.Immutable; +using System.Data; +using System.Data.Common; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Akka.Event; + +namespace Akka.Persistence.PostgreSql.Journal +{ + public class PostgreSqlDbBatchQueryExecutor : PostgreSqlQueryExecutor + { + protected readonly ILoggingAdapter Log; + public PostgreSqlDbBatchQueryExecutor(PostgreSqlQueryConfiguration configuration, Akka.Serialization.Serialization serialization, ITimestampProvider timestampProvider, ILoggingAdapter loggingAdapter) + : base(configuration, serialization, timestampProvider) + { + Log = loggingAdapter; + } + + protected void AddParameter(NpgsqlBatchCommand command, string parameterName, DbType parameterType, object value) + { + var parameter = new NpgsqlParameter() + { + ParameterName = parameterName, + DbType = parameterType, + Value = value + }; + + command.Parameters.Add(parameter); + } + protected void WriteEvent(NpgsqlBatchCommand command, IPersistentRepresentation e, IImmutableSet tags) + { + var serializationResult = _serialize(e); + var serializer = serializationResult.Serializer; + var hasSerializer = serializer != null; + + string manifest = ""; + if (hasSerializer && serializer is SerializerWithStringManifest) + manifest = ((SerializerWithStringManifest)serializer).Manifest(e.Payload); + else if (hasSerializer && serializer.IncludeManifest) + manifest = QualifiedName(e); + else + manifest = string.IsNullOrEmpty(e.Manifest) ? QualifiedName(e) : e.Manifest; + + AddParameter(command, "@PersistenceId", DbType.String, e.PersistenceId); + AddParameter(command, "@SequenceNr", DbType.Int64, e.SequenceNr); + AddParameter(command, "@Timestamp", DbType.Int64, TimestampProvider.GenerateTimestamp(e)); + AddParameter(command, "@IsDeleted", DbType.Boolean, false); + AddParameter(command, "@Manifest", DbType.String, manifest); + + if (hasSerializer) + { + AddParameter(command, "@SerializerId", DbType.Int32, serializer.Identifier); + } + else + { + AddParameter(command, "@SerializerId", DbType.Int32, DBNull.Value); + } + + command.Parameters.Add(new NpgsqlParameter("@Payload", serializationResult.DbType) { Value = serializationResult.Payload }); + + if (tags.Count != 0) + { + var tagBuilder = new StringBuilder(";", tags.Sum(x => x.Length) + tags.Count + 1); + foreach (var tag in tags) + { + tagBuilder.Append(tag).Append(';'); + } + + AddParameter(command, "@Tag", DbType.String, tagBuilder.ToString()); + } + else AddParameter(command, "@Tag", DbType.String, DBNull.Value); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static string QualifiedName(IPersistentRepresentation e) + => e.Payload.GetType().TypeQualifiedName(); + + + public override async Task InsertBatchAsync(DbConnection connection, CancellationToken cancellationToken, WriteJournalBatch write) + { + using (var tx = ((NpgsqlConnection)connection).BeginTransaction()) + { + using (var batch = new NpgsqlBatch(((NpgsqlConnection)connection))) + { + + batch.Transaction = tx; + + foreach (var entry in write.EntryTags) + { + NpgsqlBatchCommand command = (NpgsqlBatchCommand)batch.CreateBatchCommand(); + command.CommandText = this.InsertEventSql; + + var evt = entry.Key; + var tags = entry.Value; + + WriteEvent(command, evt, tags); + batch.BatchCommands.Add(command); + } + await batch.PrepareAsync(cancellationToken); + await batch.ExecuteNonQueryAsync(cancellationToken); + await tx.CommitAsync(cancellationToken); + } + } + } + } +} diff --git a/src/Akka.Persistence.PostgreSql/Journal/PostgreSqlQueryExecutor.cs b/src/Akka.Persistence.PostgreSql/Journal/PostgreSqlQueryExecutor.cs index 4c43308..00324d8 100644 --- a/src/Akka.Persistence.PostgreSql/Journal/PostgreSqlQueryExecutor.cs +++ b/src/Akka.Persistence.PostgreSql/Journal/PostgreSqlQueryExecutor.cs @@ -26,15 +26,15 @@ namespace Akka.Persistence.PostgreSql.Journal { public class PostgreSqlQueryExecutor : AbstractQueryExecutor { - private readonly Func _serialize; - private readonly Func _deserialize; + internal readonly Func _serialize; + internal readonly Func _deserialize; public PostgreSqlQueryExecutor(PostgreSqlQueryConfiguration configuration, Akka.Serialization.Serialization serialization, ITimestampProvider timestampProvider) : base(configuration, serialization, timestampProvider) { var storedAs = configuration.StoredAs.ToString().ToUpperInvariant(); - - CreateEventsJournalSql = $@" + + CreateEventsJournalSql = $@" CREATE TABLE IF NOT EXISTS {Configuration.FullJournalTableName} ( {Configuration.OrderingColumnName} {(configuration.UseBigIntPrimaryKey ? "BIGINT GENERATED ALWAYS AS IDENTITY" : "BIGSERIAL")} NOT NULL PRIMARY KEY, {Configuration.PersistenceIdColumnName} VARCHAR(255) NOT NULL, @@ -46,8 +46,9 @@ public PostgreSqlQueryExecutor(PostgreSqlQueryConfiguration configuration, Akka. {Configuration.TagsColumnName} VARCHAR(100) NULL, {Configuration.SerializerIdColumnName} INTEGER NULL, CONSTRAINT {Configuration.JournalEventsTableName}_uq UNIQUE ({Configuration.PersistenceIdColumnName}, {Configuration.SequenceNrColumnName}) - );"; - + ); CREATE INDEX IF NOT EXISTS IX_{Configuration.JournalEventsTableName}_{Configuration.SequenceNrColumnName} ON {Configuration.FullJournalTableName} USING btree ( + {Configuration.SequenceNrColumnName} ASC NULLS LAST) INCLUDE({Configuration.PersistenceIdColumnName}) + ;"; CreateMetaTableSql = $@" CREATE TABLE IF NOT EXISTS {Configuration.FullMetaTableName} ( {Configuration.PersistenceIdColumnName} VARCHAR(255) NOT NULL, diff --git a/src/common.props b/src/common.props index c2cd135..152360a 100644 --- a/src/common.props +++ b/src/common.props @@ -2,7 +2,7 @@ Copyright © 2013-2017 Akka.NET Team Akka.NET Team - 1.3.9 + 1.4.35 http://getakka.net/images/akkalogo.png https://github.com/akkadotnet/Akka.Persistence.PostgreSql https://github.com/akkadotnet/Akka.Persistence.PostgreSql/blob/dev/LICENSE.md @@ -19,6 +19,7 @@ 6.0.7 [$(PostgresLowVersion), $(PostgresHighVersion)] netstandard2.0 + .net6.0 net45 17.3.2 netcoreapp3.1