Skip to content

Commit

Permalink
Added examples of choreography
Browse files Browse the repository at this point in the history
  • Loading branch information
oskardudycz committed Feb 21, 2023
1 parent 16ddcd6 commit 9be3d3b
Show file tree
Hide file tree
Showing 8 changed files with 169 additions and 68 deletions.
61 changes: 61 additions & 0 deletions Core.Marten/Extensions/DocumentSessionExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Core.ProcessManagers;
using Core.Structures;
using Marten;

Expand All @@ -16,6 +17,17 @@ CancellationToken ct
return documentSession.SaveChangesAsync(token: ct);
}

public static Task Add<T>(
this IDocumentSession documentSession,
string id,
object @event,
CancellationToken ct
) where T : class
{
documentSession.Events.StartStream<T>(id, @event);
return documentSession.SaveChangesAsync(token: ct);
}

public static Task GetAndUpdate<T>(
this IDocumentSession documentSession,
Guid id, int version,
Expand All @@ -25,6 +37,55 @@ CancellationToken ct
documentSession.Events.WriteToAggregate<T>(id, version, stream =>
stream.AppendOne(handle(stream.Aggregate)), ct);

public static Task GetAndUpdate<T>(
this IDocumentSession documentSession,
string id,
int version,
Func<T, IEnumerable<EventOrCommand>> handle,
CancellationToken ct
) where T : class =>
documentSession.Events.WriteToAggregate<T>(id, version, stream =>
{
var messages = handle(stream.Aggregate);

foreach (var message in messages)
{
message.Switch(
stream.AppendOne,
command => documentSession.Events.Append($"commands-{id}", command)
);
}
}, ct);


public static Task GetAndUpdate<T>(
this IDocumentSession documentSession,
string id,
Func<T, IEnumerable<EventOrCommand>> handle,
CancellationToken ct
) where T : class =>
documentSession.Events.WriteToAggregate<T>(id, stream =>
{
var messages = handle(stream.Aggregate);

foreach (var message in messages)
{
message.Switch(
stream.AppendOne,
command => documentSession.Events.Append($"commands-{id}", command)
);
}
}, ct);

public static Task GetAndUpdate<T>(
this IDocumentSession documentSession,
string id,
Func<T, IEnumerable<object>> handle,
CancellationToken ct
) where T : class =>
documentSession.Events.WriteToAggregate<T>(id,
stream => stream.AppendMany(handle(stream.Aggregate)), ct);

public static Task GetAndUpdate<T>(
this IDocumentSession documentSession,
Guid id,
Expand Down
14 changes: 0 additions & 14 deletions Core/ProcessManagers/IProcessManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,3 @@ public interface IProcessManager<out T>: IProjection

EventOrCommand[] DequeuePendingMessages();
}

public class EventOrCommand: Either<object, object>
{
public static EventOrCommand Event(object @event) =>
new(Maybe<object>.Of(@event), Maybe<object>.Empty);


public static EventOrCommand Command(object @event) =>
new(Maybe<object>.Empty, Maybe<object>.Of(@event));

private EventOrCommand(Maybe<object> left, Maybe<object> right): base(left, right)
{
}
}
2 changes: 2 additions & 0 deletions Core/ProcessManagers/ProcessManager.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using Core.Structures;

namespace Core.ProcessManagers;

public abstract class ProcessManager: ProcessManager<Guid>, IProcessManager
Expand Down
20 changes: 20 additions & 0 deletions Core/Structures/EventOrCommand.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
namespace Core.Structures;

public class EventOrCommand: Either<object, object>
{
public static EventOrCommand Event(object @event) =>
new(Maybe<object>.Of(@event), Maybe<object>.Empty);

public static IEnumerable<EventOrCommand> Events(params object[] events) =>
events.Select(Event);

public static IEnumerable<EventOrCommand> Events(IEnumerable<object> events) =>
events.Select(Event);

public static EventOrCommand Command(object @event) =>
new(Maybe<object>.Empty, Maybe<object>.Of(@event));

private EventOrCommand(Maybe<object> left, Maybe<object> right): base(left, right)
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
namespace HotelManagement.Choreography.GroupCheckouts;

public record GroupCheckoutInitiated(
Guid GroupCheckoutId,
Guid GroupCheckOutId,
Guid ClerkId,
Guid[] GuestStayIds,
DateTimeOffset InitiatedAt
Expand Down Expand Up @@ -55,48 +55,50 @@ DateTimeOffset initiatedAt
) =>
new GroupCheckoutInitiated(groupCheckoutId, clerkId, guestStayIds, initiatedAt);

public Maybe<GuestCheckoutsInitiated> RecordGuestCheckoutsInitiation(
public GuestCheckoutsInitiated? RecordGuestCheckoutsInitiation(
Guid[] initiatedGuestStayIds,
DateTimeOffset now
) =>
Maybe.If(
Status == CheckoutStatus.Initiated,
() => new GuestCheckoutsInitiated(Id, initiatedGuestStayIds, now)
);
)
{
if (Status == CheckoutStatus.Initiated)
return null;

public Maybe<object[]> RecordGuestCheckoutCompletion(
return new GuestCheckoutsInitiated(Id, initiatedGuestStayIds, now);
}

public object[] RecordGuestCheckoutCompletion(
Guid guestStayId,
DateTimeOffset now
) =>
Maybe.If(
Status == CheckoutStatus.Initiated && GuestStayCheckouts[guestStayId] != CheckoutStatus.Completed,
() =>
{
var guestCheckoutCompleted = new GuestCheckoutCompleted(Id, guestStayId, now);
)
{
if (Status == CheckoutStatus.Initiated && GuestStayCheckouts[guestStayId] != CheckoutStatus.Completed)
return Array.Empty<object>();

var guestCheckoutCompleted = new GuestCheckoutCompleted(Id, guestStayId, now);

var guestStayCheckouts = GuestStayCheckouts.With(guestStayId, CheckoutStatus.Completed);
var guestStayCheckouts = GuestStayCheckouts.With(guestStayId, CheckoutStatus.Completed);

return AreAnyOngoingCheckouts(guestStayCheckouts)
? new object[] { guestCheckoutCompleted }
: new[] { guestCheckoutCompleted, Finalize(guestStayCheckouts, now) };
});
return AreAnyOngoingCheckouts(guestStayCheckouts)
? new object[] { guestCheckoutCompleted }
: new[] { guestCheckoutCompleted, Finalize(guestStayCheckouts, now) };
}

public Maybe<object[]> RecordGuestCheckoutFailure(
public object[] RecordGuestCheckoutFailure(
Guid guestStayId,
DateTimeOffset now
) =>
Maybe.If(
Status == CheckoutStatus.Initiated && GuestStayCheckouts[guestStayId] != CheckoutStatus.Failed,
() =>
{
var guestCheckoutFailed = new GuestCheckoutFailed(Id, guestStayId, now);
)
{
if(Status == CheckoutStatus.Initiated && GuestStayCheckouts[guestStayId] != CheckoutStatus.Failed)
return Array.Empty<object>();

var guestCheckoutFailed = new GuestCheckoutFailed(Id, guestStayId, now);

var guestStayCheckouts = GuestStayCheckouts.With(guestStayId, CheckoutStatus.Failed);
var guestStayCheckouts = GuestStayCheckouts.With(guestStayId, CheckoutStatus.Failed);

return AreAnyOngoingCheckouts(guestStayCheckouts)
? new object[] { guestCheckoutFailed }
: new[] { guestCheckoutFailed, Finalize(guestStayCheckouts, now) };
});
return AreAnyOngoingCheckouts(guestStayCheckouts)
? new object[] { guestCheckoutFailed }
: new[] { guestCheckoutFailed, Finalize(guestStayCheckouts, now) };
}

private object Finalize(Dictionary<Guid, CheckoutStatus> guestStayCheckouts, DateTimeOffset now) =>
!AreAnyFailedCheckouts(guestStayCheckouts)
Expand Down Expand Up @@ -128,7 +130,7 @@ private static Guid[] CheckoutsWith(Dictionary<Guid, CheckoutStatus> guestStayCh

public static GroupCheckout Create(GroupCheckoutInitiated @event) =>
new GroupCheckout(
@event.GroupCheckoutId,
@event.GroupCheckOutId,
@event.GuestStayIds.ToDictionary(id => id, _ => CheckoutStatus.Pending)
);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
using Core.Commands;
using Core.Events;
using Core.Marten.Extensions;
using Core.Structures;
using HotelManagement.Choreography.GuestStayAccounts;
using Marten;
using static Core.Structures.EventOrCommand;

namespace HotelManagement.Choreography.GroupCheckouts;

Expand Down Expand Up @@ -29,18 +33,22 @@ DateTimeOffset FailedAt

public class GuestStayDomainService:
ICommandHandler<InitiateGroupCheckout>,
ICommandHandler<RecordGuestCheckoutsInitiation>,
ICommandHandler<RecordGuestCheckoutCompletion>,
ICommandHandler<RecordGuestCheckoutFailure>
IEventHandler<GroupCheckoutInitiated>,
IEventHandler<GuestStayAccounts.GuestCheckedOut>,
IEventHandler<GuestStayAccounts.GuestCheckoutFailed>
{
private readonly IDocumentSession documentSession;
private readonly IAsyncCommandBus commandBus;

public GuestStayDomainService(IDocumentSession documentSession) =>
public GuestStayDomainService(IDocumentSession documentSession, IAsyncCommandBus commandBus)
{
this.documentSession = documentSession;
this.commandBus = commandBus;
}

public Task Handle(InitiateGroupCheckout command, CancellationToken ct) =>
documentSession.Add<GroupCheckout>(
command.GroupCheckoutId,
command.GroupCheckoutId.ToString(),
GroupCheckout.Initiate(
command.GroupCheckoutId,
command.ClerkId,
Expand All @@ -50,24 +58,46 @@ public Task Handle(InitiateGroupCheckout command, CancellationToken ct) =>
ct
);

public Task Handle(RecordGuestCheckoutsInitiation command, CancellationToken ct) =>
documentSession.GetAndUpdate(
command.GroupCheckoutId,
(GroupCheckout state) => state.RecordGuestCheckoutsInitiation(command.InitiatedGuestStayIds, DateTimeOffset.UtcNow),
ct
);
public Task Handle(GroupCheckoutInitiated @event, CancellationToken ct)
{
IEnumerable<EventOrCommand> OnInitiated(GroupCheckout groupCheckout)
{
var result = groupCheckout.RecordGuestCheckoutsInitiation(@event.GuestStayIds, @event.InitiatedAt);

if (result is not null)
{
foreach (var guestAccountId in @event.GuestStayIds)
{
yield return Command(new CheckOutGuest(guestAccountId, @event.GroupCheckOutId));
}

yield return Event(result);
}
}

public Task Handle(RecordGuestCheckoutCompletion command, CancellationToken ct) =>
documentSession.GetAndUpdate(
command.GuestStayId,
(GroupCheckout state) => state.RecordGuestCheckoutCompletion(command.GuestStayId, command.CompletedAt),
return documentSession.GetAndUpdate<GroupCheckout>(@event.GroupCheckOutId.ToString(), OnInitiated, ct);
}

public Task Handle(GuestStayAccounts.GuestCheckedOut @event, CancellationToken ct)
{
if (!@event.GroupCheckOutId.HasValue)
return Task.CompletedTask;

return documentSession.GetAndUpdate<GroupCheckout>(@event.GroupCheckOutId.Value.ToString(),
groupCheckout => groupCheckout.RecordGuestCheckoutCompletion(@event.GuestStayId, @event.CheckedOutAt),
ct
);
}

public Task Handle(Choreography.GuestStayAccounts.GuestCheckoutFailed @event, CancellationToken ct)
{
if (!@event.GroupCheckOutId.HasValue)
return Task.CompletedTask;

public Task Handle(RecordGuestCheckoutFailure command, CancellationToken ct) =>
documentSession.GetAndUpdate(
command.GuestStayId,
(GroupCheckout state) => state.RecordGuestCheckoutFailure(command.GuestStayId, command.FailedAt),
return documentSession.GetAndUpdate<GroupCheckout>(@event.GroupCheckOutId.Value.ToString(),
groupCheckout =>
groupCheckout.RecordGuestCheckoutFailure(@event.GuestStayId, @event.FailedAt),
ct
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ public async Task Handle(GroupCheckoutInitiated @event, CancellationToken ct)
foreach (var guestAccountId in @event.GuestStayIds)
{
await commandBus.Schedule(
new CheckOutGuest(guestAccountId, @event.GroupCheckoutId),
new CheckOutGuest(guestAccountId, @event.GroupCheckOutId),
ct
);
}

await commandBus.Schedule(
new RecordGuestCheckoutsInitiation(
@event.GroupCheckoutId,
@event.GroupCheckOutId,
@event.GuestStayIds
),
ct
Expand Down
2 changes: 1 addition & 1 deletion Sample/HotelManagement/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ It was modelled and explained in detail in the [Implementing Distributed Process
<a href="https://www.architecture-weekly.com/p/webinar-3-implementing-distributed" target="_blank"><img src="https://substackcdn.com/image/fetch/w_1920,h_1080,c_fill,f_auto,q_auto:good,fl_progressive:steep/https%3A%2F%2Fsubstack-video.s3.amazonaws.com%2Fvideo_upload%2Fpost%2F69413446%2F526b9100-7271-4482-99e7-9559416e9848%2Ftranscoded-00624.png" alt="How to deal with privacy and GDPR in Event-Sourced systems" width="640" border="10" /></a>

It shows how to:
- orchestrate and coordinate a business workflow spanning across multiple aggregates using the [Saga pattern](https://event-driven.io/en/saga_process_manager_distributed_transactions/),
- orchestrate and coordinate a business workflow spanning across multiple aggregates using the [Saga pattern](./HotelManagement/Sagas), [Choreography](./HotelManagement/Choreography) and [Process Managers](./HotelManagement/ProcessManagers),
- handle distributed processing both for asynchronous command scheduling and event publishing,
- getting at-least-once delivery guarantee,
- implementing command store and outbox pattern on top of Marten and EventStoreDB,
Expand Down

0 comments on commit 9be3d3b

Please sign in to comment.