From 464dee137856f9c5c8db688f2edfba5fca62adc9 Mon Sep 17 00:00:00 2001 From: Lorenzo Di Giuseppe Date: Mon, 27 Feb 2023 09:19:31 +0100 Subject: [PATCH 01/13] replicate Activiti optimistic concurrency exception --- .../services/audit/GatewayConcurrencyIT.java | 149 ++++++++++++++++++ .../test/resources/access-control.properties | 2 +- .../resources/application-test.properties | 7 + .../connectors/generate-signal-connector.json | 33 ++++ ...s-with-gateway-concurrency-extensions.json | 32 ++++ ...rocess-with-gateway-concurrency.bpmn20.xml | 86 ++++++++++ 6 files changed, 308 insertions(+), 1 deletion(-) create mode 100644 activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/java/org/activiti/cloud/starter/tests/services/audit/GatewayConcurrencyIT.java create mode 100644 activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/connectors/generate-signal-connector.json create mode 100644 activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/processes/process-with-gateway-concurrency-extensions.json create mode 100644 activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/processes/process-with-gateway-concurrency.bpmn20.xml diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/java/org/activiti/cloud/starter/tests/services/audit/GatewayConcurrencyIT.java b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/java/org/activiti/cloud/starter/tests/services/audit/GatewayConcurrencyIT.java new file mode 100644 index 00000000000..cbfa90936a6 --- /dev/null +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/java/org/activiti/cloud/starter/tests/services/audit/GatewayConcurrencyIT.java @@ -0,0 +1,149 @@ +/* + * Copyright 2017-2020 Alfresco Software, Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.activiti.cloud.starter.tests.services.audit; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.awaitility.Awaitility.await; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.activiti.api.process.model.ProcessInstance.ProcessInstanceStatus; +import org.activiti.cloud.api.process.model.CloudProcessInstance; +import org.activiti.cloud.api.process.model.IntegrationRequest; +import org.activiti.cloud.api.process.model.IntegrationResult; +import org.activiti.cloud.api.process.model.impl.IntegrationRequestImpl; +import org.activiti.cloud.api.process.model.impl.IntegrationResultImpl; +import org.activiti.cloud.services.test.containers.KeycloakContainerApplicationInitializer; +import org.activiti.cloud.services.test.identity.IdentityTokenProducer; +import org.activiti.cloud.starter.tests.helper.HelperConfiguration; +import org.activiti.cloud.starter.tests.helper.ProcessInstanceRestTemplate; +import org.activiti.cloud.starter.tests.runtime.IntegrationResultSender; +import org.activiti.cloud.starter.tests.runtime.ServiceTaskConsumerHandler; +import org.activiti.engine.RuntimeService; +import org.activiti.services.connectors.channel.ServiceTaskIntegrationResultEventHandler; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.stream.binder.test.OutputDestination; +import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration; +import org.springframework.context.annotation.Import; +import org.springframework.http.ResponseEntity; +import org.springframework.messaging.Message; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.TestPropertySource; + +@ActiveProfiles(AuditProducerIT.AUDIT_PRODUCER_IT) +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +@TestPropertySource("classpath:application-test.properties") +@DirtiesContext +@Import({HelperConfiguration.class, + ServiceTaskConsumerHandler.class, + IntegrationResultSender.class, + TestChannelBinderConfiguration.class}) +@ContextConfiguration( + initializers = {KeycloakContainerApplicationInitializer.class}) +public class GatewayConcurrencyIT { + + private static final String PROCESS_ID = "gateway_concurrency"; + + private static final String SIGNAL_NAME = "concurrentSignal"; + + @Autowired + private ProcessInstanceRestTemplate processInstanceRestTemplate; + + @Autowired + private IdentityTokenProducer identityTokenProducer; + + @Autowired + private ServiceTaskIntegrationResultEventHandler serviceTaskIntegrationResultEventHandler; + + @Autowired + private OutputDestination outputDestination; + + @Autowired + private RuntimeService runtimeService; + + @Autowired + private ObjectMapper objectMapper; + + private ExecutorService executorService; + + + @BeforeEach + public void setUp() { + identityTokenProducer.withTestUser("testuser"); + executorService = Executors.newFixedThreadPool(2); + } + + @AfterEach + public void cleanUp(){ + executorService.shutdown(); + } + + @Test + public void shouldExecuteWithoutConcurrencyException() throws IOException, InterruptedException { + ResponseEntity processInstance = processInstanceRestTemplate.startProcess(PROCESS_ID, + Map.of("signal", SIGNAL_NAME),null); + final String processInstanceId = processInstance.getBody().getId(); + + IntegrationRequest integrationRequest = getIntegrationRequest(); + + final IntegrationResult integrationResult = createIntegrationResult(integrationRequest); + + List> tasks = new ArrayList<>(); + + tasks.add(() -> { + serviceTaskIntegrationResultEventHandler.receive(integrationResult); + return null; + }); + + tasks.add(() -> { + runtimeService.signalEventReceived(SIGNAL_NAME); + return null; + }); + + executorService.invokeAll(tasks); + + + await().atMost(Duration.ofMinutes(10)).untilAsserted(() -> { + ResponseEntity completedProcessInstance = processInstanceRestTemplate.getProcessInstance(processInstanceId); + + assertThat(completedProcessInstance.getBody().getStatus()).isEqualTo(ProcessInstanceStatus.COMPLETED); + }); + + } + + private IntegrationRequest getIntegrationRequest() throws IOException { + Message message = outputDestination.receive(10000, "generate-signal-connector.GENERATE"); + assertThat(message).isNotNull(); + return objectMapper.readValue(message.getPayload(), IntegrationRequestImpl.class); + } + + private IntegrationResult createIntegrationResult(IntegrationRequest integrationRequest) { + return new IntegrationResultImpl(integrationRequest, integrationRequest.getIntegrationContext()); + } +} diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/access-control.properties b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/access-control.properties index 9aaf6a03afe..449d61078f0 100644 --- a/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/access-control.properties +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/access-control.properties @@ -1,7 +1,7 @@ activiti.security.policies[0].name=MyPolicy for TestUser activiti.security.policies[0].users=testuser activiti.security.policies[0].access=READ -activiti.security.policies[0].keys=process_pool1,ProcessWithVariables +activiti.security.policies[0].keys=process_pool1,ProcessWithVariables,gateway_concurrency_test activiti.security.policies[0].serviceName=test-app activiti.security.policies[1].name=MyPolicy for TestAdmin diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/application-test.properties b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/application-test.properties index f47d31ca141..1683442830b 100644 --- a/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/application-test.properties +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/application-test.properties @@ -38,6 +38,10 @@ spring.cloud.stream.bindings.constantsIntegrationEventsConsumer.destination=Cons spring.cloud.stream.bindings.constantsIntegrationEventsConsumer.group=integration spring.cloud.stream.bindings.constantsIntegrationEventsConsumer.contentType=application/json +# generate signal connector +spring.cloud.stream.bindings.generateSignalConsumer.destination=generate-signal-connector.GENERATE +spring.cloud.stream.bindings.generateSignalConsumer.group=integration +spring.cloud.stream.bindings.generateSignalConsumer.contentType=application/json # multi-instance result collection connector spring.cloud.stream.bindings.mealsConnectorConsumer.destination=mealsConnector @@ -78,3 +82,6 @@ activiti.cloud.messaging.partition-count=1 # Test partitioned consumer properties spring.cloud.stream.bindings.queryConsumer.consumer.partitioned=${activiti.cloud.messaging.partitioned} spring.cloud.stream.bindings.auditConsumer.consumer.partitioned=${activiti.cloud.messaging.partitioned} + +logging.level.org.activiti.engine.impl.db=trace +logging.level.org.activiti.engine.integration=trace \ No newline at end of file diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/connectors/generate-signal-connector.json b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/connectors/generate-signal-connector.json new file mode 100644 index 00000000000..5e255326a9b --- /dev/null +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/connectors/generate-signal-connector.json @@ -0,0 +1,33 @@ +{ + "name": "generate-signal-connector", + "description": "", + "actions": { + "generate-signal": { + "id": "generate-signal", + "name": "GENERATE", + "description": "Generate Signal", + "inputs": [ + { + "id": "input-variable-1", + "name": "signal", + "description": "", + "type": "string", + "model": { + "$ref": "#/$defs/primitive/string" + } + } + ], + "outputs": [ + { + "id": "output-variable-1", + "name": "status", + "description": "", + "type": "boolean", + "model": { + "$ref": "#/$defs/primitive/boolean" + } + } + ] + } + } +} \ No newline at end of file diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/processes/process-with-gateway-concurrency-extensions.json b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/processes/process-with-gateway-concurrency-extensions.json new file mode 100644 index 00000000000..c4c4f7677d3 --- /dev/null +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/processes/process-with-gateway-concurrency-extensions.json @@ -0,0 +1,32 @@ +{ + "gateway_concurrency": { + "constants": {}, + "mappings": { + "serviceTask1": { + "inputs": { + "signal": { + "type": "variable", + "value": "signal" + } + } + } + }, + "properties": { + "variable1": { + "id": "variable1", + "name": "signal", + "type": "string", + "analytics": false, + "required": false, + "model": { + "$ref": "#/$defs/primitive/string" + } + } + }, + "assignments": {}, + "templates": { + "tasks": {}, + "default": {} + } + } +} \ No newline at end of file diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/processes/process-with-gateway-concurrency.bpmn20.xml b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/processes/process-with-gateway-concurrency.bpmn20.xml new file mode 100644 index 00000000000..44b6aee0f36 --- /dev/null +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/processes/process-with-gateway-concurrency.bpmn20.xml @@ -0,0 +1,86 @@ + + + + + + flow1 + + + flow1 + flow2 + flow3 + + + + flow2 + flow4 + + + + + flow3 + flow5 + + + + flow4 + flow5 + flow6 + + + + + flow6 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + From 6c1cf809fe9f7b0117f83386aa980af1abcdd381 Mon Sep 17 00:00:00 2001 From: Lorenzo Di Giuseppe Date: Mon, 27 Feb 2023 12:59:31 +0100 Subject: [PATCH 02/13] fix IT assertions --- .../services/audit/GatewayConcurrencyIT.java | 43 ++++++++++++------- 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/java/org/activiti/cloud/starter/tests/services/audit/GatewayConcurrencyIT.java b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/java/org/activiti/cloud/starter/tests/services/audit/GatewayConcurrencyIT.java index cbfa90936a6..f77b1529e2c 100644 --- a/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/java/org/activiti/cloud/starter/tests/services/audit/GatewayConcurrencyIT.java +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/java/org/activiti/cloud/starter/tests/services/audit/GatewayConcurrencyIT.java @@ -15,19 +15,22 @@ */ package org.activiti.cloud.starter.tests.services.audit; +import static org.activiti.api.process.model.events.ProcessRuntimeEvent.ProcessEvents.PROCESS_COMPLETED; +import static org.activiti.api.process.model.events.ProcessRuntimeEvent.ProcessEvents.PROCESS_CREATED; +import static org.activiti.api.process.model.events.ProcessRuntimeEvent.ProcessEvents.PROCESS_STARTED; +import static org.assertj.core.api.Assertions.tuple; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import static org.awaitility.Awaitility.await; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; -import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import org.activiti.api.process.model.ProcessInstance.ProcessInstanceStatus; +import org.activiti.cloud.api.model.shared.events.CloudRuntimeEvent; import org.activiti.cloud.api.process.model.CloudProcessInstance; import org.activiti.cloud.api.process.model.IntegrationRequest; import org.activiti.cloud.api.process.model.IntegrationResult; @@ -35,12 +38,10 @@ import org.activiti.cloud.api.process.model.impl.IntegrationResultImpl; import org.activiti.cloud.services.test.containers.KeycloakContainerApplicationInitializer; import org.activiti.cloud.services.test.identity.IdentityTokenProducer; -import org.activiti.cloud.starter.tests.helper.HelperConfiguration; import org.activiti.cloud.starter.tests.helper.ProcessInstanceRestTemplate; -import org.activiti.cloud.starter.tests.runtime.IntegrationResultSender; -import org.activiti.cloud.starter.tests.runtime.ServiceTaskConsumerHandler; import org.activiti.engine.RuntimeService; import org.activiti.services.connectors.channel.ServiceTaskIntegrationResultEventHandler; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -60,11 +61,8 @@ @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) @TestPropertySource("classpath:application-test.properties") @DirtiesContext -@Import({HelperConfiguration.class, - ServiceTaskConsumerHandler.class, - IntegrationResultSender.class, - TestChannelBinderConfiguration.class}) -@ContextConfiguration( +@Import({TestChannelBinderConfiguration.class}) +@ContextConfiguration(classes = ServicesAuditITConfiguration.class, initializers = {KeycloakContainerApplicationInitializer.class}) public class GatewayConcurrencyIT { @@ -90,13 +88,16 @@ public class GatewayConcurrencyIT { @Autowired private ObjectMapper objectMapper; - private ExecutorService executorService; + @Autowired + private AuditConsumerStreamHandler streamHandler; + private ExecutorService executorService; @BeforeEach public void setUp() { identityTokenProducer.withTestUser("testuser"); executorService = Executors.newFixedThreadPool(2); + streamHandler.clear(); } @AfterEach @@ -129,10 +130,22 @@ public void shouldExecuteWithoutConcurrencyException() throws IOException, Inter executorService.invokeAll(tasks); - await().atMost(Duration.ofMinutes(10)).untilAsserted(() -> { - ResponseEntity completedProcessInstance = processInstanceRestTemplate.getProcessInstance(processInstanceId); - - assertThat(completedProcessInstance.getBody().getStatus()).isEqualTo(ProcessInstanceStatus.COMPLETED); + await().untilAsserted(() -> { + List> receivedEvents = streamHandler.getAllReceivedEvents(); + + Assertions.assertThat(receivedEvents) + .extracting(CloudRuntimeEvent::getEventType, + CloudRuntimeEvent::getProcessInstanceId, + CloudRuntimeEvent::getEntityId) + .contains(tuple(PROCESS_CREATED, + processInstanceId, + processInstanceId), + tuple(PROCESS_STARTED, + processInstanceId, + processInstanceId), + tuple(PROCESS_COMPLETED, + processInstanceId, + processInstanceId)); }); } From 19d831fc97499ba0ded17c46b00549824ee64b15 Mon Sep 17 00:00:00 2001 From: Igor Dianov Date: Tue, 28 Feb 2023 13:08:20 -0800 Subject: [PATCH 03/13] support integration event handler concurrent execution support with retry --- activiti-cloud-api/pom.xml | 2 +- activiti-cloud-modeling-service/pom.xml | 2 +- activiti-cloud-query-service/pom.xml | 2 +- ...rviceTaskIntegrationErrorEventHandler.java | 31 +++++++------ ...viceTaskIntegrationResultEventHandler.java | 34 ++++++++------- .../CloudConnectorsAutoConfiguration.java | 2 + .../services/audit/GatewayConcurrencyIT.java | 43 ++++++++++++------- .../resources/application-test.properties | 3 +- activiti-cloud-runtime-bundle-service/pom.xml | 2 +- activiti-cloud-service-common/pom.xml | 2 +- 10 files changed, 70 insertions(+), 53 deletions(-) diff --git a/activiti-cloud-api/pom.xml b/activiti-cloud-api/pom.xml index 9c9045ff2ae..30170900b7b 100644 --- a/activiti-cloud-api/pom.xml +++ b/activiti-cloud-api/pom.xml @@ -12,7 +12,7 @@ Activiti Cloud :: Runtime API Parent pom - 7.10.0-alpha.1 + 0.0.1-PR-4264-1226-SNAPSHOT activiti-cloud-api-dependencies diff --git a/activiti-cloud-modeling-service/pom.xml b/activiti-cloud-modeling-service/pom.xml index 737fc1b28c0..0ba7242e5f2 100644 --- a/activiti-cloud-modeling-service/pom.xml +++ b/activiti-cloud-modeling-service/pom.xml @@ -19,7 +19,7 @@ activiti-cloud-acceptance-tests-modeling - 7.10.0-alpha.1 + 0.0.1-PR-4264-1226-SNAPSHOT 1.14.0 4.4 20230227 diff --git a/activiti-cloud-query-service/pom.xml b/activiti-cloud-query-service/pom.xml index 142870a9e07..f8f406b374a 100644 --- a/activiti-cloud-query-service/pom.xml +++ b/activiti-cloud-query-service/pom.xml @@ -18,7 +18,7 @@ activiti-cloud-starter-query - 7.10.0-alpha.1 + 0.0.1-PR-4264-1226-SNAPSHOT diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationErrorEventHandler.java b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationErrorEventHandler.java index 305f05ba5ad..fc2ae55abd2 100644 --- a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationErrorEventHandler.java +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationErrorEventHandler.java @@ -16,6 +16,8 @@ package org.activiti.services.connectors.channel; +import java.util.ArrayList; +import java.util.List; import org.activiti.api.process.model.IntegrationContext; import org.activiti.cloud.api.process.model.CloudBpmnError; import org.activiti.cloud.api.process.model.IntegrationError; @@ -23,14 +25,16 @@ import org.activiti.cloud.services.events.listeners.ProcessEngineEventsAggregator; import org.activiti.engine.ManagementService; import org.activiti.engine.RuntimeService; +import org.activiti.engine.impl.cmd.integration.DeleteIntegrationContextCmd; +import org.activiti.engine.impl.interceptor.Command; import org.activiti.engine.impl.persistence.entity.ExecutionEntity; import org.activiti.engine.impl.persistence.entity.integration.IntegrationContextEntity; import org.activiti.engine.integration.IntegrationContextService; import org.activiti.engine.runtime.Execution; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import java.util.List; +import org.springframework.retry.annotation.Backoff; +import org.springframework.retry.annotation.Retryable; public class ServiceTaskIntegrationErrorEventHandler { @@ -54,12 +58,15 @@ public ServiceTaskIntegrationErrorEventHandler(RuntimeService runtimeService, this.processEngineEventsAggregator = processEngineEventsAggregator; } + @Retryable(backoff = @Backoff(delayExpression = "${activiti.cloud.integration.error.retry.backoff.delay:10}")) public void receive(IntegrationError integrationError) { IntegrationContext integrationContext = integrationError.getIntegrationContext(); IntegrationContextEntity integrationContextEntity = integrationContextService.findById(integrationContext.getId()); if (integrationContextEntity != null) { - integrationContextService.deleteIntegrationContext(integrationContextEntity); + List> commands = new ArrayList<>(); + + commands.add(new DeleteIntegrationContextCmd(integrationContextEntity)); List executions = runtimeService.createExecutionQuery().executionId(integrationContextEntity.getExecutionId()).list(); if (executions.size() > 0) { @@ -77,8 +84,7 @@ public void receive(IntegrationError integrationError) { if (CloudBpmnError.class.getName().equals(errorClassName)) { if (execution.getActivityId().equals(clientId)) { try { - triggerIntegrationContextError(integrationError, execution); - return; + commands.add(new PropagateCloudBpmnErrorCmd(integrationError, execution)); } catch (Throwable cause) { LOGGER.error("Error propagating CloudBpmnError: {}", cause.getMessage()); } @@ -97,17 +103,10 @@ public void receive(IntegrationError integrationError) { LOGGER.warn(message); } - managementService.executeCommand(new AggregateIntegrationErrorReceivedEventCmd( - integrationError, runtimeBundleProperties, processEngineEventsAggregator)); - } - } + commands.add(new AggregateIntegrationErrorReceivedEventCmd( + integrationError, runtimeBundleProperties, processEngineEventsAggregator)); - private void triggerIntegrationContextError(IntegrationError integrationError, ExecutionEntity execution) { - managementService.executeCommand( - CompositeCommand.of( - new PropagateCloudBpmnErrorCmd(integrationError, execution), - new AggregateIntegrationErrorReceivedClosingEventCmd(new AggregateIntegrationErrorReceivedEventCmd( - integrationError, runtimeBundleProperties, processEngineEventsAggregator)))); + managementService.executeCommand(CompositeCommand.of(commands.toArray(Command[]::new))); + } } - } diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationResultEventHandler.java b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationResultEventHandler.java index 0b1d5a33a06..0a847924fff 100644 --- a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationResultEventHandler.java +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationResultEventHandler.java @@ -16,6 +16,7 @@ package org.activiti.services.connectors.channel; +import java.util.ArrayList; import java.util.List; import org.activiti.api.process.model.IntegrationContext; import org.activiti.cloud.api.process.model.IntegrationResult; @@ -25,11 +26,15 @@ import org.activiti.engine.RuntimeService; import org.activiti.engine.impl.bpmn.behavior.VariablesPropagator; import org.activiti.engine.impl.cmd.TriggerCmd; +import org.activiti.engine.impl.cmd.integration.DeleteIntegrationContextCmd; +import org.activiti.engine.impl.interceptor.Command; import org.activiti.engine.impl.persistence.entity.integration.IntegrationContextEntity; import org.activiti.engine.integration.IntegrationContextService; import org.activiti.engine.runtime.Execution; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.retry.annotation.Backoff; +import org.springframework.retry.annotation.Retryable; public class ServiceTaskIntegrationResultEventHandler { @@ -56,6 +61,7 @@ public ServiceTaskIntegrationResultEventHandler(RuntimeService runtimeService, this.variablesPropagator = variablesPropagator; } + @Retryable(backoff = @Backoff(delayExpression = "${activiti.cloud.integration.result.retry.backoff.delay:10}")) public void receive(IntegrationResult integrationResult) { IntegrationContext integrationContext = integrationResult.getIntegrationContext(); IntegrationContextEntity integrationContextEntity = integrationContextService.findById(integrationContext.getId()); @@ -64,16 +70,20 @@ public void receive(IntegrationResult integrationResult) { List executions = runtimeService.createExecutionQuery() .executionId(executionId) .list(); + if (integrationContextEntity != null) { - integrationContextService.deleteIntegrationContext(integrationContextEntity); + List> commands = new ArrayList<>(); + + commands.add(new DeleteIntegrationContextCmd(integrationContextEntity)); if (executions.size() > 0) { Execution execution = executions.get(0); if (execution.getActivityId() .equals(integrationContext.getClientId())) { - triggerIntegrationContextExecution(integrationContext); - return; + commands.add(new TriggerCmd(integrationContext.getExecutionId(), + integrationContext.getOutBoundVariables(), + variablesPropagator)); } else { LOGGER.warn("Could not find matching activityId '{}' for integration result '{}' with executionId '{}'", integrationContext.getClientId(), @@ -87,19 +97,11 @@ public void receive(IntegrationResult integrationResult) { "`. The integration result for the integration context `" + integrationContext.getId() + "` will be ignored."; LOGGER.warn(message); } - managementService.executeCommand(new AggregateIntegrationResultReceivedEventCmd( - integrationContext, runtimeBundleProperties, processEngineEventsAggregator)); - } - } - private void triggerIntegrationContextExecution(IntegrationContext integrationContext) { - managementService.executeCommand( - CompositeCommand.of( - new TriggerCmd(integrationContext.getExecutionId(), integrationContext.getOutBoundVariables(), - variablesPropagator), - new AggregateIntegrationResultReceivedEventCmd(integrationContext, - runtimeBundleProperties, processEngineEventsAggregator) - )); - } + commands.add(new AggregateIntegrationResultReceivedEventCmd( + integrationContext, runtimeBundleProperties, processEngineEventsAggregator)); + managementService.executeCommand(CompositeCommand.of(commands.toArray(Command[]::new))); + } + } } diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/conf/CloudConnectorsAutoConfiguration.java b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/conf/CloudConnectorsAutoConfiguration.java index f5fccd676c4..9c5873b2bea 100644 --- a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/conf/CloudConnectorsAutoConfiguration.java +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/conf/CloudConnectorsAutoConfiguration.java @@ -50,10 +50,12 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; import org.springframework.messaging.Message; +import org.springframework.retry.annotation.EnableRetry; @Configuration @AutoConfigureBefore(value = ConnectorsAutoConfiguration.class) @PropertySource("classpath:config/integration-result-stream.properties") +@EnableRetry public class CloudConnectorsAutoConfiguration { private static final String LOCAL_SERVICE_TASK_BEHAVIOUR_BEAN_NAME = "localServiceTaskBehaviour"; diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/java/org/activiti/cloud/starter/tests/services/audit/GatewayConcurrencyIT.java b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/java/org/activiti/cloud/starter/tests/services/audit/GatewayConcurrencyIT.java index f77b1529e2c..a9c3e739a88 100644 --- a/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/java/org/activiti/cloud/starter/tests/services/audit/GatewayConcurrencyIT.java +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/java/org/activiti/cloud/starter/tests/services/audit/GatewayConcurrencyIT.java @@ -15,21 +15,16 @@ */ package org.activiti.cloud.starter.tests.services.audit; -import static org.activiti.api.process.model.events.ProcessRuntimeEvent.ProcessEvents.PROCESS_COMPLETED; -import static org.activiti.api.process.model.events.ProcessRuntimeEvent.ProcessEvents.PROCESS_CREATED; -import static org.activiti.api.process.model.events.ProcessRuntimeEvent.ProcessEvents.PROCESS_STARTED; -import static org.assertj.core.api.Assertions.tuple; -import static org.assertj.core.api.AssertionsForClassTypes.assertThat; -import static org.awaitility.Awaitility.await; - -import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.activiti.api.process.model.payloads.SignalPayload; import org.activiti.cloud.api.model.shared.events.CloudRuntimeEvent; import org.activiti.cloud.api.process.model.CloudProcessInstance; import org.activiti.cloud.api.process.model.IntegrationRequest; @@ -39,24 +34,33 @@ import org.activiti.cloud.services.test.containers.KeycloakContainerApplicationInitializer; import org.activiti.cloud.services.test.identity.IdentityTokenProducer; import org.activiti.cloud.starter.tests.helper.ProcessInstanceRestTemplate; -import org.activiti.engine.RuntimeService; -import org.activiti.services.connectors.channel.ServiceTaskIntegrationResultEventHandler; +import org.activiti.engine.impl.bpmn.behavior.InclusiveGatewayActivityBehavior; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.stream.binder.test.InputDestination; import org.springframework.cloud.stream.binder.test.OutputDestination; import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration; +import org.springframework.cloud.stream.config.BindingServiceProperties; import org.springframework.context.annotation.Import; import org.springframework.http.ResponseEntity; import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.TestPropertySource; +import static org.activiti.api.process.model.events.ProcessRuntimeEvent.ProcessEvents.PROCESS_COMPLETED; +import static org.activiti.api.process.model.events.ProcessRuntimeEvent.ProcessEvents.PROCESS_CREATED; +import static org.activiti.api.process.model.events.ProcessRuntimeEvent.ProcessEvents.PROCESS_STARTED; +import static org.assertj.core.api.Assertions.tuple; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.awaitility.Awaitility.await; + @ActiveProfiles(AuditProducerIT.AUDIT_PRODUCER_IT) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) @TestPropertySource("classpath:application-test.properties") @@ -77,13 +81,13 @@ public class GatewayConcurrencyIT { private IdentityTokenProducer identityTokenProducer; @Autowired - private ServiceTaskIntegrationResultEventHandler serviceTaskIntegrationResultEventHandler; + private BindingServiceProperties bindingServiceProperties; @Autowired private OutputDestination outputDestination; @Autowired - private RuntimeService runtimeService; + private InputDestination inputDestination; @Autowired private ObjectMapper objectMapper; @@ -100,6 +104,8 @@ public void setUp() { streamHandler.clear(); } + InclusiveGatewayActivityBehavior s; + @AfterEach public void cleanUp(){ executorService.shutdown(); @@ -118,18 +124,25 @@ public void shouldExecuteWithoutConcurrencyException() throws IOException, Inter List> tasks = new ArrayList<>(); tasks.add(() -> { - serviceTaskIntegrationResultEventHandler.receive(integrationResult); + Message message = MessageBuilder.withPayload(new SignalPayload(SIGNAL_NAME, Collections.emptyMap())).build(); + String destination = bindingServiceProperties.getBindingDestination("signalConsumer"); + + inputDestination.send(message, + destination); return null; }); tasks.add(() -> { - runtimeService.signalEventReceived(SIGNAL_NAME); + Message message = MessageBuilder.withPayload(integrationResult).build(); + String destination = bindingServiceProperties.getBindingDestination("integrationResultsConsumer"); + + inputDestination.send(message, + destination); return null; }); executorService.invokeAll(tasks); - await().untilAsserted(() -> { List> receivedEvents = streamHandler.getAllReceivedEvents(); diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/application-test.properties b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/application-test.properties index 1683442830b..2476accfa34 100644 --- a/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/application-test.properties +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/application-test.properties @@ -84,4 +84,5 @@ spring.cloud.stream.bindings.queryConsumer.consumer.partitioned=${activiti.cloud spring.cloud.stream.bindings.auditConsumer.consumer.partitioned=${activiti.cloud.messaging.partitioned} logging.level.org.activiti.engine.impl.db=trace -logging.level.org.activiti.engine.integration=trace \ No newline at end of file +logging.level.org.activiti.engine.integration=trace +logging.level.org.springframework.retry=debug diff --git a/activiti-cloud-runtime-bundle-service/pom.xml b/activiti-cloud-runtime-bundle-service/pom.xml index 3d275b45eed..44323192ba3 100644 --- a/activiti-cloud-runtime-bundle-service/pom.xml +++ b/activiti-cloud-runtime-bundle-service/pom.xml @@ -18,7 +18,7 @@ activiti-cloud-starter-runtime-bundle-it - 7.10.0-alpha.1 + 0.0.1-PR-4264-1226-SNAPSHOT diff --git a/activiti-cloud-service-common/pom.xml b/activiti-cloud-service-common/pom.xml index 8650e446e50..1646cb346da 100644 --- a/activiti-cloud-service-common/pom.xml +++ b/activiti-cloud-service-common/pom.xml @@ -34,7 +34,7 @@ activiti-cloud-service-messaging-starter - 7.10.0-alpha.1 + 0.0.1-PR-4264-1226-SNAPSHOT 1.9.4 1.10 2.6 From debd5e10c5b27912db5669001394e1ca28e2d75b Mon Sep 17 00:00:00 2001 From: Igor Dianov Date: Tue, 28 Feb 2023 13:15:51 -0800 Subject: [PATCH 04/13] fix missing end of file --- .../test/resources/connectors/generate-signal-connector.json | 2 +- .../processes/process-with-gateway-concurrency-extensions.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/connectors/generate-signal-connector.json b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/connectors/generate-signal-connector.json index 5e255326a9b..149f7b26740 100644 --- a/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/connectors/generate-signal-connector.json +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/connectors/generate-signal-connector.json @@ -30,4 +30,4 @@ ] } } -} \ No newline at end of file +} diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/processes/process-with-gateway-concurrency-extensions.json b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/processes/process-with-gateway-concurrency-extensions.json index c4c4f7677d3..ef76560dd27 100644 --- a/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/processes/process-with-gateway-concurrency-extensions.json +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/processes/process-with-gateway-concurrency-extensions.json @@ -29,4 +29,4 @@ "default": {} } } -} \ No newline at end of file +} From 1f8426d323da1b33ccc7a462cd6c243c2d6615b9 Mon Sep 17 00:00:00 2001 From: Igor Dianov Date: Tue, 28 Feb 2023 13:56:07 -0800 Subject: [PATCH 05/13] refactor integration result connector tests --- ...rviceTaskIntegrationErrorEventHandler.java | 12 +++++-- ...eTaskIntegrationErrorEventHandlerTest.java | 34 +++++++++---------- ...TaskIntegrationResultEventHandlerTest.java | 27 ++++++++------- 3 files changed, 41 insertions(+), 32 deletions(-) diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationErrorEventHandler.java b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationErrorEventHandler.java index fc2ae55abd2..9d1772718c4 100644 --- a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationErrorEventHandler.java +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationErrorEventHandler.java @@ -85,6 +85,13 @@ public void receive(IntegrationError integrationError) { if (execution.getActivityId().equals(clientId)) { try { commands.add(new PropagateCloudBpmnErrorCmd(integrationError, execution)); + commands.add(new AggregateIntegrationErrorReceivedClosingEventCmd( + new AggregateIntegrationErrorReceivedEventCmd(integrationError, + runtimeBundleProperties, + processEngineEventsAggregator))); + + managementService.executeCommand(CompositeCommand.of(commands.toArray(Command[]::new))); + return; } catch (Throwable cause) { LOGGER.error("Error propagating CloudBpmnError: {}", cause.getMessage()); } @@ -103,8 +110,9 @@ public void receive(IntegrationError integrationError) { LOGGER.warn(message); } - commands.add(new AggregateIntegrationErrorReceivedEventCmd( - integrationError, runtimeBundleProperties, processEngineEventsAggregator)); + commands.add(new AggregateIntegrationErrorReceivedEventCmd(integrationError, + runtimeBundleProperties, + processEngineEventsAggregator)); managementService.executeCommand(CompositeCommand.of(commands.toArray(Command[]::new))); } diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/test/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationErrorEventHandlerTest.java b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/test/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationErrorEventHandlerTest.java index 16efe47ffcd..e58de0e0cc0 100644 --- a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/test/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationErrorEventHandlerTest.java +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/test/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationErrorEventHandlerTest.java @@ -15,11 +15,7 @@ */ package org.activiti.services.connectors.channel; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.BDDMockito.given; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import java.util.Collections; import org.activiti.api.runtime.model.impl.IntegrationContextImpl; import org.activiti.cloud.api.process.model.CloudBpmnError; import org.activiti.cloud.api.process.model.IntegrationError; @@ -27,7 +23,7 @@ import org.activiti.cloud.api.process.model.impl.IntegrationRequestImpl; import org.activiti.engine.ManagementService; import org.activiti.engine.RuntimeService; -import org.activiti.engine.impl.interceptor.Command; +import org.activiti.engine.impl.cmd.integration.DeleteIntegrationContextCmd; import org.activiti.engine.impl.persistence.entity.ExecutionEntity; import org.activiti.engine.impl.persistence.entity.integration.IntegrationContextEntityImpl; import org.activiti.engine.integration.IntegrationContextService; @@ -41,7 +37,11 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import java.util.Collections; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) public class ServiceTaskIntegrationErrorEventHandlerTest { @@ -60,7 +60,7 @@ public class ServiceTaskIntegrationErrorEventHandlerTest { private IntegrationContextService integrationContextService; @Captor - private ArgumentCaptor> commandArgumentCaptor; + private ArgumentCaptor commandArgumentCaptor; @Mock private ExecutionQuery executionQuery; @@ -96,13 +96,12 @@ public void should_propagateErrorAndAggregateEvent_when_clientIdMatches() { handler.receive(integrationErrorEvent); //then - verify(integrationContextService).deleteIntegrationContext(integrationContextEntity); verify(managementService).executeCommand(commandArgumentCaptor.capture()); - final Command command = commandArgumentCaptor.getValue(); - assertThat(command).isExactlyInstanceOf(CompositeCommand.class); - CompositeCommand compositeCommand = (CompositeCommand) command; - assertThat(compositeCommand.getCommands().get(0)).isInstanceOf(PropagateCloudBpmnErrorCmd.class); - assertThat(compositeCommand.getCommands().get(1)).isInstanceOf(AggregateIntegrationErrorReceivedClosingEventCmd.class); + final CompositeCommand compositeCommand = commandArgumentCaptor.getValue(); + assertThat(compositeCommand.getCommands()).hasSize(3); + assertThat(compositeCommand.getCommands().get(0)).isInstanceOf(DeleteIntegrationContextCmd.class); + assertThat(compositeCommand.getCommands().get(1)).isInstanceOf(PropagateCloudBpmnErrorCmd.class); + assertThat(compositeCommand.getCommands().get(2)).isInstanceOf(AggregateIntegrationErrorReceivedClosingEventCmd.class); } @Test @@ -126,10 +125,11 @@ public void should_AggregateEventButNotPropagateError_when_clientIdDoesNotMatch( handler.receive(integrationErrorEvent); //then - verify(integrationContextService).deleteIntegrationContext(integrationContextEntity); verify(managementService).executeCommand(commandArgumentCaptor.capture()); - final Command command = commandArgumentCaptor.getValue(); - assertThat(command).isExactlyInstanceOf(AggregateIntegrationErrorReceivedEventCmd.class); + final CompositeCommand compositeCommand = commandArgumentCaptor.getValue(); + assertThat(compositeCommand.getCommands()).hasSize(2); + assertThat(compositeCommand.getCommands().get(0)).isInstanceOf(DeleteIntegrationContextCmd.class); + assertThat(compositeCommand.getCommands().get(1)).isInstanceOf(AggregateIntegrationErrorReceivedEventCmd.class); } private IntegrationContextEntityImpl buildIntegrationContextEntity() { diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/test/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationResultEventHandlerTest.java b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/test/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationResultEventHandlerTest.java index 6239b8544f5..39ed23851ed 100644 --- a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/test/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationResultEventHandlerTest.java +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/test/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationResultEventHandlerTest.java @@ -15,14 +15,6 @@ */ package org.activiti.services.connectors.channel; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.BDDMockito.given; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - import java.util.Collections; import java.util.List; import java.util.Map; @@ -33,6 +25,7 @@ import org.activiti.engine.ManagementService; import org.activiti.engine.RuntimeService; import org.activiti.engine.impl.cmd.TriggerCmd; +import org.activiti.engine.impl.cmd.integration.DeleteIntegrationContextCmd; import org.activiti.engine.impl.persistence.entity.ExecutionEntity; import org.activiti.engine.impl.persistence.entity.integration.IntegrationContextEntityImpl; import org.activiti.engine.integration.IntegrationContextService; @@ -45,6 +38,14 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + @ExtendWith(MockitoExtension.class) public class ServiceTaskIntegrationResultEventHandlerTest { @@ -83,14 +84,14 @@ public void receive_should_triggerExecutionAndDeleteRelatedIntegrationContext() handler.receive(new IntegrationResultImpl(new IntegrationRequestImpl(), integrationContext)); //then - verify(integrationContextService).deleteIntegrationContext(integrationContextEntity); final ArgumentCaptor captor = ArgumentCaptor.forClass( CompositeCommand.class); verify(managementService).executeCommand(captor.capture()); final CompositeCommand command = captor.getValue(); - assertThat(command.getCommands()).hasSize(2); - assertThat(command.getCommands().get(0)).isInstanceOf(TriggerCmd.class); - assertThat(command.getCommands().get(1)).isInstanceOf(AggregateIntegrationResultReceivedEventCmd.class); + assertThat(command.getCommands()).hasSize(3); + assertThat(command.getCommands().get(0)).isInstanceOf(DeleteIntegrationContextCmd.class); + assertThat(command.getCommands().get(1)).isInstanceOf(TriggerCmd.class); + assertThat(command.getCommands().get(2)).isInstanceOf(AggregateIntegrationResultReceivedEventCmd.class); } private ExecutionEntity buildExecutionEntity() { @@ -118,7 +119,7 @@ public void receiveShouldDoNothingWhenIntegrationContextsIsNull() { handler.receive(new IntegrationResultImpl(new IntegrationRequestImpl(), integrationContext)); //then - verify(integrationContextService, never()).deleteIntegrationContext(any()); + verify(managementService, never()).executeCommand(any()); } private IntegrationContextImpl buildIntegrationContext(Map variables) { From acd3716062453c1a976bbbb6f618619890dc43e5 Mon Sep 17 00:00:00 2001 From: Igor Dianov Date: Tue, 28 Feb 2023 14:52:38 -0800 Subject: [PATCH 06/13] update activiti.version to 0.0.1-PR-4264-1227-SNAPSHOT --- activiti-cloud-api/pom.xml | 2 +- activiti-cloud-modeling-service/pom.xml | 2 +- activiti-cloud-query-service/pom.xml | 2 +- .../channel/ServiceTaskIntegrationErrorEventHandler.java | 3 ++- .../channel/ServiceTaskIntegrationResultEventHandler.java | 3 ++- activiti-cloud-runtime-bundle-service/pom.xml | 2 +- activiti-cloud-service-common/pom.xml | 2 +- 7 files changed, 9 insertions(+), 7 deletions(-) diff --git a/activiti-cloud-api/pom.xml b/activiti-cloud-api/pom.xml index 30170900b7b..a4a1c2f3ac5 100644 --- a/activiti-cloud-api/pom.xml +++ b/activiti-cloud-api/pom.xml @@ -12,7 +12,7 @@ Activiti Cloud :: Runtime API Parent pom - 0.0.1-PR-4264-1226-SNAPSHOT + 0.0.1-PR-4264-1227-SNAPSHOT activiti-cloud-api-dependencies diff --git a/activiti-cloud-modeling-service/pom.xml b/activiti-cloud-modeling-service/pom.xml index 0ba7242e5f2..09df604c9fb 100644 --- a/activiti-cloud-modeling-service/pom.xml +++ b/activiti-cloud-modeling-service/pom.xml @@ -19,7 +19,7 @@ activiti-cloud-acceptance-tests-modeling - 0.0.1-PR-4264-1226-SNAPSHOT + 0.0.1-PR-4264-1227-SNAPSHOT 1.14.0 4.4 20230227 diff --git a/activiti-cloud-query-service/pom.xml b/activiti-cloud-query-service/pom.xml index f8f406b374a..5c9ff309494 100644 --- a/activiti-cloud-query-service/pom.xml +++ b/activiti-cloud-query-service/pom.xml @@ -18,7 +18,7 @@ activiti-cloud-starter-query - 0.0.1-PR-4264-1226-SNAPSHOT + 0.0.1-PR-4264-1227-SNAPSHOT diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationErrorEventHandler.java b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationErrorEventHandler.java index 9d1772718c4..b86d67e2d89 100644 --- a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationErrorEventHandler.java +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationErrorEventHandler.java @@ -58,7 +58,8 @@ public ServiceTaskIntegrationErrorEventHandler(RuntimeService runtimeService, this.processEngineEventsAggregator = processEngineEventsAggregator; } - @Retryable(backoff = @Backoff(delayExpression = "${activiti.cloud.integration.error.retry.backoff.delay:10}")) + @Retryable(maxAttemptsExpression = "${activiti.cloud.integration.error.retry.max-attempts:3}", + backoff = @Backoff(delayExpression = "${activiti.cloud.integration.error.retry.backoff.delay:0}")) public void receive(IntegrationError integrationError) { IntegrationContext integrationContext = integrationError.getIntegrationContext(); IntegrationContextEntity integrationContextEntity = integrationContextService.findById(integrationContext.getId()); diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationResultEventHandler.java b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationResultEventHandler.java index 0a847924fff..35ff453fb73 100644 --- a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationResultEventHandler.java +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationResultEventHandler.java @@ -61,7 +61,8 @@ public ServiceTaskIntegrationResultEventHandler(RuntimeService runtimeService, this.variablesPropagator = variablesPropagator; } - @Retryable(backoff = @Backoff(delayExpression = "${activiti.cloud.integration.result.retry.backoff.delay:10}")) + @Retryable(maxAttemptsExpression = "${activiti.cloud.integration.result.retry.max-attempts:3}", + backoff = @Backoff(delayExpression = "${activiti.cloud.integration.result.retry.backoff.delay:0}")) public void receive(IntegrationResult integrationResult) { IntegrationContext integrationContext = integrationResult.getIntegrationContext(); IntegrationContextEntity integrationContextEntity = integrationContextService.findById(integrationContext.getId()); diff --git a/activiti-cloud-runtime-bundle-service/pom.xml b/activiti-cloud-runtime-bundle-service/pom.xml index 44323192ba3..c9ba9ccedff 100644 --- a/activiti-cloud-runtime-bundle-service/pom.xml +++ b/activiti-cloud-runtime-bundle-service/pom.xml @@ -18,7 +18,7 @@ activiti-cloud-starter-runtime-bundle-it - 0.0.1-PR-4264-1226-SNAPSHOT + 0.0.1-PR-4264-1227-SNAPSHOT diff --git a/activiti-cloud-service-common/pom.xml b/activiti-cloud-service-common/pom.xml index 1646cb346da..51f8cd4fabe 100644 --- a/activiti-cloud-service-common/pom.xml +++ b/activiti-cloud-service-common/pom.xml @@ -34,7 +34,7 @@ activiti-cloud-service-messaging-starter - 0.0.1-PR-4264-1226-SNAPSHOT + 0.0.1-PR-4264-1227-SNAPSHOT 1.9.4 1.10 2.6 From a227a1d3ecd26a227c67fb3a527609daac6cedba Mon Sep 17 00:00:00 2001 From: Igor Dianov Date: Tue, 28 Feb 2023 15:38:07 -0800 Subject: [PATCH 07/13] disable trace logging level --- .../src/test/resources/application-test.properties | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/application-test.properties b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/application-test.properties index 2476accfa34..c23e9a31c61 100644 --- a/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/application-test.properties +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/application-test.properties @@ -83,6 +83,6 @@ activiti.cloud.messaging.partition-count=1 spring.cloud.stream.bindings.queryConsumer.consumer.partitioned=${activiti.cloud.messaging.partitioned} spring.cloud.stream.bindings.auditConsumer.consumer.partitioned=${activiti.cloud.messaging.partitioned} -logging.level.org.activiti.engine.impl.db=trace -logging.level.org.activiti.engine.integration=trace -logging.level.org.springframework.retry=debug +#logging.level.org.activiti.engine.impl.db=trace +#logging.level.org.activiti.engine.integration=trace +#logging.level.org.springframework.retry=debug From 1cbf8d5f802a54a58de4d8ef4e852eb6b61afb5e Mon Sep 17 00:00:00 2001 From: Igor Dianov Date: Tue, 28 Feb 2023 15:42:31 -0800 Subject: [PATCH 08/13] remove Codaxy errors in GatewayConcurrencyIT --- .../tests/services/audit/GatewayConcurrencyIT.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/java/org/activiti/cloud/starter/tests/services/audit/GatewayConcurrencyIT.java b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/java/org/activiti/cloud/starter/tests/services/audit/GatewayConcurrencyIT.java index a9c3e739a88..e66ab119d81 100644 --- a/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/java/org/activiti/cloud/starter/tests/services/audit/GatewayConcurrencyIT.java +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/java/org/activiti/cloud/starter/tests/services/audit/GatewayConcurrencyIT.java @@ -16,10 +16,11 @@ package org.activiti.cloud.starter.tests.services.audit; import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -34,7 +35,6 @@ import org.activiti.cloud.services.test.containers.KeycloakContainerApplicationInitializer; import org.activiti.cloud.services.test.identity.IdentityTokenProducer; import org.activiti.cloud.starter.tests.helper.ProcessInstanceRestTemplate; -import org.activiti.engine.impl.bpmn.behavior.InclusiveGatewayActivityBehavior; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -104,8 +104,6 @@ public void setUp() { streamHandler.clear(); } - InclusiveGatewayActivityBehavior s; - @AfterEach public void cleanUp(){ executorService.shutdown(); @@ -121,7 +119,7 @@ public void shouldExecuteWithoutConcurrencyException() throws IOException, Inter final IntegrationResult integrationResult = createIntegrationResult(integrationRequest); - List> tasks = new ArrayList<>(); + Set> tasks = new LinkedHashSet<>(); tasks.add(() -> { Message message = MessageBuilder.withPayload(new SignalPayload(SIGNAL_NAME, Collections.emptyMap())).build(); @@ -140,7 +138,6 @@ public void shouldExecuteWithoutConcurrencyException() throws IOException, Inter destination); return null; }); - executorService.invokeAll(tasks); await().untilAsserted(() -> { From 561da3268dc1febfe838d3f805925adb8a51d132 Mon Sep 17 00:00:00 2001 From: Igor Dianov Date: Tue, 28 Feb 2023 17:20:44 -0800 Subject: [PATCH 09/13] add ActivitiOptimisticLockingException to retryable handler --- .../channel/ServiceTaskIntegrationErrorEventHandler.java | 7 +++++-- .../channel/ServiceTaskIntegrationResultEventHandler.java | 7 +++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationErrorEventHandler.java b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationErrorEventHandler.java index b86d67e2d89..84a13d3e73e 100644 --- a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationErrorEventHandler.java +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationErrorEventHandler.java @@ -23,6 +23,7 @@ import org.activiti.cloud.api.process.model.IntegrationError; import org.activiti.cloud.services.events.configuration.RuntimeBundleProperties; import org.activiti.cloud.services.events.listeners.ProcessEngineEventsAggregator; +import org.activiti.engine.ActivitiOptimisticLockingException; import org.activiti.engine.ManagementService; import org.activiti.engine.RuntimeService; import org.activiti.engine.impl.cmd.integration.DeleteIntegrationContextCmd; @@ -58,8 +59,10 @@ public ServiceTaskIntegrationErrorEventHandler(RuntimeService runtimeService, this.processEngineEventsAggregator = processEngineEventsAggregator; } - @Retryable(maxAttemptsExpression = "${activiti.cloud.integration.error.retry.max-attempts:3}", - backoff = @Backoff(delayExpression = "${activiti.cloud.integration.error.retry.backoff.delay:0}")) + @Retryable(value = ActivitiOptimisticLockingException.class, + maxAttemptsExpression = "${activiti.cloud.integration.error.retry.max-attempts:3}", + backoff = @Backoff(delayExpression = "${activiti.cloud.integration.error.retry.backoff.delay:0}") + ) public void receive(IntegrationError integrationError) { IntegrationContext integrationContext = integrationError.getIntegrationContext(); IntegrationContextEntity integrationContextEntity = integrationContextService.findById(integrationContext.getId()); diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationResultEventHandler.java b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationResultEventHandler.java index 35ff453fb73..7e01b2b0bc9 100644 --- a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationResultEventHandler.java +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationResultEventHandler.java @@ -22,6 +22,7 @@ import org.activiti.cloud.api.process.model.IntegrationResult; import org.activiti.cloud.services.events.configuration.RuntimeBundleProperties; import org.activiti.cloud.services.events.listeners.ProcessEngineEventsAggregator; +import org.activiti.engine.ActivitiOptimisticLockingException; import org.activiti.engine.ManagementService; import org.activiti.engine.RuntimeService; import org.activiti.engine.impl.bpmn.behavior.VariablesPropagator; @@ -61,8 +62,10 @@ public ServiceTaskIntegrationResultEventHandler(RuntimeService runtimeService, this.variablesPropagator = variablesPropagator; } - @Retryable(maxAttemptsExpression = "${activiti.cloud.integration.result.retry.max-attempts:3}", - backoff = @Backoff(delayExpression = "${activiti.cloud.integration.result.retry.backoff.delay:0}")) + @Retryable(value = ActivitiOptimisticLockingException.class, + maxAttemptsExpression = "${activiti.cloud.integration.result.retry.max-attempts:3}", + backoff = @Backoff(delayExpression = "${activiti.cloud.integration.result.retry.backoff.delay:0}") + ) public void receive(IntegrationResult integrationResult) { IntegrationContext integrationContext = integrationResult.getIntegrationContext(); IntegrationContextEntity integrationContextEntity = integrationContextService.findById(integrationContext.getId()); From 944e6a6a935782d4acb1e33208ced49cf037e804 Mon Sep 17 00:00:00 2001 From: Igor Dianov Date: Thu, 2 Mar 2023 08:40:23 -0800 Subject: [PATCH 10/13] polish --- .../ServiceTaskIntegrationResultEventHandler.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationResultEventHandler.java b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationResultEventHandler.java index 7e01b2b0bc9..2171965fa08 100644 --- a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationResultEventHandler.java +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationResultEventHandler.java @@ -70,16 +70,15 @@ public void receive(IntegrationResult integrationResult) { IntegrationContext integrationContext = integrationResult.getIntegrationContext(); IntegrationContextEntity integrationContextEntity = integrationContextService.findById(integrationContext.getId()); - String executionId = integrationContext.getExecutionId(); - List executions = runtimeService.createExecutionQuery() - .executionId(executionId) - .list(); - if (integrationContextEntity != null) { List> commands = new ArrayList<>(); commands.add(new DeleteIntegrationContextCmd(integrationContextEntity)); + String executionId = integrationContext.getExecutionId(); + List executions = runtimeService.createExecutionQuery() + .executionId(executionId) + .list(); if (executions.size() > 0) { Execution execution = executions.get(0); From cbc599d7ccdefeb67b80697803affa6dfd8ddd68 Mon Sep 17 00:00:00 2001 From: Igor Dianov Date: Mon, 6 Mar 2023 06:33:20 -0800 Subject: [PATCH 11/13] Add Retryable attribute for FunctionBinding annotation add FunctionBindingRequestHandlerRetryAdvice class polish names add Retryable attribute to FunctionalBinding polish Support default retryable configuration for FunctionBinding annotation --- ...rviceTaskIntegrationErrorEventHandler.java | 7 - ...viceTaskIntegrationResultEventHandler.java | 7 - .../CloudConnectorsAutoConfiguration.java | 15 +- .../config/FunctionBindingConfiguration.java | 20 +- ...ctionBindingRequestHandlerRetryAdvice.java | 28 +++ .../RetryableFunctionBindingTemplate.java | 203 ++++++++++++++++++ .../messaging/functional/FunctionBinding.java | 5 + 7 files changed, 265 insertions(+), 20 deletions(-) create mode 100644 activiti-cloud-service-common/activiti-cloud-service-messaging-config/src/main/java/org/activiti/cloud/common/messaging/config/FunctionBindingRequestHandlerRetryAdvice.java create mode 100644 activiti-cloud-service-common/activiti-cloud-service-messaging-config/src/main/java/org/activiti/cloud/common/messaging/config/RetryableFunctionBindingTemplate.java diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationErrorEventHandler.java b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationErrorEventHandler.java index 84a13d3e73e..b7b3b848e82 100644 --- a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationErrorEventHandler.java +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationErrorEventHandler.java @@ -23,7 +23,6 @@ import org.activiti.cloud.api.process.model.IntegrationError; import org.activiti.cloud.services.events.configuration.RuntimeBundleProperties; import org.activiti.cloud.services.events.listeners.ProcessEngineEventsAggregator; -import org.activiti.engine.ActivitiOptimisticLockingException; import org.activiti.engine.ManagementService; import org.activiti.engine.RuntimeService; import org.activiti.engine.impl.cmd.integration.DeleteIntegrationContextCmd; @@ -34,8 +33,6 @@ import org.activiti.engine.runtime.Execution; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.retry.annotation.Backoff; -import org.springframework.retry.annotation.Retryable; public class ServiceTaskIntegrationErrorEventHandler { @@ -59,10 +56,6 @@ public ServiceTaskIntegrationErrorEventHandler(RuntimeService runtimeService, this.processEngineEventsAggregator = processEngineEventsAggregator; } - @Retryable(value = ActivitiOptimisticLockingException.class, - maxAttemptsExpression = "${activiti.cloud.integration.error.retry.max-attempts:3}", - backoff = @Backoff(delayExpression = "${activiti.cloud.integration.error.retry.backoff.delay:0}") - ) public void receive(IntegrationError integrationError) { IntegrationContext integrationContext = integrationError.getIntegrationContext(); IntegrationContextEntity integrationContextEntity = integrationContextService.findById(integrationContext.getId()); diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationResultEventHandler.java b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationResultEventHandler.java index 2171965fa08..9acc6df9af2 100644 --- a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationResultEventHandler.java +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/ServiceTaskIntegrationResultEventHandler.java @@ -22,7 +22,6 @@ import org.activiti.cloud.api.process.model.IntegrationResult; import org.activiti.cloud.services.events.configuration.RuntimeBundleProperties; import org.activiti.cloud.services.events.listeners.ProcessEngineEventsAggregator; -import org.activiti.engine.ActivitiOptimisticLockingException; import org.activiti.engine.ManagementService; import org.activiti.engine.RuntimeService; import org.activiti.engine.impl.bpmn.behavior.VariablesPropagator; @@ -34,8 +33,6 @@ import org.activiti.engine.runtime.Execution; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.retry.annotation.Backoff; -import org.springframework.retry.annotation.Retryable; public class ServiceTaskIntegrationResultEventHandler { @@ -62,10 +59,6 @@ public ServiceTaskIntegrationResultEventHandler(RuntimeService runtimeService, this.variablesPropagator = variablesPropagator; } - @Retryable(value = ActivitiOptimisticLockingException.class, - maxAttemptsExpression = "${activiti.cloud.integration.result.retry.max-attempts:3}", - backoff = @Backoff(delayExpression = "${activiti.cloud.integration.result.retry.backoff.delay:0}") - ) public void receive(IntegrationResult integrationResult) { IntegrationContext integrationContext = integrationResult.getIntegrationContext(); IntegrationContextEntity integrationContextEntity = integrationContextService.findById(integrationContext.getId()); diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/conf/CloudConnectorsAutoConfiguration.java b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/conf/CloudConnectorsAutoConfiguration.java index 9c5873b2bea..88be98ba9b1 100644 --- a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/conf/CloudConnectorsAutoConfiguration.java +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/conf/CloudConnectorsAutoConfiguration.java @@ -23,6 +23,7 @@ import org.activiti.cloud.services.events.configuration.RuntimeBundleProperties; import org.activiti.cloud.services.events.converter.RuntimeBundleInfoAppender; import org.activiti.cloud.services.events.listeners.ProcessEngineEventsAggregator; +import org.activiti.engine.ActivitiOptimisticLockingException; import org.activiti.engine.ManagementService; import org.activiti.engine.RuntimeService; import org.activiti.engine.impl.bpmn.behavior.VariablesPropagator; @@ -50,12 +51,12 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; import org.springframework.messaging.Message; -import org.springframework.retry.annotation.EnableRetry; +import org.springframework.retry.annotation.Backoff; +import org.springframework.retry.annotation.Retryable; @Configuration @AutoConfigureBefore(value = ConnectorsAutoConfiguration.class) @PropertySource("classpath:config/integration-result-stream.properties") -@EnableRetry public class CloudConnectorsAutoConfiguration { private static final String LOCAL_SERVICE_TASK_BEHAVIOUR_BEAN_NAME = "localServiceTaskBehaviour"; @@ -73,7 +74,10 @@ public ServiceTaskIntegrationResultEventHandler serviceTaskIntegrationResultEven runtimeBundleProperties, managementService, processEngineEventsAggregator, variablesPropagator); } - @FunctionBinding(input = ProcessEngineIntegrationChannels.INTEGRATION_RESULTS_CONSUMER) + @FunctionBinding(input = ProcessEngineIntegrationChannels.INTEGRATION_RESULTS_CONSUMER, + retryable = @Retryable(value = ActivitiOptimisticLockingException.class, + maxAttemptsExpression = "${activiti.cloud.integration.retryable.max-attempts:3}", + backoff = @Backoff(delayExpression = "${activiti.cloud.integration.retryable.backoff.delay:0}"))) @Bean public Consumer> serviceTaskIntegrationResultEventConsumer(ServiceTaskIntegrationResultEventHandler handler) { return message -> handler.receive(message.getPayload()); @@ -93,7 +97,10 @@ public ServiceTaskIntegrationErrorEventHandler serviceTaskIntegrationErrorEventH processEngineEventsAggregator); } - @FunctionBinding(input = ProcessEngineIntegrationChannels.INTEGRATION_ERRORS_CONSUMER) + @FunctionBinding(input = ProcessEngineIntegrationChannels.INTEGRATION_ERRORS_CONSUMER, + retryable = @Retryable(value = ActivitiOptimisticLockingException.class, + maxAttemptsExpression = "${activiti.cloud.integration.retryable.max-attempts:3}", + backoff = @Backoff(delayExpression = "${activiti.cloud.integration.retryable.backoff.delay:0}"))) @Bean public Consumer> serviceTaskIntegrationErrorEventConsumer(ServiceTaskIntegrationErrorEventHandler handler) { return message -> handler.receive(message.getPayload()); diff --git a/activiti-cloud-service-common/activiti-cloud-service-messaging-config/src/main/java/org/activiti/cloud/common/messaging/config/FunctionBindingConfiguration.java b/activiti-cloud-service-common/activiti-cloud-service-messaging-config/src/main/java/org/activiti/cloud/common/messaging/config/FunctionBindingConfiguration.java index f3ad9c125cb..24f5f28963a 100644 --- a/activiti-cloud-service-common/activiti-cloud-service-messaging-config/src/main/java/org/activiti/cloud/common/messaging/config/FunctionBindingConfiguration.java +++ b/activiti-cloud-service-common/activiti-cloud-service-messaging-config/src/main/java/org/activiti/cloud/common/messaging/config/FunctionBindingConfiguration.java @@ -16,6 +16,8 @@ package org.activiti.cloud.common.messaging.config; import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; import java.util.function.Consumer; import java.util.function.Function; @@ -43,7 +45,9 @@ import org.springframework.integration.filter.ExpressionEvaluatingSelector; import org.springframework.integration.handler.GenericHandler; import org.springframework.integration.handler.LoggingHandler; +import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice; import org.springframework.messaging.Message; +import org.springframework.retry.support.RetryTemplate; import org.springframework.util.StringUtils; @AutoConfiguration(after = BinderFactoryAutoConfiguration.class, @@ -51,6 +55,11 @@ @ConditionalOnClass(BindingServiceProperties.class) public class FunctionBindingConfiguration extends AbstractFunctionalBindingConfiguration { + @Bean + public RetryableFunctionBindingTemplate retryableFunctionBindingTemplate() { + return new RetryableFunctionBindingTemplate(); + } + @Bean public BindingResolver bindingResolver(BindingServiceProperties bindingServiceProperties){ return destination -> Optional @@ -89,7 +98,8 @@ public Function resolveExpression(ConfigurableApplicationContext @Bean(name = "functionBindingBeanPostProcessor") public BeanPostProcessor functionBindingBeanPostProcessor(FunctionAnnotationService functionAnnotationService, IntegrationFlowContext integrationFlowContext, - Function resolveExpression) { + Function resolveExpression, + RetryableFunctionBindingTemplate retryableFunctionBindingTemplate) { return new BeanPostProcessor() { @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { @@ -126,6 +136,11 @@ public Object postProcessAfterInitialization(Object bean, String beanName) throw .register(); } else { + List advices = new ArrayList<>(); + + RetryTemplate template = retryableFunctionBindingTemplate.apply(functionBinding.retryable()); + advices.add(new FunctionBindingRequestHandlerRetryAdvice(template)); + GenericHandler handler = (message, headers) -> { FunctionInvocationWrapper function = functionFromDefinition(beanName); return function.apply(message); @@ -137,7 +152,8 @@ public Object postProcessAfterInitialization(Object bean, String beanName) throw .log(LoggingHandler.Level.DEBUG, beanName + "." + functionBinding.input()) .filter(selector, filter -> filter.discardChannel("nullChannel") .throwExceptionOnRejection(true)) - .handle(Message.class, handler); + .handle(Message.class, handler, + spec -> spec.advice(advices.toArray(RequestHandlerRetryAdvice[]::new))); if (Function.class.isInstance(bean)) { functionFlowBuilder.bridge() .log(LoggingHandler.Level.DEBUG, beanName + "." + functionBinding.output()) diff --git a/activiti-cloud-service-common/activiti-cloud-service-messaging-config/src/main/java/org/activiti/cloud/common/messaging/config/FunctionBindingRequestHandlerRetryAdvice.java b/activiti-cloud-service-common/activiti-cloud-service-messaging-config/src/main/java/org/activiti/cloud/common/messaging/config/FunctionBindingRequestHandlerRetryAdvice.java new file mode 100644 index 00000000000..5b4c4277b45 --- /dev/null +++ b/activiti-cloud-service-common/activiti-cloud-service-messaging-config/src/main/java/org/activiti/cloud/common/messaging/config/FunctionBindingRequestHandlerRetryAdvice.java @@ -0,0 +1,28 @@ +/* + * Copyright 2017-2020 Alfresco Software, Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.activiti.cloud.common.messaging.config; + +import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice; +import org.springframework.retry.support.RetryTemplate; + +public class FunctionBindingRequestHandlerRetryAdvice extends RequestHandlerRetryAdvice { + + public FunctionBindingRequestHandlerRetryAdvice(RetryTemplate retryTemplate) { + super.setRetryTemplate(retryTemplate); + } + +} diff --git a/activiti-cloud-service-common/activiti-cloud-service-messaging-config/src/main/java/org/activiti/cloud/common/messaging/config/RetryableFunctionBindingTemplate.java b/activiti-cloud-service-common/activiti-cloud-service-messaging-config/src/main/java/org/activiti/cloud/common/messaging/config/RetryableFunctionBindingTemplate.java new file mode 100644 index 00000000000..d8263fb5ca8 --- /dev/null +++ b/activiti-cloud-service-common/activiti-cloud-service-messaging-config/src/main/java/org/activiti/cloud/common/messaging/config/RetryableFunctionBindingTemplate.java @@ -0,0 +1,203 @@ +/* + * Copyright 2017-2020 Alfresco Software, Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.activiti.cloud.common.messaging.config; + +import java.lang.annotation.Annotation; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.BeanFactoryAware; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.core.annotation.AnnotationUtils; +import org.springframework.expression.common.TemplateParserContext; +import org.springframework.expression.spel.standard.SpelExpressionParser; +import org.springframework.expression.spel.support.StandardEvaluationContext; +import org.springframework.retry.RetryListener; +import org.springframework.retry.RetryPolicy; +import org.springframework.retry.annotation.Backoff; +import org.springframework.retry.annotation.Retryable; +import org.springframework.retry.backoff.BackOffPolicy; +import org.springframework.retry.backoff.BackOffPolicyBuilder; +import org.springframework.retry.backoff.NoBackOffPolicy; +import org.springframework.retry.policy.ExpressionRetryPolicy; +import org.springframework.retry.policy.SimpleRetryPolicy; +import org.springframework.retry.support.RetryTemplate; +import org.springframework.util.StringUtils; + +public class RetryableFunctionBindingTemplate implements Function, BeanFactoryAware { + private static final TemplateParserContext PARSER_CONTEXT = new TemplateParserContext(); + + private static final SpelExpressionParser PARSER = new SpelExpressionParser(); + + private final StandardEvaluationContext evaluationContext = new StandardEvaluationContext(); + + private BeanFactory beanFactory; + + @Override + public RetryTemplate apply(Retryable retryable) { + String[] listenersBeanNames = retryable.listeners(); + + RetryTemplate template = new RetryTemplate(); + if (listenersBeanNames.length > 0) { + template.setListeners(getListenersBeans(listenersBeanNames)); + } +// else if (this.globalListeners != null) { +// template.setListeners(this.globalListeners); +// } + + template.setRetryPolicy(getRetryPolicy(retryable)); + template.setBackOffPolicy(getBackoffPolicy(retryable.backoff())); + + return template; + } + + private RetryListener[] getListenersBeans(String[] listenersBeanNames) { + RetryListener[] listeners = new RetryListener[listenersBeanNames.length]; + for (int i = 0; i < listeners.length; i++) { + listeners[i] = this.beanFactory.getBean(listenersBeanNames[i], RetryListener.class); + } + return listeners; + } + + private RetryPolicy getRetryPolicy(Annotation retryable) { + Map attrs = AnnotationUtils.getAnnotationAttributes(retryable); + @SuppressWarnings("unchecked") + Class[] includes = (Class[]) attrs.get("value"); + String exceptionExpression = (String) attrs.get("exceptionExpression"); + boolean hasExpression = StringUtils.hasText(exceptionExpression); + if (includes.length == 0) { + @SuppressWarnings("unchecked") + Class[] value = (Class[]) attrs.get("include"); + includes = value; + } + @SuppressWarnings("unchecked") + Class[] excludes = (Class[]) attrs.get("exclude"); + Integer maxAttempts = (Integer) attrs.get("maxAttempts"); + String maxAttemptsExpression = (String) attrs.get("maxAttemptsExpression"); + if (StringUtils.hasText(maxAttemptsExpression)) { + if (ExpressionRetryPolicy.isTemplate(maxAttemptsExpression)) { + maxAttempts = PARSER.parseExpression(resolve(maxAttemptsExpression), PARSER_CONTEXT) + .getValue(this.evaluationContext, Integer.class); + } + else { + maxAttempts = PARSER.parseExpression(resolve(maxAttemptsExpression)).getValue(this.evaluationContext, + Integer.class); + } + } + if (includes.length == 0 && excludes.length == 0) { + SimpleRetryPolicy simple = hasExpression + ? new ExpressionRetryPolicy(resolve(exceptionExpression)).withBeanFactory(this.beanFactory) + : new SimpleRetryPolicy(); + simple.setMaxAttempts(maxAttempts); + return simple; + } + Map, Boolean> policyMap = new HashMap, Boolean>(); + for (Class type : includes) { + policyMap.put(type, true); + } + for (Class type : excludes) { + policyMap.put(type, false); + } + boolean retryNotExcluded = includes.length == 0; + if (hasExpression) { + return new ExpressionRetryPolicy(maxAttempts, policyMap, true, exceptionExpression, retryNotExcluded) + .withBeanFactory(this.beanFactory); + } + else { + return new SimpleRetryPolicy(maxAttempts, policyMap, true, retryNotExcluded); + } + } + + private BackOffPolicy getBackoffPolicy(Backoff backoff) { + Map attrs = AnnotationUtils.getAnnotationAttributes(backoff); + long min = backoff.delay() == 0 ? backoff.value() : backoff.delay(); + String delayExpression = (String) attrs.get("delayExpression"); + if (StringUtils.hasText(delayExpression)) { + if (ExpressionRetryPolicy.isTemplate(delayExpression)) { + min = PARSER.parseExpression(resolve(delayExpression), PARSER_CONTEXT).getValue(this.evaluationContext, + Long.class); + } + else { + min = PARSER.parseExpression(resolve(delayExpression)).getValue(this.evaluationContext, Long.class); + } + } + long max = backoff.maxDelay(); + String maxDelayExpression = (String) attrs.get("maxDelayExpression"); + if (StringUtils.hasText(maxDelayExpression)) { + if (ExpressionRetryPolicy.isTemplate(maxDelayExpression)) { + max = PARSER.parseExpression(resolve(maxDelayExpression), PARSER_CONTEXT) + .getValue(this.evaluationContext, Long.class); + } + else { + max = PARSER.parseExpression(resolve(maxDelayExpression)).getValue(this.evaluationContext, Long.class); + } + } + double multiplier = backoff.multiplier(); + String multiplierExpression = (String) attrs.get("multiplierExpression"); + if (StringUtils.hasText(multiplierExpression)) { + if (ExpressionRetryPolicy.isTemplate(multiplierExpression)) { + multiplier = PARSER.parseExpression(resolve(multiplierExpression), PARSER_CONTEXT) + .getValue(this.evaluationContext, Double.class); + } + else { + multiplier = PARSER.parseExpression(resolve(multiplierExpression)).getValue(this.evaluationContext, + Double.class); + } + } + boolean isRandom = false; + if (multiplier > 0) { + isRandom = backoff.random(); + String randomExpression = (String) attrs.get("randomExpression"); + if (StringUtils.hasText(randomExpression)) { + if (ExpressionRetryPolicy.isTemplate(randomExpression)) { + isRandom = PARSER.parseExpression(resolve(randomExpression), PARSER_CONTEXT) + .getValue(this.evaluationContext, Boolean.class); + } + else { + isRandom = PARSER.parseExpression(resolve(randomExpression)).getValue(this.evaluationContext, + Boolean.class); + } + } + } + return min == 0 ? new NoBackOffPolicy() : BackOffPolicyBuilder.newBuilder() + .delay(min) + .maxDelay(max) + .multiplier(multiplier) + .random(isRandom) + .build(); + } + + /** + * Resolve the specified value if possible. + * + * @see ConfigurableBeanFactory#resolveEmbeddedValue + */ + private String resolve(String value) { + if (this.beanFactory != null && this.beanFactory instanceof ConfigurableBeanFactory) { + return ((ConfigurableBeanFactory) this.beanFactory).resolveEmbeddedValue(value); + } + return value; + } + + + @Override + public void setBeanFactory(BeanFactory beanFactory) throws BeansException { + this.beanFactory = beanFactory; + } +} diff --git a/activiti-cloud-service-common/activiti-cloud-service-messaging-config/src/main/java/org/activiti/cloud/common/messaging/functional/FunctionBinding.java b/activiti-cloud-service-common/activiti-cloud-service-messaging-config/src/main/java/org/activiti/cloud/common/messaging/functional/FunctionBinding.java index b5b93c39021..42dd41fe70b 100644 --- a/activiti-cloud-service-common/activiti-cloud-service-messaging-config/src/main/java/org/activiti/cloud/common/messaging/functional/FunctionBinding.java +++ b/activiti-cloud-service-common/activiti-cloud-service-messaging-config/src/main/java/org/activiti/cloud/common/messaging/functional/FunctionBinding.java @@ -20,6 +20,8 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.retry.annotation.Backoff; +import org.springframework.retry.annotation.Retryable; @Retention( RetentionPolicy.RUNTIME ) @Target( {ElementType.METHOD, ElementType.TYPE} ) @@ -30,4 +32,7 @@ String input() default ""; String condition() default ""; + + Retryable retryable() default @Retryable(maxAttemptsExpression = "${activiti.cloud.messaging.retryable.max-attempts:1}", + backoff = @Backoff(delayExpression = "${activiti.cloud.messaging.retryable.backoff.delay:0}")); } From 3db24e2d33b9bbaa72515b97dc823d8901d0c788 Mon Sep 17 00:00:00 2001 From: Igor Dianov Date: Mon, 6 Mar 2023 09:09:56 -0800 Subject: [PATCH 12/13] Add RuntimeIntegrationEventFunctionBinding with common retryable --- ...untimeIntegrationEventFunctionBinding.java | 37 +++++++++++++++++++ .../CloudConnectorsAutoConfiguration.java | 15 ++------ .../messaging/functional/FunctionBinding.java | 4 +- 3 files changed, 42 insertions(+), 14 deletions(-) create mode 100644 activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/RuntimeIntegrationEventFunctionBinding.java diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/RuntimeIntegrationEventFunctionBinding.java b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/RuntimeIntegrationEventFunctionBinding.java new file mode 100644 index 00000000000..49abd7e128a --- /dev/null +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/RuntimeIntegrationEventFunctionBinding.java @@ -0,0 +1,37 @@ +/* + * Copyright 2017-2020 Alfresco Software, Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.activiti.services.connectors.channel; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import org.activiti.cloud.common.messaging.functional.FunctionBinding; +import org.activiti.engine.ActivitiOptimisticLockingException; +import org.springframework.core.annotation.AliasFor; +import org.springframework.retry.annotation.Backoff; +import org.springframework.retry.annotation.Retryable; + +@Retention( RetentionPolicy.RUNTIME ) +@Target( {ElementType.METHOD, ElementType.TYPE} ) +@FunctionBinding(retryable = @Retryable(value = ActivitiOptimisticLockingException.class, + maxAttemptsExpression = "${activiti.cloud.runtime.integration.retryable.max-attempts:3}", + backoff = @Backoff(delayExpression = "${activiti.cloud.runtime.integration.retryable.backoff.delay:0}"))) +public @interface RuntimeIntegrationEventFunctionBinding { + @AliasFor(annotation = FunctionBinding.class, attribute = "input") + String value(); +} diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/conf/CloudConnectorsAutoConfiguration.java b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/conf/CloudConnectorsAutoConfiguration.java index 88be98ba9b1..4285bfd6e42 100644 --- a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/conf/CloudConnectorsAutoConfiguration.java +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/conf/CloudConnectorsAutoConfiguration.java @@ -19,11 +19,9 @@ import java.util.function.Consumer; import org.activiti.cloud.api.process.model.IntegrationError; import org.activiti.cloud.api.process.model.IntegrationResult; -import org.activiti.cloud.common.messaging.functional.FunctionBinding; import org.activiti.cloud.services.events.configuration.RuntimeBundleProperties; import org.activiti.cloud.services.events.converter.RuntimeBundleInfoAppender; import org.activiti.cloud.services.events.listeners.ProcessEngineEventsAggregator; -import org.activiti.engine.ActivitiOptimisticLockingException; import org.activiti.engine.ManagementService; import org.activiti.engine.RuntimeService; import org.activiti.engine.impl.bpmn.behavior.VariablesPropagator; @@ -38,6 +36,7 @@ import org.activiti.services.connectors.channel.IntegrationRequestBuilder; import org.activiti.services.connectors.channel.IntegrationRequestReplayer; import org.activiti.services.connectors.channel.ProcessEngineIntegrationChannels; +import org.activiti.services.connectors.channel.RuntimeIntegrationEventFunctionBinding; import org.activiti.services.connectors.channel.ServiceTaskIntegrationErrorEventHandler; import org.activiti.services.connectors.channel.ServiceTaskIntegrationResultEventHandler; import org.activiti.services.connectors.message.IntegrationContextMessageBuilderFactory; @@ -51,8 +50,6 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; import org.springframework.messaging.Message; -import org.springframework.retry.annotation.Backoff; -import org.springframework.retry.annotation.Retryable; @Configuration @AutoConfigureBefore(value = ConnectorsAutoConfiguration.class) @@ -74,11 +71,8 @@ public ServiceTaskIntegrationResultEventHandler serviceTaskIntegrationResultEven runtimeBundleProperties, managementService, processEngineEventsAggregator, variablesPropagator); } - @FunctionBinding(input = ProcessEngineIntegrationChannels.INTEGRATION_RESULTS_CONSUMER, - retryable = @Retryable(value = ActivitiOptimisticLockingException.class, - maxAttemptsExpression = "${activiti.cloud.integration.retryable.max-attempts:3}", - backoff = @Backoff(delayExpression = "${activiti.cloud.integration.retryable.backoff.delay:0}"))) @Bean + @RuntimeIntegrationEventFunctionBinding(ProcessEngineIntegrationChannels.INTEGRATION_RESULTS_CONSUMER) public Consumer> serviceTaskIntegrationResultEventConsumer(ServiceTaskIntegrationResultEventHandler handler) { return message -> handler.receive(message.getPayload()); } @@ -97,11 +91,8 @@ public ServiceTaskIntegrationErrorEventHandler serviceTaskIntegrationErrorEventH processEngineEventsAggregator); } - @FunctionBinding(input = ProcessEngineIntegrationChannels.INTEGRATION_ERRORS_CONSUMER, - retryable = @Retryable(value = ActivitiOptimisticLockingException.class, - maxAttemptsExpression = "${activiti.cloud.integration.retryable.max-attempts:3}", - backoff = @Backoff(delayExpression = "${activiti.cloud.integration.retryable.backoff.delay:0}"))) @Bean + @RuntimeIntegrationEventFunctionBinding(ProcessEngineIntegrationChannels.INTEGRATION_ERRORS_CONSUMER) public Consumer> serviceTaskIntegrationErrorEventConsumer(ServiceTaskIntegrationErrorEventHandler handler) { return message -> handler.receive(message.getPayload()); } diff --git a/activiti-cloud-service-common/activiti-cloud-service-messaging-config/src/main/java/org/activiti/cloud/common/messaging/functional/FunctionBinding.java b/activiti-cloud-service-common/activiti-cloud-service-messaging-config/src/main/java/org/activiti/cloud/common/messaging/functional/FunctionBinding.java index 42dd41fe70b..ecf70794152 100644 --- a/activiti-cloud-service-common/activiti-cloud-service-messaging-config/src/main/java/org/activiti/cloud/common/messaging/functional/FunctionBinding.java +++ b/activiti-cloud-service-common/activiti-cloud-service-messaging-config/src/main/java/org/activiti/cloud/common/messaging/functional/FunctionBinding.java @@ -33,6 +33,6 @@ String condition() default ""; - Retryable retryable() default @Retryable(maxAttemptsExpression = "${activiti.cloud.messaging.retryable.max-attempts:1}", - backoff = @Backoff(delayExpression = "${activiti.cloud.messaging.retryable.backoff.delay:0}")); + Retryable retryable() default @Retryable(maxAttemptsExpression = "${activiti.cloud.messaging.function.retryable.max-attempts:1}", + backoff = @Backoff(delayExpression = "${activiti.cloud.messaging.function.retryable.backoff.delay:0}")); } From 27f1ab8bb36c8020c484836e64afc38f3113a038 Mon Sep 17 00:00:00 2001 From: Igor Dianov Date: Mon, 6 Mar 2023 09:23:46 -0800 Subject: [PATCH 13/13] update name to RuntimeIntegrationEventInputBinding --- ...inding.java => RuntimeIntegrationEventInputBinding.java} | 2 +- .../connectors/conf/CloudConnectorsAutoConfiguration.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) rename activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/{RuntimeIntegrationEventFunctionBinding.java => RuntimeIntegrationEventInputBinding.java} (96%) diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/RuntimeIntegrationEventFunctionBinding.java b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/RuntimeIntegrationEventInputBinding.java similarity index 96% rename from activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/RuntimeIntegrationEventFunctionBinding.java rename to activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/RuntimeIntegrationEventInputBinding.java index 49abd7e128a..359bdd9627d 100644 --- a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/RuntimeIntegrationEventFunctionBinding.java +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/channel/RuntimeIntegrationEventInputBinding.java @@ -31,7 +31,7 @@ @FunctionBinding(retryable = @Retryable(value = ActivitiOptimisticLockingException.class, maxAttemptsExpression = "${activiti.cloud.runtime.integration.retryable.max-attempts:3}", backoff = @Backoff(delayExpression = "${activiti.cloud.runtime.integration.retryable.backoff.delay:0}"))) -public @interface RuntimeIntegrationEventFunctionBinding { +public @interface RuntimeIntegrationEventInputBinding { @AliasFor(annotation = FunctionBinding.class, attribute = "input") String value(); } diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/conf/CloudConnectorsAutoConfiguration.java b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/conf/CloudConnectorsAutoConfiguration.java index 4285bfd6e42..dc6d3b002ba 100644 --- a/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/conf/CloudConnectorsAutoConfiguration.java +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-services-runtime-bundle/activiti-cloud-services-connectors/src/main/java/org/activiti/services/connectors/conf/CloudConnectorsAutoConfiguration.java @@ -36,7 +36,7 @@ import org.activiti.services.connectors.channel.IntegrationRequestBuilder; import org.activiti.services.connectors.channel.IntegrationRequestReplayer; import org.activiti.services.connectors.channel.ProcessEngineIntegrationChannels; -import org.activiti.services.connectors.channel.RuntimeIntegrationEventFunctionBinding; +import org.activiti.services.connectors.channel.RuntimeIntegrationEventInputBinding; import org.activiti.services.connectors.channel.ServiceTaskIntegrationErrorEventHandler; import org.activiti.services.connectors.channel.ServiceTaskIntegrationResultEventHandler; import org.activiti.services.connectors.message.IntegrationContextMessageBuilderFactory; @@ -72,7 +72,7 @@ public ServiceTaskIntegrationResultEventHandler serviceTaskIntegrationResultEven } @Bean - @RuntimeIntegrationEventFunctionBinding(ProcessEngineIntegrationChannels.INTEGRATION_RESULTS_CONSUMER) + @RuntimeIntegrationEventInputBinding(ProcessEngineIntegrationChannels.INTEGRATION_RESULTS_CONSUMER) public Consumer> serviceTaskIntegrationResultEventConsumer(ServiceTaskIntegrationResultEventHandler handler) { return message -> handler.receive(message.getPayload()); } @@ -92,7 +92,7 @@ public ServiceTaskIntegrationErrorEventHandler serviceTaskIntegrationErrorEventH } @Bean - @RuntimeIntegrationEventFunctionBinding(ProcessEngineIntegrationChannels.INTEGRATION_ERRORS_CONSUMER) + @RuntimeIntegrationEventInputBinding(ProcessEngineIntegrationChannels.INTEGRATION_ERRORS_CONSUMER) public Consumer> serviceTaskIntegrationErrorEventConsumer(ServiceTaskIntegrationErrorEventHandler handler) { return message -> handler.receive(message.getPayload()); }