-
Notifications
You must be signed in to change notification settings - Fork 60
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Replayer scenario when frequency is changed
- Loading branch information
1 parent
680fdce
commit 6317ecb
Showing
3 changed files
with
182 additions
and
8 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
24 changes: 24 additions & 0 deletions
24
src/test/java/com/uber/cadence/samples/replaytests/HelloPeriodicReplayTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
package com.uber.cadence.samples.replaytests; | ||
|
||
import com.uber.cadence.samples.hello.HelloPeriodic; | ||
import com.uber.cadence.testing.WorkflowReplayer; | ||
import org.junit.Test; | ||
|
||
public class HelloPeriodicReplayTest { | ||
|
||
// continue-as-new case for replayer tests: Passing | ||
@Test | ||
public void testReplay_continueAsNew() throws Exception { | ||
WorkflowReplayer.replayWorkflowExecutionFromResource( | ||
"replaytests/HelloPeriodic.json", HelloPeriodic.GreetingWorkflowImpl.class); | ||
} | ||
|
||
// Continue as new case: change in sleep timer compared to original workflow definition. It should | ||
// fail. BUT it is currently passing. | ||
@Test | ||
public void testReplay_continueAsNew_timerChange() throws Exception { | ||
WorkflowReplayer.replayWorkflowExecutionFromResource( | ||
"replaytests/HelloPeriodic.json", | ||
HelloPeriodic_sleepTimerChange.GreetingWorkflowImpl.class); | ||
} | ||
} |
158 changes: 158 additions & 0 deletions
158
src/test/java/com/uber/cadence/samples/replaytests/HelloPeriodic_sleepTimerChange.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,158 @@ | ||
/* | ||
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* | ||
* Modifications copyright (C) 2017 Uber Technologies, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"). You may not | ||
* use this file except in compliance with the License. A copy of the License is | ||
* located at | ||
* | ||
* http://aws.amazon.com/apache2.0 | ||
* | ||
* or in the "license" file accompanying this file. This file is distributed on | ||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
* express or implied. See the License for the specific language governing | ||
* permissions and limitations under the License. | ||
*/ | ||
|
||
package com.uber.cadence.samples.replaytests; | ||
|
||
import static com.uber.cadence.samples.common.SampleConstants.DOMAIN; | ||
|
||
import com.google.common.base.Throwables; | ||
import com.uber.cadence.WorkflowExecution; | ||
import com.uber.cadence.WorkflowIdReusePolicy; | ||
import com.uber.cadence.activity.Activity; | ||
import com.uber.cadence.activity.ActivityOptions; | ||
import com.uber.cadence.client.DuplicateWorkflowException; | ||
import com.uber.cadence.client.WorkflowClient; | ||
import com.uber.cadence.client.WorkflowClientOptions; | ||
import com.uber.cadence.client.WorkflowException; | ||
import com.uber.cadence.client.WorkflowStub; | ||
import com.uber.cadence.internal.compatibility.Thrift2ProtoAdapter; | ||
import com.uber.cadence.internal.compatibility.proto.serviceclient.IGrpcServiceStubs; | ||
import com.uber.cadence.worker.Worker; | ||
import com.uber.cadence.worker.WorkerFactory; | ||
import com.uber.cadence.workflow.Workflow; | ||
import com.uber.cadence.workflow.WorkflowMethod; | ||
import java.time.Duration; | ||
import java.util.Optional; | ||
|
||
public class HelloPeriodic_sleepTimerChange { | ||
|
||
static final String TASK_LIST = "HelloPeriodic"; | ||
static final String PERIODIC_WORKFLOW_ID = "HelloPeriodic"; | ||
|
||
public interface GreetingWorkflow { | ||
@WorkflowMethod( | ||
// At most one instance. | ||
workflowId = PERIODIC_WORKFLOW_ID, | ||
// To allow starting workflow with the same ID after the previous one has terminated. | ||
workflowIdReusePolicy = WorkflowIdReusePolicy.AllowDuplicate, | ||
// Adjust this value to the maximum time workflow is expected to run. | ||
// It usually depends on the number of repetitions and interval between them. | ||
executionStartToCloseTimeoutSeconds = 300, | ||
taskList = TASK_LIST | ||
) | ||
void greetPeriodically(String name, Duration delay); | ||
} | ||
|
||
public interface GreetingActivities { | ||
void greet(String greeting); | ||
} | ||
|
||
public static class GreetingWorkflowImpl implements GreetingWorkflow { | ||
|
||
/** | ||
* This value is so low just to make the example interesting to watch. In real life you would | ||
* use something like 100 or a value that matches a business cycle. For example if it runs once | ||
* an hour 24 would make sense. | ||
*/ | ||
private final int CONTINUE_AS_NEW_FREQUENCEY = 1000; | ||
|
||
private final GreetingActivities activities = | ||
Workflow.newActivityStub( | ||
GreetingActivities.class, | ||
new ActivityOptions.Builder() | ||
.setScheduleToCloseTimeout(Duration.ofSeconds(10)) | ||
.build()); | ||
|
||
/** | ||
* Stub used to terminate this workflow run and create the next one with the same ID atomically. | ||
*/ | ||
private final GreetingWorkflow continueAsNew = | ||
Workflow.newContinueAsNewStub(GreetingWorkflow.class); | ||
|
||
@Override | ||
public void greetPeriodically(String name, Duration delay) { | ||
// Loop the predefined number of times then continue this workflow as new. | ||
// This is needed to periodically truncate the history size. | ||
for (int i = 0; i < CONTINUE_AS_NEW_FREQUENCEY; i++) { | ||
activities.greet("Hello " + name + "!"); | ||
Workflow.sleep(delay); | ||
} | ||
// Current workflow run stops executing after this call. | ||
continueAsNew.greetPeriodically(name, delay); | ||
// unreachable line | ||
} | ||
} | ||
|
||
static class GreetingActivitiesImpl implements GreetingActivities { | ||
@Override | ||
public void greet(String greeting) { | ||
System.out.println("From " + Activity.getWorkflowExecution() + ": " + greeting); | ||
} | ||
} | ||
|
||
public static void main(String[] args) throws InterruptedException { | ||
// Get a new client | ||
// NOTE: to set a different options, you can do like this: | ||
// ClientOptions.newBuilder().setRpcTimeout(5 * 1000).build(); | ||
WorkflowClient workflowClient = | ||
WorkflowClient.newInstance( | ||
new Thrift2ProtoAdapter(IGrpcServiceStubs.newInstance()), | ||
WorkflowClientOptions.newBuilder().setDomain(DOMAIN).build()); | ||
// Get worker to poll the task list. | ||
WorkerFactory factory = WorkerFactory.newInstance(workflowClient); | ||
Worker worker = factory.newWorker(TASK_LIST); | ||
// Workflows are stateful. So you need a type to create instances. | ||
worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class); | ||
// Activities are stateless and thread safe. So a shared instance is used. | ||
worker.registerActivitiesImplementations(new GreetingActivitiesImpl()); | ||
// Start listening to the workflow and activity task lists. | ||
factory.start(); | ||
|
||
// Start a workflow execution. Usually this is done from another program. | ||
// To ensure that this daemon type workflow is always running try to start it periodically | ||
// ignoring the duplicated exception. | ||
// It is only to protect from application level failures. | ||
// Failures of a workflow worker don't lead to workflow failures. | ||
WorkflowExecution execution = null; | ||
while (true) { | ||
// Print reason of failure of the previous run, before restarting. | ||
if (execution != null) { | ||
WorkflowStub workflow = workflowClient.newUntypedWorkflowStub(execution, Optional.empty()); | ||
try { | ||
workflow.getResult(Void.class); // | ||
} catch (WorkflowException e) { | ||
System.out.println("Previous instance failed:\n" + Throwables.getStackTraceAsString(e)); | ||
} | ||
} | ||
// New stub instance should be created for each new workflow start. | ||
GreetingWorkflow workflow = workflowClient.newWorkflowStub(GreetingWorkflow.class); | ||
try { | ||
execution = | ||
WorkflowClient.start(workflow::greetPeriodically, "World", Duration.ofSeconds(3)); | ||
System.out.println("Started " + execution); | ||
} catch (DuplicateWorkflowException e) { | ||
System.out.println("Still running as " + e.getExecution()); | ||
} catch (Throwable e) { | ||
e.printStackTrace(); | ||
System.exit(1); | ||
} | ||
// This value is so low just for the sample purpose. In production workflow | ||
// it is usually much higher. | ||
Thread.sleep(10000); | ||
} | ||
} | ||
} |