diff --git a/Btms.Business.Tests/PreProcessing/MovementPreProcessingTests.cs b/Btms.Business.Tests/PreProcessing/MovementPreProcessingTests.cs new file mode 100644 index 00000000..0913e43f --- /dev/null +++ b/Btms.Business.Tests/PreProcessing/MovementPreProcessingTests.cs @@ -0,0 +1,39 @@ +using Btms.Backend.Data.InMemory; +using Btms.Business.Pipelines.PreProcessing; +using Btms.Types.Alvs; +using FluentAssertions; +using Microsoft.Extensions.Logging.Abstractions; +using TestDataGenerator; +using Xunit; + +namespace Btms.Business.Tests.PreProcessing; + +public class MovementPreProcessingTests +{ + [Fact] + public async Task WhenNotificationNotExists_ThenShouldBeCreated() + { + // ARRANGE + var clearanceRequest = CreateAlvsClearanceRequest(); + var dbContext = new MemoryMongoDbContext(); + var preProcessor = new MovementPreProcessor(dbContext, NullLogger.Instance); + + + // ACT + var preProcessingResult = await preProcessor.Process( + new PreProcessingContext(clearanceRequest, "TestMessageId")); + + // ASSERT + preProcessingResult.Outcome.Should().Be(PreProcessingOutcome.New); + var savedMovement = await dbContext.Movements.Find(clearanceRequest!.Header!.EntryReference!); + savedMovement.Should().NotBeNull(); + savedMovement.AuditEntries.Count.Should().Be(1); + savedMovement.AuditEntries[0].Status.Should().Be("Created"); + } + + private static AlvsClearanceRequest CreateAlvsClearanceRequest() + { + return ClearanceRequestBuilder.Default() + .WithValidDocumentReferenceNumbers().Build(); + } +} \ No newline at end of file diff --git a/Btms.Business.Tests/PreProcessing/NotificationsPreProcessingTests.cs b/Btms.Business.Tests/PreProcessing/NotificationsPreProcessingTests.cs new file mode 100644 index 00000000..f20f265d --- /dev/null +++ b/Btms.Business.Tests/PreProcessing/NotificationsPreProcessingTests.cs @@ -0,0 +1,71 @@ +using Btms.Backend.Data.InMemory; +using Btms.Business.Pipelines.PreProcessing; +using Btms.Types.Ipaffs; +using Btms.Types.Ipaffs.Mapping; +using FluentAssertions; +using Microsoft.Extensions.Logging.Abstractions; +using TestDataGenerator; +using Xunit; + +namespace Btms.Business.Tests.PreProcessing +{ + public class NotificationsPreProcessingTests + { + [Fact] + public async Task WhenNotificationNotExists_ThenShouldBeCreated() + { + // ARRANGE + var notification = CreateImportNotification(); + var dbContext = new MemoryMongoDbContext(); + var preProcessor = new ImportNotificationPreProcessor(dbContext, NullLogger.Instance); + + // ACT + var preProcessingResult = await preProcessor.Process( + new PreProcessingContext(notification, "TestMessageId")); + + // ASSERT + preProcessingResult.Outcome.Should().Be(PreProcessingOutcome.New); + var savedNotification = await dbContext.Notifications.Find(notification!.ReferenceNumber!); + savedNotification.Should().NotBeNull(); + savedNotification.AuditEntries.Count.Should().Be(1); + savedNotification.AuditEntries[0].Status.Should().Be("Created"); + } + + [Fact] + public async Task WhenNotificationExists_AndLastUpdatedIsNewer_ThenShouldBeUpdated() + { + // ARRANGE + var notification = CreateImportNotification(); + var dbContext = new MemoryMongoDbContext(); + await dbContext.Notifications.Insert(notification.MapWithTransform()); + notification.LastUpdated = notification.LastUpdated?.AddHours(1); + var preProcessor = new ImportNotificationPreProcessor(dbContext, NullLogger.Instance); + + + // ACT + var preProcessingResult = await preProcessor.Process( + new PreProcessingContext(notification, "TestMessageId")); + + // ASSERT + preProcessingResult.Outcome.Should().Be(PreProcessingOutcome.Changed); + var savedNotification = await dbContext.Notifications.Find(notification!.ReferenceNumber!); + savedNotification.Should().NotBeNull(); + savedNotification.AuditEntries.Count.Should().Be(1); + savedNotification.AuditEntries[0].Status.Should().Be("Updated"); + } + + private static ImportNotification CreateImportNotification() + { + return ImportNotificationBuilder.Default() + .WithReferenceNumber(ImportNotificationTypeEnum.Chedpp, 1, DateTime.UtcNow, 1) + .WithRandomCommodities(1, 2) + .Do(x => + { + foreach (var parameterSet in x.PartOne?.Commodities?.ComplementParameterSets!) + { + parameterSet.KeyDataPairs = null; + } + }).Build(); + } + } +} \ No newline at end of file diff --git a/Btms.Business.Tests/Services/LinkingServiceTests.cs b/Btms.Business.Tests/Services/LinkingServiceTests.cs index 02770156..ace93c73 100644 --- a/Btms.Business.Tests/Services/LinkingServiceTests.cs +++ b/Btms.Business.Tests/Services/LinkingServiceTests.cs @@ -2,14 +2,15 @@ using Btms.Backend.Data.InMemory; using Btms.Business.Services; using Btms.Metrics; -using Btms.Model; using Btms.Model.Alvs; +using Btms.Model.ChangeLog; using Btms.Model.Ipaffs; using FluentAssertions; using Microsoft.Extensions.Logging.Abstractions; using Xunit; using Document = Btms.Model.Alvs.Document; using Items = Btms.Model.Alvs.Items; +using Movement = Btms.Model.Movement; namespace Btms.Business.Tests.Services; @@ -49,7 +50,7 @@ public async Task LinkMovement_ExistingRequest_IncludesFieldsOfInterest_Matching // Assert linkResult.Should().NotBeNull(); - linkResult.State.Should().Be(LinkState.Linked); + linkResult.Outcome.Should().Be(LinkOutcome.Linked); linkResult.Notifications.Count.Should().Be(1); linkResult.Movements.Count.Should().Be(1); } @@ -68,7 +69,7 @@ public async Task LinkMovement_ExistingRequest_IncludesFieldsOfInterest_NoMatchi // Assert linkResult.Should().NotBeNull(); - linkResult.State.Should().Be(LinkState.NotLinked); + linkResult.Outcome.Should().Be(LinkOutcome.NotLinked); linkResult.Notifications.Count.Should().Be(0); linkResult.Movements.Count.Should().Be(1); } @@ -87,7 +88,7 @@ public async Task LinkMovement_ExistingRequest_NoFieldsOfInterest_NoMatchingTrig // Assert linkResult.Should().NotBeNull(); - linkResult.State.Should().Be(LinkState.NotLinked); + linkResult.Outcome.Should().Be(LinkOutcome.NotLinked); linkResult.Notifications.Count.Should().Be(0); linkResult.Movements.Count.Should().Be(0); } @@ -106,7 +107,7 @@ public async Task LinkMovement_NewRequest_MatchingCHED_AddsAllToLinkResult() // Assert linkResult.Should().NotBeNull(); - linkResult.State.Should().Be(LinkState.Linked); + linkResult.Outcome.Should().Be(LinkOutcome.Linked); linkResult.Notifications.Count.Should().Be(1); linkResult.Movements.Count.Should().Be(1); } @@ -125,7 +126,7 @@ public async Task LinkMovement_NewRequest_MultipleMatchingCHEDs_AddsAllToLinkRes // Assert linkResult.Should().NotBeNull(); - linkResult.State.Should().Be(LinkState.Linked); + linkResult.Outcome.Should().Be(LinkOutcome.Linked); linkResult.Notifications.Count.Should().Be(2); linkResult.Movements.Count.Should().Be(1); } @@ -142,7 +143,7 @@ public async Task LinkMovement_NewRequest_NoMatchingCHEDs_NoMatchingTriggered() // Assert linkResult.Should().NotBeNull(); - linkResult.State.Should().Be(LinkState.NotLinked); + linkResult.Outcome.Should().Be(LinkOutcome.NotLinked); linkResult.Notifications.Count.Should().Be(0); linkResult.Movements.Count.Should().Be(1); } @@ -161,7 +162,7 @@ public async Task LinkNotification_ExistingNotification_IncludesFieldsOfInterest // Assert linkResult.Should().NotBeNull(); - linkResult.State.Should().Be(LinkState.Linked); + linkResult.Outcome.Should().Be(LinkOutcome.Linked); linkResult.Notifications.Count.Should().Be(1); linkResult.Movements.Count.Should().Be(1); } @@ -180,7 +181,7 @@ public async Task LinkNotification_ExistingNotification_IncludesFieldsOfInterest // Assert linkResult.Should().NotBeNull(); - linkResult.State.Should().Be(LinkState.Linked); + linkResult.Outcome.Should().Be(LinkOutcome.Linked); linkResult.Notifications.Count.Should().Be(1); linkResult.Movements.Count.Should().Be(2); } @@ -199,7 +200,7 @@ public async Task LinkNotification_ExistingNotification_IncludesFieldsOfInterest // Assert linkResult.Should().NotBeNull(); - linkResult.State.Should().Be(LinkState.NotLinked); + linkResult.Outcome.Should().Be(LinkOutcome.NotLinked); linkResult.Notifications.Count.Should().Be(1); linkResult.Movements.Count.Should().Be(0); } @@ -218,7 +219,7 @@ public async Task LinkNotification_ExistingNotification_NoFieldsOfInterest_NoMat // Assert linkResult.Should().NotBeNull(); - linkResult.State.Should().Be(LinkState.NotLinked); + linkResult.Outcome.Should().Be(LinkOutcome.NotLinked); linkResult.Notifications.Count.Should().Be(0); linkResult.Movements.Count.Should().Be(0); } @@ -237,7 +238,7 @@ public async Task LinkNotification_NewNotification_MatchingMRN_AddsAllToLinkResu // Assert linkResult.Should().NotBeNull(); - linkResult.State.Should().Be(LinkState.Linked); + linkResult.Outcome.Should().Be(LinkOutcome.Linked); linkResult.Notifications.Count.Should().Be(1); linkResult.Movements.Count.Should().Be(4); } @@ -256,7 +257,7 @@ public async Task LinkNotification_NewNotification_MultipleMatchingMRNs_AddsAllT // Assert linkResult.Should().NotBeNull(); - linkResult.State.Should().Be(LinkState.Linked); + linkResult.Outcome.Should().Be(LinkOutcome.Linked); linkResult.Notifications.Count.Should().Be(1); linkResult.Movements.Count.Should().Be(1); } @@ -273,7 +274,7 @@ public async Task LinkNotification_NewNotification_NoMatchingMRNs_NoMatchingTrig // Assert linkResult.Should().NotBeNull(); - linkResult.State.Should().Be(LinkState.NotLinked); + linkResult.Outcome.Should().Be(LinkOutcome.NotLinked); linkResult.Notifications.Count.Should().Be(1); linkResult.Movements.Count.Should().Be(0); } @@ -309,8 +310,9 @@ private MovementLinkContext CreateMovementContext(Movement? movement, List(string path, string topic, IBlobItem item, var message = sensitiveDataSerializer.Deserialize(blobContent, _ => { })!; var headers = new Dictionary() { - { "messageId", item.Name }, { "jobId", job.JobId } + { "messageId", item.Name.TrimStart(path.ToCharArray()) }, { "jobId", job.JobId } }; if (BtmsDiagnostics.ActivitySource.HasListeners()) { diff --git a/Btms.Business/Extensions/ServiceCollectionExtensions.cs b/Btms.Business/Extensions/ServiceCollectionExtensions.cs index 38465b13..f68acb7a 100644 --- a/Btms.Business/Extensions/ServiceCollectionExtensions.cs +++ b/Btms.Business/Extensions/ServiceCollectionExtensions.cs @@ -9,9 +9,12 @@ using Btms.Common.Extensions; using Btms.Metrics.Extensions; using Btms.SensitiveData; +using Btms.Types.Ipaffs; using MediatR; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; +using Btms.Business.Pipelines.PreProcessing; +using Btms.Types.Alvs; namespace Btms.Business.Extensions { @@ -59,6 +62,9 @@ public static IServiceCollection AddBusinessServices(this IServiceCollection ser services.AddScoped(); + services.AddScoped, ImportNotificationPreProcessor>(); + services.AddScoped, MovementPreProcessor>(); + return services; } } diff --git a/Btms.Business/Pipelines/PreProcessing/IPreProcessor.cs b/Btms.Business/Pipelines/PreProcessing/IPreProcessor.cs new file mode 100644 index 00000000..5621f56a --- /dev/null +++ b/Btms.Business/Pipelines/PreProcessing/IPreProcessor.cs @@ -0,0 +1,8 @@ +using Btms.Model.Data; + +namespace Btms.Business.Pipelines.PreProcessing; + +public interface IPreProcessor where TOutput : IAuditable +{ + Task> Process(PreProcessingContext preProcessingContext); +} \ No newline at end of file diff --git a/Btms.Business/Pipelines/PreProcessing/ImportNotificationPreProcessor.cs b/Btms.Business/Pipelines/PreProcessing/ImportNotificationPreProcessor.cs new file mode 100644 index 00000000..54b1c5ee --- /dev/null +++ b/Btms.Business/Pipelines/PreProcessing/ImportNotificationPreProcessor.cs @@ -0,0 +1,50 @@ +using Btms.Backend.Data; +using Btms.Common.Extensions; +using Btms.Model.ChangeLog; +using Btms.Types.Ipaffs; +using Btms.Types.Ipaffs.Mapping; +using Microsoft.Extensions.Logging; + +namespace Btms.Business.Pipelines.PreProcessing; + +public class ImportNotificationPreProcessor(IMongoDbContext dbContext, ILogger logger) : IPreProcessor +{ + public async Task> Process(PreProcessingContext preProcessingContext) + { + var internalNotification = preProcessingContext.Message.MapWithTransform(); + var existingNotification = + await dbContext.Notifications.Find(preProcessingContext.Message.ReferenceNumber!); + + if (existingNotification is null) + { + internalNotification.Create(preProcessingContext.MessageId); + await dbContext.Notifications.Insert(internalNotification); + return PreProcessResult.New(internalNotification); + } + + + if (internalNotification.UpdatedSource.TrimMicroseconds() > + existingNotification.UpdatedSource.TrimMicroseconds()) + { + internalNotification.AuditEntries = existingNotification.AuditEntries; + internalNotification.CreatedSource = existingNotification.CreatedSource; + + var changeSet = internalNotification.GenerateChangeSet(existingNotification); + + internalNotification.Update(preProcessingContext.MessageId, changeSet); + await dbContext.Notifications.Update(internalNotification, existingNotification._Etag); + + return PreProcessResult.Changed(internalNotification, changeSet); + } + + if (internalNotification.UpdatedSource.TrimMicroseconds() == + existingNotification.UpdatedSource.TrimMicroseconds()) + { + return PreProcessResult.AlreadyProcessed(existingNotification); + } + + logger.MessageSkipped(preProcessingContext.MessageId!, preProcessingContext.Message.ReferenceNumber!); + return PreProcessResult.Skipped(existingNotification); + + } +} \ No newline at end of file diff --git a/Btms.Business/Pipelines/PreProcessing/MovementPreProcessor.cs b/Btms.Business/Pipelines/PreProcessing/MovementPreProcessor.cs new file mode 100644 index 00000000..c1dfb02f --- /dev/null +++ b/Btms.Business/Pipelines/PreProcessing/MovementPreProcessor.cs @@ -0,0 +1,89 @@ +using Btms.Backend.Data; +using Btms.Model; +using Btms.Model.Auditing; +using Btms.Model.ChangeLog; +using Btms.Types.Alvs; +using Btms.Types.Alvs.Mapping; +using Microsoft.Extensions.Logging; + +namespace Btms.Business.Pipelines.PreProcessing; + +public class MovementPreProcessor(IMongoDbContext dbContext, ILogger logger) : IPreProcessor +{ + public async Task> Process(PreProcessingContext preProcessingContext) + { + + var internalClearanceRequest = AlvsClearanceRequestMapper.Map(preProcessingContext.Message); + var movement = BuildMovement(internalClearanceRequest); + var existingMovement = await dbContext.Movements.Find(movement.Id!); + + if (existingMovement is null) + { + var auditEntry = AuditEntry.CreateCreatedEntry( + movement.ClearanceRequests[0], + preProcessingContext.MessageId, + movement.ClearanceRequests[0].Header!.EntryVersionNumber.GetValueOrDefault(), + movement.UpdatedSource); + movement.Update(auditEntry); + await dbContext.Movements.Insert(movement); + return PreProcessResult.New(movement); + } + + if (movement.ClearanceRequests[^1].Header?.EntryVersionNumber > + existingMovement.ClearanceRequests[0].Header?.EntryVersionNumber) + { + var changeSet = movement.ClearanceRequests[^1].GenerateChangeSet(existingMovement.ClearanceRequests[0]); + + + var auditEntry = AuditEntry.CreateUpdated(changeSet, + preProcessingContext.MessageId, + movement.ClearanceRequests[0].Header!.EntryVersionNumber.GetValueOrDefault(), + movement.UpdatedSource); + movement.Update(auditEntry); + + existingMovement.ClearanceRequests.RemoveAll(x => + x.Header?.EntryReference == + movement.ClearanceRequests[0].Header?.EntryReference); + existingMovement.ClearanceRequests.AddRange(movement.ClearanceRequests); + if (existingMovement.Items == null) + { + existingMovement.Items = []; + } + + existingMovement.Items.AddRange(movement.Items); + await dbContext.Movements.Update(existingMovement, existingMovement._Etag); + return PreProcessResult.Changed(existingMovement, changeSet); + } + + if (movement.ClearanceRequests[^1].Header?.EntryVersionNumber == + existingMovement.ClearanceRequests[0].Header?.EntryVersionNumber) + { + return PreProcessResult.AlreadyProcessed(existingMovement); + } + + logger.MessageSkipped(preProcessingContext.MessageId!, preProcessingContext.Message.Header?.EntryReference!); + return PreProcessResult.Skipped(existingMovement); + + } + + public static Movement BuildMovement(Model.Alvs.AlvsClearanceRequest request) + { + return new Movement() + { + Id = request.Header!.EntryReference, + UpdatedSource = request.ServiceHeader?.ServiceCalled, + CreatedSource = request.ServiceHeader?.ServiceCalled, + ArrivesAt = request.Header.ArrivesAt, + EntryReference = request.Header.EntryReference!, + MasterUcr = request.Header.MasterUcr!, + DeclarationType = request.Header.DeclarationType!, + SubmitterTurn = request.Header.SubmitterTurn!, + DeclarantId = request.Header.DeclarantId!, + DeclarantName = request.Header.DeclarantName!, + DispatchCountryCode = request.Header.DispatchCountryCode!, + GoodsLocationCode = request.Header.GoodsLocationCode!, + ClearanceRequests = [request], + Items = request.Items?.Select(x => x).ToList()!, + }; + } +} \ No newline at end of file diff --git a/Btms.Business/Pipelines/PreProcessing/PreProcessResult.cs b/Btms.Business/Pipelines/PreProcessing/PreProcessResult.cs new file mode 100644 index 00000000..fd277e23 --- /dev/null +++ b/Btms.Business/Pipelines/PreProcessing/PreProcessResult.cs @@ -0,0 +1,27 @@ +using Btms.Model.ChangeLog; +using Btms.Model.Data; + +namespace Btms.Business.Pipelines.PreProcessing; + +public abstract record PreProcessResult +{ + public static PreProcessingResult AlreadyProcessed(T record) where T : IAuditable + { + return new PreProcessingResult(PreProcessingOutcome.AlreadyProcessed, record, null); + } + + public static PreProcessingResult Skipped(T record) where T : IAuditable + { + return new PreProcessingResult(PreProcessingOutcome.Skipped, record, null); + } + + public static PreProcessingResult Changed(T record, ChangeSet changeSet) where T : IAuditable + { + return new PreProcessingResult(PreProcessingOutcome.Changed, record, changeSet); + } + + public static PreProcessingResult New(T record) where T : IAuditable + { + return new PreProcessingResult(PreProcessingOutcome.New, record, null); + } +} \ No newline at end of file diff --git a/Btms.Business/Pipelines/PreProcessing/PreProcessingContext.cs b/Btms.Business/Pipelines/PreProcessing/PreProcessingContext.cs new file mode 100644 index 00000000..171ead67 --- /dev/null +++ b/Btms.Business/Pipelines/PreProcessing/PreProcessingContext.cs @@ -0,0 +1,3 @@ +namespace Btms.Business.Pipelines.PreProcessing; + +public record PreProcessingContext(T Message, string MessageId); \ No newline at end of file diff --git a/Btms.Business/Pipelines/PreProcessing/PreProcessingOutcome.cs b/Btms.Business/Pipelines/PreProcessing/PreProcessingOutcome.cs new file mode 100644 index 00000000..53aa91db --- /dev/null +++ b/Btms.Business/Pipelines/PreProcessing/PreProcessingOutcome.cs @@ -0,0 +1,9 @@ +namespace Btms.Business.Pipelines.PreProcessing; + +public enum PreProcessingOutcome +{ + New, + Changed, + Skipped, + AlreadyProcessed +} \ No newline at end of file diff --git a/Btms.Business/Pipelines/PreProcessing/PreProcessingResult.cs b/Btms.Business/Pipelines/PreProcessing/PreProcessingResult.cs new file mode 100644 index 00000000..8cd4eea5 --- /dev/null +++ b/Btms.Business/Pipelines/PreProcessing/PreProcessingResult.cs @@ -0,0 +1,19 @@ +using Btms.Model.ChangeLog; +using Btms.Model.Data; + +namespace Btms.Business.Pipelines.PreProcessing; + +public record PreProcessingResult( + PreProcessingOutcome Outcome, + T Record, + ChangeSet? ChangeSet) : PreProcessResult + where T : IAuditable +{ + + public bool IsCreatedOrChanged() + { + return Record.GetLatestAuditEntry().IsCreatedOrUpdated(); + } + + +} \ No newline at end of file diff --git a/Btms.Business/Pipelines/PreProcessing/PreProcessorLogging.cs b/Btms.Business/Pipelines/PreProcessing/PreProcessorLogging.cs new file mode 100644 index 00000000..76c351c3 --- /dev/null +++ b/Btms.Business/Pipelines/PreProcessing/PreProcessorLogging.cs @@ -0,0 +1,9 @@ +using Microsoft.Extensions.Logging; + +namespace Btms.Business.Pipelines.PreProcessing; + +internal static partial class PreProcessorLogging +{ + [LoggerMessage(Level = LogLevel.Information, Message = "Message skipped - {MessageId} - {Identifier}")] + internal static partial void MessageSkipped(this ILogger logger, string messageId, string identifier); +} \ No newline at end of file diff --git a/Btms.Business/Services/ImportNotificationLinkContext.cs b/Btms.Business/Services/ImportNotificationLinkContext.cs index 3a08414f..d8b38217 100644 --- a/Btms.Business/Services/ImportNotificationLinkContext.cs +++ b/Btms.Business/Services/ImportNotificationLinkContext.cs @@ -1,8 +1,9 @@ +using Btms.Model.ChangeLog; using Btms.Model.Ipaffs; namespace Btms.Business.Services; -public record ImportNotificationLinkContext(ImportNotification PersistedImportNotification, ImportNotification? ExistingImportNotification) : LinkContext +public record ImportNotificationLinkContext(ImportNotification PersistedImportNotification, ChangeSet? ChangeSet) : LinkContext { public override string GetIdentifiers() { diff --git a/Btms.Business/Services/LinkContext.cs b/Btms.Business/Services/LinkContext.cs index 0f06300b..93b5be8e 100644 --- a/Btms.Business/Services/LinkContext.cs +++ b/Btms.Business/Services/LinkContext.cs @@ -1,18 +1,19 @@ using Btms.Model; +using Btms.Model.ChangeLog; using Btms.Model.Ipaffs; namespace Btms.Business.Services; public abstract record LinkContext { - public static MovementLinkContext ForMovement(Movement receivedMovement, Movement? existingMovement = null) + public static MovementLinkContext ForMovement(Movement receivedMovement, ChangeSet? changeSet = null) { - return new MovementLinkContext(receivedMovement, existingMovement); + return new MovementLinkContext(receivedMovement, changeSet); } - public static ImportNotificationLinkContext ForImportNotification(ImportNotification receivedImportNotification, ImportNotification? existingImportNotification = null) + public static ImportNotificationLinkContext ForImportNotification(ImportNotification receivedImportNotification, ChangeSet? changeSet = null) { - return new ImportNotificationLinkContext(receivedImportNotification, existingImportNotification); + return new ImportNotificationLinkContext(receivedImportNotification, changeSet); } public abstract string GetIdentifiers(); diff --git a/Btms.Business/Services/LinkState.cs b/Btms.Business/Services/LinkOutcome.cs similarity index 72% rename from Btms.Business/Services/LinkState.cs rename to Btms.Business/Services/LinkOutcome.cs index 2bb841d9..7507e588 100644 --- a/Btms.Business/Services/LinkState.cs +++ b/Btms.Business/Services/LinkOutcome.cs @@ -1,6 +1,6 @@ namespace Btms.Business.Services; -public enum LinkState +public enum LinkOutcome { Linked, NotLinked diff --git a/Btms.Business/Services/LinkResult.cs b/Btms.Business/Services/LinkResult.cs index 8fc4b8ee..acd48fc3 100644 --- a/Btms.Business/Services/LinkResult.cs +++ b/Btms.Business/Services/LinkResult.cs @@ -3,9 +3,9 @@ namespace Btms.Business.Services; -public class LinkResult(LinkState state) +public class LinkResult(LinkOutcome state) { - public LinkState State { get; set; } = state; + public LinkOutcome Outcome { get; set; } = state; public List Notifications { get; set; } = new(); public List Movements { get; set; } = new(); } \ No newline at end of file diff --git a/Btms.Business/Services/LinkingService.cs b/Btms.Business/Services/LinkingService.cs index d1fc03c0..1c7e6714 100644 --- a/Btms.Business/Services/LinkingService.cs +++ b/Btms.Business/Services/LinkingService.cs @@ -1,9 +1,13 @@ +using System.Text.RegularExpressions; using Btms.Backend.Data; using Btms.Backend.Data.Extensions; +using Btms.Common.Extensions; using Btms.Metrics; using Btms.Model; +using Btms.Model.ChangeLog; using Btms.Model.Ipaffs; using Btms.Model.Relationships; +using Json.Patch; using Microsoft.Extensions.Logging; namespace Btms.Business.Services; @@ -47,19 +51,19 @@ public async Task Link(LinkContext linkContext, CancellationToken ca switch (linkContext) { case MovementLinkContext movementLinkContext: - if (!ShouldLink(movementLinkContext)) + if (!ShouldLinkMovement(movementLinkContext.ChangeSet)) { logger.LinkNotAttempted(linkContext.GetType().Name, linkContext.GetIdentifiers()); - return new LinkResult(LinkState.NotLinked); + return new LinkResult(LinkOutcome.NotLinked); } result = await FindMovementLinks(movementLinkContext.PersistedMovement, cancellationToken); break; case ImportNotificationLinkContext notificationLinkContext: - if (!ShouldLink(notificationLinkContext)) + if (!ShouldLink(notificationLinkContext.ChangeSet)) { logger.LinkNotAttempted(linkContext.GetType().Name, linkContext.GetIdentifiers()); - return new LinkResult(LinkState.NotLinked); + return new LinkResult(LinkOutcome.NotLinked); } result = await FindImportNotificationLinks(notificationLinkContext.PersistedImportNotification, @@ -69,7 +73,7 @@ public async Task Link(LinkContext linkContext, CancellationToken ca } - if (result.State == LinkState.NotLinked) + if (result.Outcome == LinkOutcome.NotLinked) { logger.LinkNotFound(linkContext.GetType().Name, linkContext.GetIdentifiers()); return result; @@ -132,65 +136,21 @@ await dbContext.Notifications.Update(notification, notification._Etag, transacti return result; } - private static bool ShouldLink(MovementLinkContext movContext) + private static bool ShouldLinkMovement(ChangeSet? changeSet) { - if (movContext.ExistingMovement is null) return true; - - var existingItems = movContext.ExistingMovement.Items is null ? [] : movContext.ExistingMovement.Items; - var receivedItems = movContext.PersistedMovement.Items is null ? [] : movContext.PersistedMovement.Items; - - // Diff movements for fields of interest - var existingDocs = existingItems - .SelectMany(x => x.Documents ?? []) - .Select(d => d.DocumentReference - ).ToList(); - - var receivedDocs = receivedItems - .SelectMany(x => x.Documents ?? []) - .Select(d => d.DocumentReference).ToList(); - - if (existingDocs.Count != receivedDocs.Count || - !existingDocs.TrueForAll(receivedDocs.Contains)) - { - // Delta in received Docs - return true; - } - - return false; + return changeSet is null || changeSet.HasDocumentsChanged(); } - private static bool ShouldLink(ImportNotificationLinkContext notifContext) + private static bool ShouldLink(ChangeSet? changeSet) { - if (notifContext.ExistingImportNotification is null) return true; - - var existingCommodities = notifContext.ExistingImportNotification.Commodities? - .Select(c => new - { - c.CommodityId, - c.CommodityDescription - }).ToList(); - var receivedCommodities = notifContext.PersistedImportNotification.Commodities? - .Select(c => new - { - c.CommodityId, - c.CommodityDescription - }).ToList(); - - if (existingCommodities?.Count != receivedCommodities?.Count || - existingCommodities?.TrueForAll(receivedCommodities!.Contains) != true) - { - // Delta in received Commodities - return true; - } - - return false; + return changeSet is null || changeSet.HasCommoditiesChanged(); } private async Task FindMovementLinks(Movement movement, CancellationToken cancellationToken) { var notifications = await dbContext.Notifications.Where(x => movement._MatchReferences.Contains(x._MatchReference)).ToListAsync(cancellationToken: cancellationToken); - return new LinkResult(notifications.Any() ? LinkState.Linked : LinkState.NotLinked) + return new LinkResult(notifications.Any() ? LinkOutcome.Linked : LinkOutcome.NotLinked) { Movements = [movement], Notifications = notifications @@ -201,10 +161,39 @@ private async Task FindImportNotificationLinks(ImportNotification im { var movements = await dbContext.Movements.Where(x => x._MatchReferences.Contains(importNotification._MatchReference)).ToListAsync(cancellationToken); - return new LinkResult(movements.Any() ? LinkState.Linked : LinkState.NotLinked) + return new LinkResult(movements.Any() ? LinkOutcome.Linked : LinkOutcome.NotLinked) { Movements = movements, Notifications = [importNotification] }; } +} + +public static partial class LinkingChangeSetExtensions +{ + [GeneratedRegex("Commodities\\/\\d\\/CommodityId", RegexOptions.IgnoreCase)] + private static partial Regex CommodityIdRegex(); + + [GeneratedRegex("Commodities\\/\\d", RegexOptions.IgnoreCase)] + private static partial Regex CommodityRegex(); + + [GeneratedRegex("Items\\/\\d", RegexOptions.IgnoreCase)] + private static partial Regex ItemsRegex(); + + [GeneratedRegex("Items\\/\\d\\/Documents\\/\\d", RegexOptions.IgnoreCase)] + private static partial Regex DocumentsRegex(); + + public static bool HasCommoditiesChanged(this ChangeSet changeSet) + { + return changeSet.JsonPatch.Operations + .Any(x => CommodityIdRegex().IsMatch(x.Path.ToString()) + || CommodityRegex().IsMatch(x.Path.ToString())); + } + + public static bool HasDocumentsChanged(this ChangeSet changeSet) + { + return changeSet.JsonPatch.Operations + .Any(x => ItemsRegex().IsMatch(x.Path.ToString()) + || DocumentsRegex().IsMatch(x.Path.ToString())); + } } \ No newline at end of file diff --git a/Btms.Business/Services/MovementLinkContext.cs b/Btms.Business/Services/MovementLinkContext.cs index 51d00808..e55fe312 100644 --- a/Btms.Business/Services/MovementLinkContext.cs +++ b/Btms.Business/Services/MovementLinkContext.cs @@ -1,8 +1,9 @@ using Btms.Model; +using Btms.Model.ChangeLog; namespace Btms.Business.Services; -public record MovementLinkContext(Movement PersistedMovement, Movement? ExistingMovement) : LinkContext +public record MovementLinkContext(Movement PersistedMovement, ChangeSet? ChangeSet) : LinkContext { public override string GetIdentifiers() { diff --git a/Btms.Consumers.Tests/ClearanceRequestConsumerTests.cs b/Btms.Consumers.Tests/ClearanceRequestConsumerTests.cs index fcca0322..6320736c 100644 --- a/Btms.Consumers.Tests/ClearanceRequestConsumerTests.cs +++ b/Btms.Consumers.Tests/ClearanceRequestConsumerTests.cs @@ -1,6 +1,10 @@ +using Btms.Business.Pipelines.PreProcessing; using Btms.Business.Services; -using Btms.Consumers; +using Btms.Consumers.Extensions; +using Btms.Model; +using Btms.Model.Auditing; using Btms.Types.Alvs; +using Btms.Types.Alvs.Mapping; using FluentAssertions; using Microsoft.Extensions.Logging.Abstractions; using NSubstitute; @@ -10,18 +14,68 @@ namespace Btms.Consumers.Tests { - public class ClearanceRequestConsumerTests : ConsumerTests + public class ClearanceRequestConsumerTests { + [Theory] + [InlineData(PreProcessingOutcome.New)] + [InlineData(PreProcessingOutcome.Skipped)] + [InlineData(PreProcessingOutcome.Changed)] + [InlineData(PreProcessingOutcome.AlreadyProcessed)] + public async Task WhenPreProcessingSucceeds_AndLastAuditEntryIsLinked_ThenLinkShouldNotBeRun(PreProcessingOutcome outcome) + { + // ARRANGE + var clearanceRequest = CreateAlvsClearanceRequest(); + var movement = + MovementPreProcessor.BuildMovement(AlvsClearanceRequestMapper.Map(clearanceRequest)); + + movement.Update(AuditEntry.CreateLinked("Test", 1, DateTime.Now)); + + var mockLinkingService = Substitute.For(); + var preProcessor = Substitute.For>(); + + preProcessor.Process(Arg.Any>()) + .Returns(Task.FromResult(new PreProcessingResult(outcome, movement, null))); + + var consumer = + new AlvsClearanceRequestConsumer(preProcessor, mockLinkingService, NullLogger.Instance); + consumer.Context = new ConsumerContext() + { + Headers = new Dictionary() + { + { "messageId", clearanceRequest!.Header!.EntryReference! } + } + }; + + // ACT + await consumer.OnHandle(clearanceRequest); + + // ASSERT + consumer.Context.IsLinked().Should().BeFalse(); + + await mockLinkingService.DidNotReceive().Link(Arg.Any(), Arg.Any()); + } + [Fact] - public async Task WhenNotificationNotExists_ThenShouldBeCreated() + public async Task WhenPreProcessingSucceeds_AndLastAuditEntryIsCreated_ThenLinkShouldBeRun() { // ARRANGE var clearanceRequest = CreateAlvsClearanceRequest(); - var dbContext = CreateDbContext(); + var movement = + MovementPreProcessor.BuildMovement(AlvsClearanceRequestMapper.Map(clearanceRequest)); + + movement.Update(AuditEntry.CreateCreatedEntry(movement,"Test", 1, DateTime.Now)); + var mockLinkingService = Substitute.For(); + var preProcessor = Substitute.For>(); + + mockLinkingService.Link(Arg.Any(), Arg.Any()) + .Returns(Task.FromResult(new LinkResult(LinkOutcome.Linked))); + + preProcessor.Process(Arg.Any>()) + .Returns(Task.FromResult(new PreProcessingResult(PreProcessingOutcome.New, movement, null))); var consumer = - new AlvsClearanceRequestConsumer(dbContext, mockLinkingService, NullLogger.Instance); + new AlvsClearanceRequestConsumer(preProcessor, mockLinkingService, NullLogger.Instance); consumer.Context = new ConsumerContext() { Headers = new Dictionary() @@ -34,10 +88,10 @@ public async Task WhenNotificationNotExists_ThenShouldBeCreated() await consumer.OnHandle(clearanceRequest); // ASSERT - var savedMovement = await dbContext.Movements.Find(clearanceRequest!.Header!.EntryReference!); - savedMovement.Should().NotBeNull(); - savedMovement.AuditEntries.Count.Should().Be(1); - savedMovement.AuditEntries[0].Status.Should().Be("Created"); + consumer.Context.IsPreProcessed().Should().BeTrue(); + consumer.Context.IsLinked().Should().BeTrue(); + + await mockLinkingService.Received().Link(Arg.Any(), Arg.Any()); } private static AlvsClearanceRequest CreateAlvsClearanceRequest() diff --git a/Btms.Consumers.Tests/NotificationsConsumerTests.cs b/Btms.Consumers.Tests/NotificationsConsumerTests.cs index f4fb1044..2dd44195 100644 --- a/Btms.Consumers.Tests/NotificationsConsumerTests.cs +++ b/Btms.Consumers.Tests/NotificationsConsumerTests.cs @@ -1,5 +1,8 @@ +using Btms.Business.Pipelines.PreProcessing; using Btms.Business.Services; using Btms.Consumers; +using Btms.Consumers.Extensions; +using Btms.Model.Auditing; using Btms.Types.Ipaffs; using Btms.Types.Ipaffs.Mapping; using FluentAssertions; @@ -13,15 +16,24 @@ namespace Btms.Consumers.Tests { public class NotificationsConsumerTests : ConsumerTests { - [Fact] - public async Task WhenNotificationNotExists_ThenShouldBeCreated() + [Theory] + [InlineData(PreProcessingOutcome.New)] + [InlineData(PreProcessingOutcome.Skipped)] + [InlineData(PreProcessingOutcome.Changed)] + [InlineData(PreProcessingOutcome.AlreadyProcessed)] + public async Task WhenPreProcessingSucceeds_AndLastAuditEntryIsLinked_ThenLinkShouldNotBeRun(PreProcessingOutcome outcome) { // ARRANGE var notification = CreateImportNotification(); - var dbContext = CreateDbContext(); + var modelNotification = notification.MapWithTransform(); + modelNotification.Changed(AuditEntry.CreateLinked("Test", 1, DateTime.Now)); var mockLinkingService = Substitute.For(); + var preProcessor = Substitute.For>(); + + preProcessor.Process(Arg.Any>()) + .Returns(Task.FromResult(new PreProcessingResult(outcome, modelNotification, null))); - var consumer = new NotificationConsumer(dbContext, mockLinkingService, NullLogger.Instance); + var consumer = new NotificationConsumer(preProcessor, mockLinkingService, NullLogger.Instance); consumer.Context = new ConsumerContext() { Headers = new Dictionary() { { "messageId", notification!.ReferenceNumber! } } @@ -31,24 +43,29 @@ public async Task WhenNotificationNotExists_ThenShouldBeCreated() await consumer.OnHandle(notification); // ASSERT - var savedNotification = await dbContext.Notifications.Find(notification!.ReferenceNumber!); - savedNotification.Should().NotBeNull(); - savedNotification.AuditEntries.Count.Should().Be(1); - savedNotification.AuditEntries[0].Status.Should().Be("Created"); + consumer.Context.IsLinked().Should().BeFalse(); + + await mockLinkingService.DidNotReceive().Link(Arg.Any(), Arg.Any()); } [Fact] - public async Task WhenNotificationExists_AndLastUpdatedIsNewer_ThenShouldBeUpdated() + public async Task WhenPreProcessingSucceeds_AndLastAuditEntryIsCreated_ThenLinkShouldBeRun() { // ARRANGE var notification = CreateImportNotification(); - var dbContext = CreateDbContext(); - await dbContext.Notifications.Insert(notification.MapWithTransform()); - notification.LastUpdated = notification.LastUpdated?.AddHours(1); + var modelNotification = notification.MapWithTransform(); + modelNotification.Changed(AuditEntry.CreateCreatedEntry(modelNotification, "Test", 1, DateTime.Now)); var mockLinkingService = Substitute.For(); + var preProcessor = Substitute.For>(); + mockLinkingService.Link(Arg.Any(), Arg.Any()) + .Returns(Task.FromResult(new LinkResult(LinkOutcome.Linked))); - var consumer = new NotificationConsumer(dbContext, mockLinkingService, NullLogger.Instance); + preProcessor.Process(Arg.Any>()) + .Returns(Task.FromResult(new PreProcessingResult(PreProcessingOutcome.New, modelNotification, null))); + + + var consumer = new NotificationConsumer(preProcessor, mockLinkingService, NullLogger.Instance); consumer.Context = new ConsumerContext() { Headers = new Dictionary() { { "messageId", notification!.ReferenceNumber! } } @@ -58,10 +75,10 @@ public async Task WhenNotificationExists_AndLastUpdatedIsNewer_ThenShouldBeUpdat await consumer.OnHandle(notification); // ASSERT - var savedNotification = await dbContext.Notifications.Find(notification!.ReferenceNumber!); - savedNotification.Should().NotBeNull(); - savedNotification.AuditEntries.Count.Should().Be(1); - savedNotification.AuditEntries[0].Status.Should().Be("Updated"); + consumer.Context.IsPreProcessed().Should().BeTrue(); + consumer.Context.IsLinked().Should().BeTrue(); + + await mockLinkingService.Received().Link(Arg.Any(), Arg.Any()); } private static ImportNotification CreateImportNotification() diff --git a/Btms.Consumers/AlvsClearanceRequestConsumer.cs b/Btms.Consumers/AlvsClearanceRequestConsumer.cs index 862e4076..8a7550e6 100644 --- a/Btms.Consumers/AlvsClearanceRequestConsumer.cs +++ b/Btms.Consumers/AlvsClearanceRequestConsumer.cs @@ -1,119 +1,45 @@ -using Btms.Backend.Data; using Btms.Business.Services; -using Btms.Model; -using Btms.Model.Auditing; using Btms.Types.Alvs; -using Btms.Types.Alvs.Mapping; using Microsoft.Extensions.Logging; using SlimMessageBus; -using System.Diagnostics.CodeAnalysis; using Btms.Consumers.Extensions; -using Force.DeepCloner; -using Items = Btms.Model.Alvs.Items; +using Btms.Business.Pipelines.PreProcessing; namespace Btms.Consumers { - internal class AlvsClearanceRequestConsumer(IMongoDbContext dbContext, ILinkingService linkingService, ILogger logger) + internal class AlvsClearanceRequestConsumer(IPreProcessor preProcessor, ILinkingService linkingService, ILogger logger) : IConsumer, IConsumerWithContext { - private ILinkingService linkingService { get; } = linkingService; - - [SuppressMessage("SonarLint", "S1481", - Justification = - "LinkResult variable is unused until matching and decisions are implemented")] public async Task OnHandle(AlvsClearanceRequest message) { - var auditId = Context.Headers["messageId"].ToString(); - logger.ConsumerStarted(Context.GetJobId()!, auditId!, GetType().Name, message.Header?.EntryReference!); - using (logger.BeginScope(new List> - { - new("JobId", Context.GetJobId()!), - new("MessageId", auditId!), - new("Consumer", GetType().Name), - new("Identifier", message.Header?.EntryReference!), - })) + var messageId = Context.GetMessageId(); + using (logger.BeginScope(Context.GetJobId()!, messageId, GetType().Name, message.Header?.EntryReference!)) { - var internalClearanceRequest = AlvsClearanceRequestMapper.Map(message); - var movement = BuildMovement(internalClearanceRequest); - var existingMovement = await dbContext.Movements.Find(movement.Id!); - Movement persistedMovement = null!; + var preProcessingResult = await preProcessor.Process(new PreProcessingContext(message, messageId)); - if (existingMovement is not null) + if (preProcessingResult.Outcome == PreProcessingOutcome.Skipped) { - if (movement.ClearanceRequests[0].Header?.EntryVersionNumber > - existingMovement.ClearanceRequests[0].Header?.EntryVersionNumber) - { - persistedMovement = existingMovement.DeepClone(); - var auditEntry = AuditEntry.CreateUpdated(existingMovement.ClearanceRequests[0], - movement.ClearanceRequests[0], - BuildNormalizedAlvsPath(auditId!), - movement.ClearanceRequests[0].Header!.EntryVersionNumber.GetValueOrDefault(), - movement.UpdatedSource); - movement.Update(auditEntry); - - existingMovement.ClearanceRequests.RemoveAll(x => - x.Header?.EntryReference == - movement.ClearanceRequests[0].Header?.EntryReference); - existingMovement.ClearanceRequests.AddRange(movement.ClearanceRequests); - if (existingMovement.Items == null) - { - existingMovement.Items = new List(); - } - - existingMovement.Items.AddRange(movement.Items); - await dbContext.Movements.Update(existingMovement, existingMovement._Etag); - } - else - { - logger.MessageSkipped(Context.GetJobId()!, auditId!, GetType().Name, message.Header?.EntryReference!); - Context.Skipped(); - return; - } + Context.Skipped(); } else { - persistedMovement = movement!; - var auditEntry = AuditEntry.CreateCreatedEntry( - movement.ClearanceRequests[0], - BuildNormalizedAlvsPath(auditId!), - movement.ClearanceRequests[0].Header!.EntryVersionNumber.GetValueOrDefault(), - movement.UpdatedSource); - movement.Update(auditEntry); - await dbContext.Movements.Insert(movement); + Context.PreProcessed(); } - //This should be existing, pre update (may need to clone) - var linkContext = new MovementLinkContext(persistedMovement, existingMovement); - var linkResult = await linkingService.Link(linkContext, Context.CancellationToken); + if (preProcessingResult.IsCreatedOrChanged()) + { + var linkContext = new MovementLinkContext(preProcessingResult.Record!, + preProcessingResult.ChangeSet); + var linkResult = await linkingService.Link(linkContext, Context.CancellationToken); + + if (linkResult.Outcome == LinkOutcome.Linked) + { + Context.Linked(); + } + } } } public IConsumerContext Context { get; set; } = null!; - - public static Movement BuildMovement(Model.Alvs.AlvsClearanceRequest request) - { - return new Movement() - { - Id = request.Header!.EntryReference, - UpdatedSource = request.ServiceHeader?.ServiceCalled, - CreatedSource = request.ServiceHeader?.ServiceCalled, - ArrivesAt = request.Header.ArrivesAt, - EntryReference = request.Header.EntryReference!, - MasterUcr = request.Header.MasterUcr!, - DeclarationType = request.Header.DeclarationType!, - SubmitterTurn = request.Header.SubmitterTurn!, - DeclarantId = request.Header.DeclarantId!, - DeclarantName = request.Header.DeclarantName!, - DispatchCountryCode = request.Header.DispatchCountryCode!, - GoodsLocationCode = request.Header.GoodsLocationCode!, - ClearanceRequests = [request], - Items = request.Items?.Select(x => x).ToList()!, - }; - } - - private static string BuildNormalizedAlvsPath(string fullPath) - { - return fullPath.Replace("RAW/ALVS/", ""); - } } } \ No newline at end of file diff --git a/Btms.Consumers/ConsumerLogging.cs b/Btms.Consumers/ConsumerLogging.cs index 8d470f04..242d4e16 100644 --- a/Btms.Consumers/ConsumerLogging.cs +++ b/Btms.Consumers/ConsumerLogging.cs @@ -1,4 +1,5 @@ using Microsoft.Extensions.Logging; +using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities; namespace Btms.Consumers; @@ -9,4 +10,14 @@ internal static partial class ConsumerLogging [LoggerMessage(Level = LogLevel.Information, Message = "Message skipped - {JobId} - {MessageId} - {Consumer} - {Identifier}")] internal static partial void MessageSkipped(this ILogger logger, string jobId, string messageId, string consumer, string identifier); + + + internal static IDisposable BeginScope(this ILogger logger, string jobId, string messageId, string consumer, string identifier) + { + logger.ConsumerStarted(jobId, messageId, consumer, identifier); + return logger.BeginScope(new List> + { + new("JobId", jobId), new("MessageId", messageId), new("Consumer", consumer), new("Identifier", identifier), + })!; + } } \ No newline at end of file diff --git a/Btms.Consumers/Extensions/ConsumerContextExtensions.cs b/Btms.Consumers/Extensions/ConsumerContextExtensions.cs index 550c43b5..2768e6a0 100644 --- a/Btms.Consumers/Extensions/ConsumerContextExtensions.cs +++ b/Btms.Consumers/Extensions/ConsumerContextExtensions.cs @@ -26,6 +26,16 @@ public static int GetRetryAttempt(this IConsumerContext consumerContext) return null; } + public static string GetMessageId(this IConsumerContext consumerContext) + { + if (consumerContext.Headers.TryGetValue(MessageBusHeaders.MessageId, out var value)) + { + return value.ToString()!; + } + + return string.Empty; + } + public static ActivityContext GetActivityContext(this IConsumerContext consumerContext) { if (consumerContext.Properties.TryGetValue(MessageBusHeaders.TraceParent, out var value)) @@ -41,6 +51,36 @@ public static void Skipped(this IConsumerContext consumerContext) consumerContext.Properties.TryAdd(MessageBusHeaders.Skipped, true); } + public static void PreProcessed(this IConsumerContext consumerContext) + { + consumerContext.Properties.TryAdd(MessageBusHeaders.PreProcessed, true); + } + + public static bool IsPreProcessed(this IConsumerContext consumerContext) + { + if (consumerContext.Properties.TryGetValue(MessageBusHeaders.PreProcessed, out var value)) + { + return (bool)value; + } + + return false; + } + + public static void Linked(this IConsumerContext consumerContext) + { + consumerContext.Properties.TryAdd(MessageBusHeaders.Linked, true); + } + + public static bool IsLinked(this IConsumerContext consumerContext) + { + if (consumerContext.Properties.TryGetValue(MessageBusHeaders.Linked, out var value)) + { + return (bool)value; + } + + return false; + } + public static bool WasSkipped(this IConsumerContext consumerContext) { if (consumerContext.Properties.TryGetValue(MessageBusHeaders.Skipped, out _)) diff --git a/Btms.Consumers/MessageBusHeaders.cs b/Btms.Consumers/MessageBusHeaders.cs index ff913db7..503d8be9 100644 --- a/Btms.Consumers/MessageBusHeaders.cs +++ b/Btms.Consumers/MessageBusHeaders.cs @@ -5,5 +5,8 @@ public static class MessageBusHeaders public const string RetryCount = "btms.retry.count"; public const string TraceParent = "traceparent"; public const string JobId = "jobId"; + public const string MessageId = "messageId"; public const string Skipped = "skipped"; + public const string PreProcessed = "pre-processed"; + public const string Linked = "linked"; } \ No newline at end of file diff --git a/Btms.Consumers/NotificationConsumer.cs b/Btms.Consumers/NotificationConsumer.cs index 677e09d1..66c08c2d 100644 --- a/Btms.Consumers/NotificationConsumer.cs +++ b/Btms.Consumers/NotificationConsumer.cs @@ -1,76 +1,46 @@ -using Btms.Backend.Data; using Btms.Business.Services; -using Btms.Common.Extensions; using Btms.Types.Ipaffs; -using Btms.Types.Ipaffs.Mapping; using SlimMessageBus; -using System.Diagnostics.CodeAnalysis; using Btms.Consumers.Extensions; -using Force.DeepCloner; using Microsoft.Extensions.Logging; +using Btms.Business.Pipelines.PreProcessing; namespace Btms.Consumers { - internal class NotificationConsumer(IMongoDbContext dbContext, ILinkingService linkingService, ILogger logger) + internal class NotificationConsumer(IPreProcessor preProcessor, ILinkingService linkingService, ILogger logger) : IConsumer, IConsumerWithContext { - private ILinkingService linkingService { get; } = linkingService; - - [SuppressMessage("SonarLint", "S1481", - Justification = - "LinkResult variable is unused until matching and decisions are implemented")] public async Task OnHandle(ImportNotification message) { - var auditId = Context.Headers["messageId"].ToString(); - logger.ConsumerStarted(Context.GetJobId()!, auditId!, GetType().Name, message.ReferenceNumber!); - using (logger.BeginScope(new List> - { - new("JobId", Context.GetJobId()!), - new("MessageId", auditId!), - new("Consumer", GetType().Name), - new("Identifier", message.ReferenceNumber!), - })) + var messageId = Context.GetMessageId(); + using (logger.BeginScope(Context.GetJobId()!, messageId, GetType().Name, message.ReferenceNumber!)) { - var internalNotification = message.MapWithTransform(); - + var preProcessingResult = await preProcessor.Process(new PreProcessingContext(message, messageId)); - var existingNotification = await dbContext.Notifications.Find(message.ReferenceNumber!); - Model.Ipaffs.ImportNotification persistedNotification = null!; - if (existingNotification is not null) + if (preProcessingResult.Outcome == PreProcessingOutcome.Skipped) { - if (internalNotification.UpdatedSource.TrimMicroseconds() > - existingNotification.UpdatedSource.TrimMicroseconds()) - { - persistedNotification = existingNotification.DeepClone(); - internalNotification.AuditEntries = existingNotification.AuditEntries; - internalNotification.CreatedSource = existingNotification.CreatedSource; - internalNotification.Update(BuildNormalizedIpaffsPath(auditId!), existingNotification); - await dbContext.Notifications.Update(internalNotification, existingNotification._Etag); - } - else - { - logger.MessageSkipped(Context.GetJobId()!, auditId!, GetType().Name, message.ReferenceNumber!); - Context.Skipped(); - return; - } + Context.Skipped(); } else { - internalNotification.Create(BuildNormalizedIpaffsPath(auditId!)); - await dbContext.Notifications.Insert(internalNotification); - persistedNotification = internalNotification!; + Context.PreProcessed(); + } + + if (preProcessingResult.IsCreatedOrChanged()) + { + var linkContext = new ImportNotificationLinkContext(preProcessingResult.Record!, + preProcessingResult.ChangeSet); + var linkResult = await linkingService.Link(linkContext, Context.CancellationToken); + + if (linkResult.Outcome == LinkOutcome.Linked) + { + Context.Linked(); + } } - var linkContext = new ImportNotificationLinkContext(persistedNotification, existingNotification); - var linkResult = await linkingService.Link(linkContext, Context.CancellationToken); } } public IConsumerContext Context { get; set; } = null!; - - private static string BuildNormalizedIpaffsPath(string fullPath) - { - return fullPath.Replace("RAW/IPAFFS/", ""); - } } } \ No newline at end of file diff --git a/Btms.Model/Auditing/AuditEntry.cs b/Btms.Model/Auditing/AuditEntry.cs index cf9980b9..fe729bc1 100644 --- a/Btms.Model/Auditing/AuditEntry.cs +++ b/Btms.Model/Auditing/AuditEntry.cs @@ -1,5 +1,6 @@ using System.Text.Json; using System.Text.Json.Nodes; +using Btms.Model.ChangeLog; using Btms.Model.Extensions; using Json.Patch; @@ -21,6 +22,21 @@ public class AuditEntry public List Diff { get; set; } = new(); + public bool IsCreatedOrUpdated() + { + return IsCreated() || IsUpdated(); + } + + public bool IsCreated() + { + return this.Status == "Created"; + } + + public bool IsUpdated() + { + return this.Status == "Created"; + } + public static AuditEntry Create(T previous, T current, string id, int version, DateTime? lastUpdated, string lastUpdatedBy, string status) @@ -36,6 +52,26 @@ public static AuditEntry CreateUpdated(T previous, T current, string id, int return Create(previous, current, id, version, lastUpdated, CreatedBySystem, "Updated"); } + public static AuditEntry CreateUpdated(ChangeSet changeSet, string id, int version, DateTime? lastUpdated) + { + var auditEntry = new AuditEntry() + { + Id = id, + Version = version, + CreatedSource = lastUpdated, + CreatedBy = CreatedBySystem, + CreatedLocal = DateTime.UtcNow, + Status = "Updated" + }; + + foreach (var operation in changeSet.JsonPatch.Operations) + { + auditEntry.Diff.Add(AuditDiffEntry.Internal(operation)); + } + + return auditEntry; + } + public static AuditEntry CreateCreatedEntry(T current, string id, int version, DateTime? lastUpdated) { return new AuditEntry() diff --git a/Btms.Model/ChangeLog/ChangeSet.cs b/Btms.Model/ChangeLog/ChangeSet.cs new file mode 100644 index 00000000..8fb0ffb4 --- /dev/null +++ b/Btms.Model/ChangeLog/ChangeSet.cs @@ -0,0 +1,31 @@ +using System.Text.Json; +using System.Text.Json.Nodes; +using System.Text.Json.Serialization; +using Btms.Model.Extensions; +using Json.Patch; + +namespace Btms.Model.ChangeLog; + +public class ChangeSet(JsonPatch jsonPatch) +{ + private static JsonSerializerOptions jsonOptions = new JsonSerializerOptions() + { + TypeInfoResolver = new ChangeSetTypeInfoResolver(), + PropertyNameCaseInsensitive = true, + NumberHandling = JsonNumberHandling.AllowReadingFromString + }; + + public JsonPatch JsonPatch { get; } = jsonPatch; + + public static ChangeSet CreateChangeSet(T current, T previous) + { + var previousNode = JsonNode.Parse(previous.ToJsonString(jsonOptions)); + var currentNode = JsonNode.Parse(current.ToJsonString(jsonOptions)); + var diff = previousNode.CreatePatch(currentNode); + + //exclude fields from patch, like _ts, audit entries etc + var operations = diff.Operations.Where(x => !x.Path.ToString().Contains("_ts")); + + return new ChangeSet(new JsonPatch(operations)); + } +} \ No newline at end of file diff --git a/Btms.Model/ChangeLog/ChangeSetExtensions.cs b/Btms.Model/ChangeLog/ChangeSetExtensions.cs new file mode 100644 index 00000000..7d7cd1f1 --- /dev/null +++ b/Btms.Model/ChangeLog/ChangeSetExtensions.cs @@ -0,0 +1,9 @@ +namespace Btms.Model.ChangeLog; + +public static class ChangeSetExtensions +{ + public static ChangeSet GenerateChangeSet(this T current, T previous) + { + return ChangeSet.CreateChangeSet(current, previous); + } +} \ No newline at end of file diff --git a/Btms.Model/ChangeLog/ChangeSetIgnoreAttribute.cs b/Btms.Model/ChangeLog/ChangeSetIgnoreAttribute.cs new file mode 100644 index 00000000..71dc04ce --- /dev/null +++ b/Btms.Model/ChangeLog/ChangeSetIgnoreAttribute.cs @@ -0,0 +1,7 @@ +namespace Btms.Model.ChangeLog; + +[AttributeUsage(AttributeTargets.Property)] +public class ChangeSetIgnoreAttribute : Attribute +{ + +} \ No newline at end of file diff --git a/Btms.Model/ChangeLog/ChangeSetTypeInfoResolver.cs b/Btms.Model/ChangeLog/ChangeSetTypeInfoResolver.cs new file mode 100644 index 00000000..53d5b45a --- /dev/null +++ b/Btms.Model/ChangeLog/ChangeSetTypeInfoResolver.cs @@ -0,0 +1,25 @@ +using System.Text.Json; +using System.Text.Json.Serialization.Metadata; + +namespace Btms.Model.ChangeLog; + +public class ChangeSetTypeInfoResolver : DefaultJsonTypeInfoResolver +{ + public override JsonTypeInfo GetTypeInfo(Type type, JsonSerializerOptions options) + { + JsonTypeInfo typeInfo = base.GetTypeInfo(type, options); + + if (typeInfo.Kind == JsonTypeInfoKind.Object) + { + foreach (var property in typeInfo.Properties) + { + if (property.AttributeProvider!.GetCustomAttributes(typeof(ChangeSetIgnoreAttribute), false).Any()) + { + property.ShouldSerialize = (o, o1) => false; + } + } + } + + return typeInfo; + } +} \ No newline at end of file diff --git a/Btms.Model/Data/IAuditable.cs b/Btms.Model/Data/IAuditable.cs new file mode 100644 index 00000000..2d97724a --- /dev/null +++ b/Btms.Model/Data/IAuditable.cs @@ -0,0 +1,8 @@ +using Btms.Model.Auditing; + +namespace Btms.Model.Data; + +public interface IAuditable +{ + AuditEntry GetLatestAuditEntry(); +} \ No newline at end of file diff --git a/Btms.Model/Ipaffs/ImportNotification.cs b/Btms.Model/Ipaffs/ImportNotification.cs index 3a44fc09..d4569ef9 100644 --- a/Btms.Model/Ipaffs/ImportNotification.cs +++ b/Btms.Model/Ipaffs/ImportNotification.cs @@ -6,16 +6,19 @@ using Btms.Model.Auditing; using Btms.Model.Data; using Btms.Model.Relationships; +using Btms.Model.ChangeLog; namespace Btms.Model.Ipaffs; [Resource(PublicName = "import-notifications")] -public partial class ImportNotification : IMongoIdentifiable, IDataEntity +public partial class ImportNotification : IMongoIdentifiable, IDataEntity, IAuditable { private string? matchReference; //// This field is used by the jsonapi-consumer to control the correct casing in the type field - [JsonIgnore] public string Type { get; set; } = "import-notification"; + [JsonIgnore] + [ChangeSetIgnore] + public string Type { get; set; } = "import-notification"; //[BsonId(IdGenerator = typeof(StringObjectIdGenerator))] [JsonIgnore] @@ -26,10 +29,16 @@ public virtual string? Id set => ReferenceNumber = value; } + [ChangeSetIgnore] public string _Etag { get; set; } = default!; [Attr] public DateTime? CreatedSource { get; set; } - [Attr] public DateTime Created { get; set; } - [Attr] public DateTime Updated { get; set; } + + [Attr] + [ChangeSetIgnore] public DateTime Created { get; set; } + + [Attr] + [ChangeSetIgnore] + public DateTime Updated { get; set; } [BsonIgnore] [NotMapped] @@ -63,15 +72,10 @@ public string? StringId // They are removed from the document that is sent to the client by the JsonApiResourceDefinition OnApplySparseFieldSet // mechanism - /// - /// Tracks the last time the record was changed - /// - [Attr] - [BsonElement("_ts")] - public DateTime _Ts { get; set; } [Attr] [BsonElement("_pointOfEntry")] + [ChangeSetIgnore] public string _PointOfEntry { get => PartOne?.PointOfEntry!; @@ -86,6 +90,7 @@ public string _PointOfEntry [Attr] [BsonElement("_pointOfEntryControlPoint")] + [ChangeSetIgnore] public string _PointOfEntryControlPoint { get => PartOne?.PointOfEntryControlPoint!; @@ -99,6 +104,7 @@ public string _PointOfEntryControlPoint } [BsonElement("_matchReferences")] + [ChangeSetIgnore] public string _MatchReference { get @@ -140,7 +146,6 @@ public void AddRelationship(TdmRelationshipObject relationship) public void Changed(AuditEntry auditEntry) { this.AuditEntries.Add(auditEntry); - _Ts = DateTime.UtcNow; } public void Create(string auditId) @@ -162,13 +167,17 @@ public void Skipped(string auditId, int version) this.Changed(auditEntry); } - public void Update(string auditId, ImportNotification previous) + public void Update(string auditId, ChangeSet changeSet) { - var auditEntry = AuditEntry.CreateUpdated(previous, - this, + var auditEntry = AuditEntry.CreateUpdated(changeSet, auditId, this.Version.GetValueOrDefault(), this.UpdatedSource); this.Changed(auditEntry); } + + public AuditEntry GetLatestAuditEntry() + { + return this.AuditEntries.OrderByDescending(x => x.CreatedLocal).First(); + } } \ No newline at end of file diff --git a/Btms.Model/Movement.cs b/Btms.Model/Movement.cs index 6a0958ce..26e7bc3c 100644 --- a/Btms.Model/Movement.cs +++ b/Btms.Model/Movement.cs @@ -6,6 +6,7 @@ using System.Text.Json.Serialization; using Btms.Model.Alvs; using Btms.Model.Auditing; +using Btms.Model.ChangeLog; using Btms.Model.Data; using Btms.Model.Relationships; @@ -15,11 +16,12 @@ namespace Btms.Model; // https://eaflood.atlassian.net/wiki/spaces/TRADE/pages/5104664583/PHA+Port+Health+Authority+Integration+Data+Schema [Resource] -public class Movement : IMongoIdentifiable, IDataEntity +public class Movement : IMongoIdentifiable, IDataEntity, IAuditable { private List matchReferences = []; // This field is used by the jsonapi-consumer to control the correct casing in the type field + [ChangeSetIgnore] public string Type { get; set; } = "movements"; [Attr] public List ClearanceRequests { get; set; } = default!; @@ -52,7 +54,9 @@ public class Movement : IMongoIdentifiable, IDataEntity [Attr] public string GoodsLocationCode { get; set; } = default!; - [Attr] public List AuditEntries { get; set; } = new List(); + [Attr] + [ChangeSetIgnore] + public List AuditEntries { get; set; } = new List(); [Attr] [JsonPropertyName("relationships")] @@ -60,6 +64,7 @@ public class Movement : IMongoIdentifiable, IDataEntity [BsonElement("_matchReferences")] + [ChangeSetIgnore] public List _MatchReferences { get @@ -156,6 +161,7 @@ private static string BuildNormalizedDecisionPath(string fullPath) [BsonIgnore] [NotMapped] + [ChangeSetIgnore] public string? StringId { get => Id; @@ -171,7 +177,18 @@ public string? StringId [BsonId] public string? Id { get; set; } = null!; + [ChangeSetIgnore] public string _Etag { get; set; } = null!; - [Attr] public DateTime Created { get; set; } - [Attr] public DateTime Updated { get; set; } + + [Attr] + [ChangeSetIgnore] + public DateTime Created { get; set; } + + [Attr] + [ChangeSetIgnore] + public DateTime Updated { get; set; } + public AuditEntry GetLatestAuditEntry() + { + return this.AuditEntries.OrderByDescending(x => x.CreatedLocal).First(); + } } \ No newline at end of file diff --git a/Btms.SensitiveData/SensitiveDataAttribute.cs b/Btms.SensitiveData/SensitiveDataAttribute.cs index e3bb3d07..8bde1ce1 100644 --- a/Btms.SensitiveData/SensitiveDataAttribute.cs +++ b/Btms.SensitiveData/SensitiveDataAttribute.cs @@ -1,5 +1,6 @@ namespace Btms.SensitiveData; +[AttributeUsage(AttributeTargets.Property)] public class SensitiveDataAttribute : Attribute {