Skip to content

Commit

Permalink
Added DbBatch support
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarle Hjortland authored and jarlrasm committed Oct 26, 2022
1 parent 5fe4a21 commit e3abe09
Show file tree
Hide file tree
Showing 7 changed files with 344 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
//-----------------------------------------------------------------------
// <copyright file="PostgreSqlJournalPerfSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using Akka.Configuration;
using Akka.Persistence.TestKit.Performance;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Persistence.PostgreSql.Tests.Performance
{
[Collection("PostgreSqlSpec")]
public class PostgreSqlDbBatchJournalPerfSpec : JournalPerfSpec
{
public PostgreSqlDbBatchJournalPerfSpec(ITestOutputHelper output, PostgresFixture fixture)
: base(CreateSpecConfig(fixture), "PostgreSqlJournalPerfSpec", output)
{
EventsCount = 5 * 1000;
ExpectDuration = TimeSpan.FromSeconds(60);
}

private static Config CreateSpecConfig(PostgresFixture fixture)
{
//need to make sure db is created before the tests start
DbUtils.Initialize(fixture);

return ConfigurationFactory.ParseString(@"
akka.persistence.journal.plugin = ""akka.persistence.journal.postgresql""
akka.persistence.journal.postgresql {
class = ""Akka.Persistence.PostgreSql.Journal.PostgreDbBatchSqlJournal, Akka.Persistence.PostgreSql""
auto-initialize = on
connection-string = """ + DbUtils.ConnectionString + @"""
}
akka.test.single-expect-default = 10s")
.WithFallback(PostgreSqlPersistence.DefaultConfiguration())
.WithFallback(Persistence.DefaultConfig());
}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
DbUtils.Clean();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
//-----------------------------------------------------------------------
// <copyright file="PostgreSqlJournalSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Reflection;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Persistence.PostgreSql.Journal;
using Akka.Persistence.TCK.Journal;
using FluentAssertions;
using Npgsql;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Persistence.PostgreSql.Tests
{
[Collection("PostgreSqlSpec")]
public class PostgreSqlDbBatchJournalSpec : JournalSpec
{
private static Config Initialize(PostgresFixture fixture)
{
//need to make sure db is created before the tests start
DbUtils.Initialize(fixture);

var config = @"
akka.persistence {
publish-plugin-commands = on
journal {
plugin = ""akka.persistence.journal.postgresql""
postgresql {
class = ""Akka.Persistence.PostgreSql.Journal.PostgreDbBatchSqlJournal, Akka.Persistence.PostgreSql""
plugin-dispatcher = ""akka.actor.default-dispatcher""
table-name = event_journal
schema-name = public
auto-initialize = on
connection-string = """ + DbUtils.ConnectionString + @"""
}
}
}
akka.test.single-expect-default = 10s";

return ConfigurationFactory.ParseString(config);
}

// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
protected override bool SupportsSerialization => false;

public PostgreSqlDbBatchJournalSpec(ITestOutputHelper output, PostgresFixture fixture)
: base(Initialize(fixture), "PostgreSqlJournalSpec", output: output)
{
Initialize();
}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
DbUtils.Clean();
}

[Fact]
public async Task BigSerial_Journal_ordering_column_data_type_should_be_BigSerial()
{
using (var conn = new NpgsqlConnection(DbUtils.ConnectionString))
{
conn.Open();

var sql = $@"
SELECT column_name, column_default, data_type, is_identity, identity_generation
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = 'event_journal'
AND ordinal_position = 1";

using (var cmd = new NpgsqlCommand(sql, conn))
{
var reader = await cmd.ExecuteReaderAsync();
await reader.ReadAsync();

// these are the "fingerprint" of BIGSERIAL
reader.GetString(0).Should().Be("ordering");
reader.GetString(1).Should().Be("nextval('event_journal_ordering_seq'::regclass)");
reader.GetString(2).Should().Be("bigint");
reader.GetString(3).Should().Be("NO");
reader[4].Should().BeOfType<DBNull>();
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<PropertyGroup>
<AssemblyTitle>Akka.Persistence.PostgreSql</AssemblyTitle>
<Description>Akka Persistence journal and snapshot store backed by PostgreSql database.</Description>
<TargetFrameworks>$(NetStandardLibVersion)</TargetFrameworks>
<TargetFrameworks>$(NetStandardLibVersion);$(NetVersion)</TargetFrameworks>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//-----------------------------------------------------------------------
// <copyright file="PostgreSqlJournal.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using Akka.Configuration;
using Akka.Persistence.Sql.Common.Journal;
using Npgsql;
using System;
using System.Data.Common;
using Akka.Event;

namespace Akka.Persistence.PostgreSql.Journal
{
/// <summary>
/// Persistent journal actor using PostgreSQL as persistence layer. It processes write requests
/// one by one in synchronous manner, while reading results asynchronously.
/// </summary>

public class PostgreDbBatchSqlJournal : PostgreSqlJournal
{
public readonly PostgreSqlPersistence Extension = PostgreSqlPersistence.Get(Context.System);
public PostgreSqlJournalSettings JournalSettings { get; }
public PostgreDbBatchSqlJournal(Config journalConfig) : base(journalConfig)
{
var config = journalConfig.WithFallback(Extension.DefaultJournalConfig);
StoredAsType storedAs;
var storedAsString = config.GetString("stored-as");
if (!Enum.TryParse(storedAsString, true, out storedAs))
{
throw new ConfigurationException($"Value [{storedAsString}] of the 'stored-as' HOCON config key is not valid. Valid values: bytea, json, jsonb.");
}

QueryExecutor = new PostgreSqlDbBatchQueryExecutor(new PostgreSqlQueryConfiguration(
schemaName: config.GetString("schema-name"),
journalEventsTableName: config.GetString("table-name"),
metaTableName: config.GetString("metadata-table-name"),
persistenceIdColumnName: "persistence_id",
sequenceNrColumnName: "sequence_nr",
payloadColumnName: "payload",
manifestColumnName: "manifest",
timestampColumnName: "created_at",
isDeletedColumnName: "is_deleted",
tagsColumnName: "tags",
orderingColumn: "ordering",
serializerIdColumnName: "serializer_id",
timeout: config.GetTimeSpan("connection-timeout"),
storedAs: storedAs,
defaultSerializer: config.GetString("serializer"),
useSequentialAccess: config.GetBoolean("sequential-access"),
useBigIntPrimaryKey: config.GetBoolean("use-bigint-identity-for-ordering-column")),
Context.System.Serialization,
GetTimestampProvider(config.GetString("timestamp-provider")), Context.GetLogger());

JournalSettings = new PostgreSqlJournalSettings(config);
}

public override IJournalQueryExecutor QueryExecutor { get; }
protected override string JournalConfigPath => PostgreSqlJournalSettings.JournalConfigPath;
protected override DbConnection CreateDbConnection(string connectionString)
{
return new NpgsqlConnection(connectionString);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
//-----------------------------------------------------------------------
// <copyright file="PostgreSqlQueryExecutor.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using Akka.Actor;
using Akka.Persistence.Sql.Common.Journal;
using Akka.Serialization;
using Akka.Util;
using Newtonsoft.Json;
using Npgsql;
using NpgsqlTypes;
using System;
using System.Collections.Immutable;
using System.Data;
using System.Data.Common;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Akka.Event;

namespace Akka.Persistence.PostgreSql.Journal
{
public class PostgreSqlDbBatchQueryExecutor : PostgreSqlQueryExecutor
{
protected readonly ILoggingAdapter Log;
public PostgreSqlDbBatchQueryExecutor(PostgreSqlQueryConfiguration configuration, Akka.Serialization.Serialization serialization, ITimestampProvider timestampProvider, ILoggingAdapter loggingAdapter)
: base(configuration, serialization, timestampProvider)
{
Log = loggingAdapter;
}

protected void AddParameter(NpgsqlBatchCommand command, string parameterName, DbType parameterType, object value)
{
var parameter = new NpgsqlParameter()
{
ParameterName = parameterName,
DbType = parameterType,
Value = value
};

command.Parameters.Add(parameter);
}
protected void WriteEvent(NpgsqlBatchCommand command, IPersistentRepresentation e, IImmutableSet<string> tags)
{
var serializationResult = _serialize(e);
var serializer = serializationResult.Serializer;
var hasSerializer = serializer != null;

string manifest = "";
if (hasSerializer && serializer is SerializerWithStringManifest)
manifest = ((SerializerWithStringManifest)serializer).Manifest(e.Payload);
else if (hasSerializer && serializer.IncludeManifest)
manifest = QualifiedName(e);
else
manifest = string.IsNullOrEmpty(e.Manifest) ? QualifiedName(e) : e.Manifest;

AddParameter(command, "@PersistenceId", DbType.String, e.PersistenceId);
AddParameter(command, "@SequenceNr", DbType.Int64, e.SequenceNr);
AddParameter(command, "@Timestamp", DbType.Int64, TimestampProvider.GenerateTimestamp(e));
AddParameter(command, "@IsDeleted", DbType.Boolean, false);
AddParameter(command, "@Manifest", DbType.String, manifest);

if (hasSerializer)
{
AddParameter(command, "@SerializerId", DbType.Int32, serializer.Identifier);
}
else
{
AddParameter(command, "@SerializerId", DbType.Int32, DBNull.Value);
}

command.Parameters.Add(new NpgsqlParameter("@Payload", serializationResult.DbType) { Value = serializationResult.Payload });

if (tags.Count != 0)
{
var tagBuilder = new StringBuilder(";", tags.Sum(x => x.Length) + tags.Count + 1);
foreach (var tag in tags)
{
tagBuilder.Append(tag).Append(';');
}

AddParameter(command, "@Tag", DbType.String, tagBuilder.ToString());
}
else AddParameter(command, "@Tag", DbType.String, DBNull.Value);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static string QualifiedName(IPersistentRepresentation e)
=> e.Payload.GetType().TypeQualifiedName();


public override async Task InsertBatchAsync(DbConnection connection, CancellationToken cancellationToken, WriteJournalBatch write)
{
using (var tx = ((NpgsqlConnection)connection).BeginTransaction())
{
using (var batch = new NpgsqlBatch(((NpgsqlConnection)connection)))
{

batch.Transaction = tx;

foreach (var entry in write.EntryTags)
{
NpgsqlBatchCommand command = (NpgsqlBatchCommand)batch.CreateBatchCommand();
command.CommandText = this.InsertEventSql;

var evt = entry.Key;
var tags = entry.Value;

WriteEvent(command, evt, tags);
batch.BatchCommands.Add(command);
}
await batch.PrepareAsync(cancellationToken);
await batch.ExecuteNonQueryAsync(cancellationToken);
await tx.CommitAsync(cancellationToken);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ namespace Akka.Persistence.PostgreSql.Journal
{
public class PostgreSqlQueryExecutor : AbstractQueryExecutor
{
private readonly Func<IPersistentRepresentation, SerializationResult> _serialize;
private readonly Func<Type, object, string, int?, object> _deserialize;
internal readonly Func<IPersistentRepresentation, SerializationResult> _serialize;
internal readonly Func<Type, object, string, int?, object> _deserialize;

public PostgreSqlQueryExecutor(PostgreSqlQueryConfiguration configuration, Akka.Serialization.Serialization serialization, ITimestampProvider timestampProvider)
: base(configuration, serialization, timestampProvider)
{
var storedAs = configuration.StoredAs.ToString().ToUpperInvariant();
CreateEventsJournalSql = $@"

CreateEventsJournalSql = $@"
CREATE TABLE IF NOT EXISTS {Configuration.FullJournalTableName} (
{Configuration.OrderingColumnName} {(configuration.UseBigIntPrimaryKey ? "BIGINT GENERATED ALWAYS AS IDENTITY" : "BIGSERIAL")} NOT NULL PRIMARY KEY,
{Configuration.PersistenceIdColumnName} VARCHAR(255) NOT NULL,
Expand All @@ -46,8 +46,9 @@ public PostgreSqlQueryExecutor(PostgreSqlQueryConfiguration configuration, Akka.
{Configuration.TagsColumnName} VARCHAR(100) NULL,
{Configuration.SerializerIdColumnName} INTEGER NULL,
CONSTRAINT {Configuration.JournalEventsTableName}_uq UNIQUE ({Configuration.PersistenceIdColumnName}, {Configuration.SequenceNrColumnName})
);";

); CREATE INDEX IF NOT EXISTS IX_{Configuration.JournalEventsTableName}_{Configuration.SequenceNrColumnName} ON {Configuration.FullJournalTableName} USING btree (
{Configuration.SequenceNrColumnName} ASC NULLS LAST) INCLUDE({Configuration.PersistenceIdColumnName})
;";
CreateMetaTableSql = $@"
CREATE TABLE IF NOT EXISTS {Configuration.FullMetaTableName} (
{Configuration.PersistenceIdColumnName} VARCHAR(255) NOT NULL,
Expand Down
3 changes: 2 additions & 1 deletion src/common.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<Copyright>Copyright © 2013-2017 Akka.NET Team</Copyright>
<Authors>Akka.NET Team</Authors>
<VersionPrefix>1.3.9</VersionPrefix>
<VersionPrefix>1.4.35</VersionPrefix>
<PackageIconUrl>http://getakka.net/images/akkalogo.png</PackageIconUrl>
<PackageProjectUrl>https://github.com/akkadotnet/Akka.Persistence.PostgreSql</PackageProjectUrl>
<PackageLicenseUrl>https://github.com/akkadotnet/Akka.Persistence.PostgreSql/blob/dev/LICENSE.md</PackageLicenseUrl>
Expand All @@ -19,6 +19,7 @@
<PostgresHighVersion>6.0.7</PostgresHighVersion>
<PostgresVersion>[$(PostgresLowVersion), $(PostgresHighVersion)]</PostgresVersion>
<NetStandardLibVersion>netstandard2.0</NetStandardLibVersion>
<NetVersion>.net6.0</NetVersion>
<NetFrameworkVersion>net45</NetFrameworkVersion>
<TestSdkVersion>17.3.2</TestSdkVersion>
<NetCoreTestVersion>netcoreapp3.1</NetCoreTestVersion>
Expand Down

0 comments on commit e3abe09

Please sign in to comment.