Skip to content

Commit

Permalink
Modernize (#8)
Browse files Browse the repository at this point in the history
* Use primary constructor where possible

* Use collection expressions where possible

* Make VersionedState readonly

* Use inferred membername

* Fix async warnings.

* Reformat queries

* Suppress naming style messages (member names should start with an Uppercase letter)

* Proper implementation of IDisposable, IAsyncDisposable

* Use ThrowIfNull where possible

* Compare with length/count == 0 where possible instead of .Any()

* Forgot a IDE1006 message

* Make methods static where possible.

* Remove unused method

* Return actual class instead of interface where possble (CA1859).

* Fix nullability warning

* Remove unused packages.

* Extend editorconfig.

* Run init.db on startup. This ain't pretty but it works.

* Fix concurrency bug

* Migrate sequence_id to BIGSERIAL

* Update process_managers.process to a bigserial

* Use schema instead of database

* Use primary constructor where possible

* Use collection expressions where possible

* Make VersionedState readonly

* Use inferred membername

* Fix async warnings.

* Reformat queries

* Suppress naming style messages (member names should start with an Uppercase letter)

* Proper implementation of IDisposable, IAsyncDisposable

* Use ThrowIfNull where possible

* Compare with length/count == 0 where possible instead of .Any()

* Forgot a IDE1006 message

* Make methods static where possible.

* Remove unused method

* Return actual class instead of interface where possble (CA1859).

* Fix nullability warning

* Remove unused packages.

* Extend editorconfig.

* Release 2.0.0
+semver:major

* Fix integration tests

* Wait for docker compose

* Fix: waiting for test container

* Use primary constructor where possible

* Run init.db on startup. This ain't pretty but it works.

* Fix merge issue

---------

Co-authored-by: Kent Chenery <[email protected]>
Co-authored-by: Alexey Raga <[email protected]>
  • Loading branch information
3 people authored Apr 26, 2024
1 parent ef70708 commit 560e24d
Show file tree
Hide file tree
Showing 24 changed files with 340 additions and 318 deletions.
124 changes: 119 additions & 5 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
root = true

# All files
[*]
trim_trailing_whitespace = true
insert_final_newline = true
Expand All @@ -8,19 +9,132 @@ indent_size = 4
indent_style = space
end_of_line = lf

# Xml files
[*.xml]
indent_size = 2

[*.cs]
#### Naming styles ####

# Naming rules

# Symbol specifications

dotnet_naming_symbols.private_or_internal_field.applicable_kinds = field
dotnet_naming_symbols.private_or_internal_field.applicable_accessibilities = internal, private, private_protected
dotnet_naming_symbols.private_or_internal_field.required_modifiers =

# Naming styles

dotnet_naming_style.lowercase__begins_with__.required_prefix = _
dotnet_naming_style.lowercase__begins_with__.required_suffix =
dotnet_naming_style.lowercase__begins_with__.word_separator =
dotnet_naming_style.lowercase__begins_with__.capitalization = all_lower
csharp_indent_labels = one_less_than_current
csharp_space_around_binary_operators = before_and_after
csharp_using_directive_placement = outside_namespace:silent
csharp_prefer_simple_using_statement = true:suggestion
csharp_prefer_braces = true:silent
csharp_style_namespace_declarations = file_scoped:silent
csharp_style_prefer_method_group_conversion = true:silent
csharp_style_prefer_top_level_statements = true:silent
csharp_style_prefer_primary_constructors = true:suggestion
csharp_style_expression_bodied_methods = true:silent
csharp_style_expression_bodied_constructors = true:silent
csharp_style_expression_bodied_operators = true:silent
csharp_style_expression_bodied_properties = true:silent
csharp_style_expression_bodied_indexers = true:silent
csharp_style_expression_bodied_accessors = true:silent
csharp_style_expression_bodied_lambdas = true:silent
csharp_style_expression_bodied_local_functions = true:silent
csharp_style_throw_expression = true:suggestion
csharp_style_prefer_null_check_over_type_check = true:suggestion
csharp_prefer_simple_default_expression = true:suggestion
csharp_style_prefer_local_over_anonymous_function = true:suggestion
csharp_style_prefer_index_operator = true:suggestion
csharp_style_prefer_range_operator = true:suggestion
csharp_style_implicit_object_creation_when_type_is_apparent = true:suggestion
csharp_style_prefer_tuple_swap = true:suggestion
csharp_style_prefer_utf8_string_literals = true:suggestion
csharp_style_inlined_variable_declaration = true:suggestion
csharp_style_deconstructed_variable_declaration = true:suggestion
csharp_style_unused_value_assignment_preference = discard_variable:suggestion
csharp_style_unused_value_expression_statement_preference = discard_variable:silent

resharper_csharp_place_expr_method_on_single_line = never

# Terraform files
[*.tf]
indent_size = 2
[*.{cs,vb}]
#### Naming styles ####

# Naming rules

dotnet_naming_rule.interface_should_be_begins_with_i.severity = suggestion
dotnet_naming_rule.interface_should_be_begins_with_i.symbols = interface
dotnet_naming_rule.interface_should_be_begins_with_i.style = begins_with_i

dotnet_naming_rule.types_should_be_pascal_case.severity = suggestion
dotnet_naming_rule.types_should_be_pascal_case.symbols = types
dotnet_naming_rule.types_should_be_pascal_case.style = pascal_case

dotnet_naming_rule.non_field_members_should_be_pascal_case.severity = suggestion
dotnet_naming_rule.non_field_members_should_be_pascal_case.symbols = non_field_members
dotnet_naming_rule.non_field_members_should_be_pascal_case.style = pascal_case

# Symbol specifications

dotnet_naming_symbols.interface.applicable_kinds = interface
dotnet_naming_symbols.interface.applicable_accessibilities = public, internal, private, protected, protected_internal, private_protected
dotnet_naming_symbols.interface.required_modifiers =

dotnet_naming_symbols.types.applicable_kinds = class, struct, interface, enum
dotnet_naming_symbols.types.applicable_accessibilities = public, internal, private, protected, protected_internal, private_protected
dotnet_naming_symbols.types.required_modifiers =

dotnet_naming_symbols.non_field_members.applicable_kinds = property, event, method
dotnet_naming_symbols.non_field_members.applicable_accessibilities = public, internal, private, protected, protected_internal, private_protected
dotnet_naming_symbols.non_field_members.required_modifiers =

# Naming styles

dotnet_naming_style.begins_with_i.required_prefix = I
dotnet_naming_style.begins_with_i.required_suffix =
dotnet_naming_style.begins_with_i.word_separator =
dotnet_naming_style.begins_with_i.capitalization = pascal_case

dotnet_naming_style.pascal_case.required_prefix =
dotnet_naming_style.pascal_case.required_suffix =
dotnet_naming_style.pascal_case.word_separator =
dotnet_naming_style.pascal_case.capitalization = pascal_case

dotnet_naming_style.pascal_case.required_prefix =
dotnet_naming_style.pascal_case.required_suffix =
dotnet_naming_style.pascal_case.word_separator =
dotnet_naming_style.pascal_case.capitalization = pascal_case
dotnet_style_operator_placement_when_wrapping = beginning_of_line
tab_width = 4
indent_size = 4
dotnet_style_coalesce_expression = true:suggestion
dotnet_style_null_propagation = true:suggestion
dotnet_style_prefer_is_null_check_over_reference_equality_method = true:suggestion
dotnet_style_prefer_auto_properties = true:silent
dotnet_style_object_initializer = true:suggestion
dotnet_style_collection_initializer = true:suggestion
dotnet_style_prefer_simplified_boolean_expressions = true:suggestion
dotnet_style_prefer_conditional_expression_over_assignment = true:silent
dotnet_style_prefer_conditional_expression_over_return = true:silent
dotnet_style_explicit_tuple_names = true:suggestion
dotnet_style_prefer_inferred_tuple_names = true:suggestion
dotnet_style_prefer_inferred_anonymous_type_member_names = true:suggestion
dotnet_style_prefer_compound_assignment = true:suggestion
dotnet_style_prefer_simplified_interpolation = true:suggestion
dotnet_style_prefer_collection_expression = when_types_loosely_match:suggestion
dotnet_style_namespace_match_folder = true:suggestion

[*.json]
indent_size = 2

[*.{yaml,yml}]
indent_size = 2


[*.csproj]
indent_size = 2
indent_size = 2
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@

<ItemGroup>
<PackageReference Include="Dapper" Version="2.1.35" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="8.0.1" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Options.DataAnnotations" Version="8.0.0" />
<PackageReference Include="Npgsql" Version="8.0.2" />
</ItemGroup>

Expand Down
59 changes: 28 additions & 31 deletions src/Contrib.KafkaFlow.Outbox.Postgres/PostgresOutboxBackend.cs
Original file line number Diff line number Diff line change
@@ -1,25 +1,20 @@
using System.Text.Json;
using Confluent.Kafka;
using Confluent.Kafka;
using Dapper;
using Microsoft.Extensions.Options;
using Npgsql;
using System.Text.Json;

namespace KafkaFlow.Outbox.Postgres;

public class PostgresOutboxBackend : IOutboxBackend
public class PostgresOutboxBackend(NpgsqlDataSource connectionPool) : IOutboxBackend
{
private readonly NpgsqlDataSource _connectionPool;

public PostgresOutboxBackend(NpgsqlDataSource connectionPool)
{
_connectionPool = connectionPool;
}
private readonly NpgsqlDataSource _connectionPool = connectionPool;

public async ValueTask Store(TopicPartition topicPartition, Message<byte[], byte[]> message, CancellationToken token = default)
{
var sql =
@"INSERT INTO outbox.outbox(topic_name, partition, message_key, message_headers, message_body)
VALUES (@topic_name, @partition, @message_key, @message_headers, @message_body)";
var sql = """
INSERT INTO outbox.outbox(topic_name, partition, message_key, message_headers, message_body)
VALUES (@topic_name, @partition, @message_key, @message_headers, @message_body)
""";

await using var conn = _connectionPool.CreateConnection();

Expand All @@ -41,27 +36,27 @@ public async ValueTask Store(TopicPartition topicPartition, Message<byte[], byte

public async ValueTask<OutboxRecord[]> Read(int batchSize, CancellationToken token = default)
{
var sql = @"
DELETE FROM outbox.outbox
WHERE
sequence_id = ANY(ARRAY(
SELECT sequence_id FROM outbox.outbox
ORDER BY sequence_id
LIMIT @batch_size
FOR UPDATE
))
RETURNING
sequence_id,
topic_name,
partition,
message_key,
message_headers,
message_body
";
var sql = """
DELETE FROM outbox.outbox
WHERE
sequence_id = ANY(ARRAY(
SELECT sequence_id FROM outbox.outbox
ORDER BY sequence_id
LIMIT @batch_size
FOR UPDATE
))
RETURNING
sequence_id,
topic_name,
partition,
message_key,
message_headers,
message_body
""";
await using var conn = _connectionPool.CreateConnection();
var result = await conn.QueryAsync<OutboxTableRow>(sql, new { batch_size = batchSize });

return result?.Select(ToOutboxRecord).ToArray() ?? Array.Empty<OutboxRecord>();
return result?.Select(ToOutboxRecord).ToArray() ?? [];
}

private static OutboxRecord ToOutboxRecord(OutboxTableRow row)
Expand Down Expand Up @@ -94,10 +89,12 @@ private static OutboxRecord ToOutboxRecord(OutboxTableRow row)

internal sealed class OutboxTableRow
{
#pragma warning disable IDE1006 // Naming Styles
public long sequence_id { get; set; }
public string topic_name { get; set; } = null!;
public int? partition { get; set; }
public byte[]? message_key { get; set; }
public string? message_headers { get; set; }
public byte[]? message_body { get; set; }
#pragma warning restore IDE1006 // Naming Styles
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@

<ItemGroup>
<PackageReference Include="Dapper" Version="2.1.35" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="8.0.1" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Options.DataAnnotations" Version="8.0.0" />
<PackageReference Include="System.Data.SqlClient" Version="4.8.6" />
</ItemGroup>

Expand Down
11 changes: 5 additions & 6 deletions src/Contrib.KafkaFlow.Outbox.SqlServer/SqlServerOutboxBackend.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,9 @@

namespace KafkaFlow.Outbox.SqlServer;

public class SqlServerOutboxBackend : IOutboxBackend
public class SqlServerOutboxBackend(IOptions<SqlServerBackendOptions> options) : IOutboxBackend
{
private readonly SqlServerBackendOptions _options;

public SqlServerOutboxBackend(IOptions<SqlServerBackendOptions> options)
=> _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
private readonly SqlServerBackendOptions _options = options?.Value ?? throw new ArgumentNullException(nameof(options));

public async ValueTask Store(TopicPartition topicPartition, Message<byte[], byte[]> message, CancellationToken token = default)
{
Expand Down Expand Up @@ -53,7 +50,7 @@ ORDER BY [sequence_id]
using var conn = new SqlConnection(_options.ConnectionString);
var result = await conn.QueryAsync<OutboxTableRow>(sql, new { batch_size = batchSize });

return result?.Select(ToOutboxRecord).ToArray() ?? Array.Empty<OutboxRecord>();
return result?.Select(ToOutboxRecord).ToArray() ?? [];
}

private static OutboxRecord ToOutboxRecord(OutboxTableRow row)
Expand Down Expand Up @@ -86,10 +83,12 @@ private static OutboxRecord ToOutboxRecord(OutboxTableRow row)

internal sealed class OutboxTableRow
{
#pragma warning disable IDE1006 // Naming Styles
public long sequence_id { get; set; }
public string topic_name { get; set; } = null!;
public int? partition { get; set; }
public byte[]? message_key { get; set; }
public string? message_headers { get; set; }
public byte[]? message_body { get; set; }
#pragma warning restore IDE1006 // Naming Styles
}
27 changes: 10 additions & 17 deletions src/Contrib.KafkaFlow.Outbox/OutboxDispatcherService.cs
Original file line number Diff line number Diff line change
@@ -1,24 +1,17 @@
using System.Transactions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Transactions;

namespace KafkaFlow.Outbox;

internal sealed class OutboxDispatcherService : BackgroundService
internal sealed class OutboxDispatcherService(
ILogger<OutboxDispatcherService> logger,
IMessageProducer<IOutboxDispatcher> producer,
IOutboxBackend outboxBackend) : BackgroundService
{
private readonly ILogger _logger;
private readonly IMessageProducer<IOutboxDispatcher> _producer;
private readonly IOutboxBackend _outboxBackend;

public OutboxDispatcherService(
ILogger<OutboxDispatcherService> logger,
IMessageProducer<IOutboxDispatcher> producer,
IOutboxBackend outboxBackend)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_producer = producer ?? throw new ArgumentNullException(nameof(producer));
_outboxBackend = outboxBackend ?? throw new ArgumentNullException(nameof(outboxBackend));
}
private readonly ILogger _logger = logger ?? throw new ArgumentNullException(nameof(logger));
private readonly IMessageProducer<IOutboxDispatcher> _producer = producer ?? throw new ArgumentNullException(nameof(producer));
private readonly IOutboxBackend _outboxBackend = outboxBackend ?? throw new ArgumentNullException(nameof(outboxBackend));

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
Expand Down Expand Up @@ -48,13 +41,13 @@ private async Task<bool> DispatchNextBatchAsync(CancellationToken stoppingToken)
}

scope.Complete();
return batch.Any();
return batch.Length != 0;
}

private static TransactionScope BeginTransaction =>
new(
scopeOption: TransactionScopeOption.RequiresNew,
transactionOptions: new TransactionOptions
{ IsolationLevel = IsolationLevel.ReadCommitted, Timeout = TimeSpan.FromSeconds(30) },
{ IsolationLevel = IsolationLevel.ReadCommitted, Timeout = TimeSpan.FromSeconds(30) },
asyncFlowOption: TransactionScopeAsyncFlowOption.Enabled);
}
Loading

0 comments on commit 560e24d

Please sign in to comment.