Skip to content

Commit

Permalink
Merge branch 'apache:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyang377 authored Oct 19, 2024
2 parents 799cb21 + 462cd18 commit d7b02a9
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,6 @@ public static PipelineJobType parseJobType(final String jobId) {
return JobCodeRegistry.getJobType(jobId.substring(1, 3));
}

private static void verifyJobId(final String jobId) {
Preconditions.checkArgument(jobId.length() > 10, "Invalid job id length, job id: `%s`", jobId);
Preconditions.checkArgument('j' == jobId.charAt(0), "Invalid job id, first char: `%s`", jobId.charAt(0));
}

/**
* Parse context key.
*
Expand All @@ -92,6 +87,11 @@ public static PipelineContextKey parseContextKey(final String jobId) {
return new PipelineContextKey(databaseName, InstanceType.valueOf(instanceTypeCode));
}

private static void verifyJobId(final String jobId) {
Preconditions.checkArgument(jobId.length() > 10, "Invalid job id length, job id: `%s`", jobId);
Preconditions.checkArgument('j' == jobId.charAt(0), "Invalid job id, first char: `%s`", jobId.charAt(0));
}

/**
* Get ElasticJob configuration POJO.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.job.progress;

import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
import org.junit.jupiter.api.Test;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class JobItemIncrementalTasksProgressTest {

@Test
void assertGetIncrementalPositionWithNullIncrementalTaskProgress() {
assertFalse(new JobItemIncrementalTasksProgress(null).getIncrementalPosition().isPresent());
}

@Test
void assertGetIncrementalPositionWithNotNullIncrementalTaskProgress() {
assertTrue(new JobItemIncrementalTasksProgress(mock(IncrementalTaskProgress.class, RETURNS_DEEP_STUBS)).getIncrementalPosition().isPresent());
}

@Test
void assertGetIncrementalLatestActiveTimeMillisWithNullIncrementalTaskProgress() {
assertThat(new JobItemIncrementalTasksProgress(null).getIncrementalLatestActiveTimeMillis(), is(0L));
}

@Test
void assertGetIncrementalLatestActiveTimeMillisWithNotNullIncrementalTaskProgress() {
IncrementalTaskProgress incrementalTaskProgress = mock(IncrementalTaskProgress.class, RETURNS_DEEP_STUBS);
when(incrementalTaskProgress.getIncrementalTaskDelay().getLatestActiveTimeMillis()).thenReturn(10L);
assertThat(new JobItemIncrementalTasksProgress(incrementalTaskProgress).getIncrementalLatestActiveTimeMillis(), is(10L));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,34 @@
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

class PipelineJobIdUtilsTest {

@Test
void assertParse() {
for (InstanceType each : InstanceType.values()) {
assertParse0(each);
}
void assertParseJobTypeWithInvalidJobId() {
assertThrows(IllegalArgumentException.class, () -> PipelineJobIdUtils.parseJobType("123"));
assertThrows(IllegalArgumentException.class, () -> PipelineJobIdUtils.parseJobType("12345678901234567890"));
}

private void assertParse0(final InstanceType instanceType) {
PipelineContextKey contextKey = new PipelineContextKey("sharding_db", instanceType);
String jobId = PipelineJobIdUtils.marshal(new MigrationJobId(contextKey, Collections.singletonList("t_order:ds_0.t_order_0,ds_0.t_order_1")));
@Test
void assertParseJobType() {
PipelineContextKey contextKey = new PipelineContextKey("foo_db", InstanceType.JDBC);
String jobId = PipelineJobIdUtils.marshal(new MigrationJobId(contextKey, Collections.singletonList("foo_tbl:foo_ds.foo_tbl_0,foo_ds.foo_tbl_1")));
assertThat(PipelineJobIdUtils.parseJobType(jobId), instanceOf(MigrationJobType.class));
PipelineContextKey actualContextKey = PipelineJobIdUtils.parseContextKey(jobId);
assertThat(actualContextKey.getInstanceType(), is(instanceType));
assertThat(actualContextKey.getDatabaseName(), is(instanceType == InstanceType.PROXY ? "" : "sharding_db"));
}

@Test
void assertParseContextKeyWithJDBCInstanceType() {
PipelineContextKey contextKey = new PipelineContextKey("foo_db", InstanceType.JDBC);
String jobId = PipelineJobIdUtils.marshal(new MigrationJobId(contextKey, Collections.singletonList("foo_tbl:foo_ds.foo_tbl_0,foo_ds.foo_tbl_1")));
assertThat(PipelineJobIdUtils.parseContextKey(jobId).getDatabaseName(), is("foo_db"));
}

@Test
void assertParseContextKeyWithProxyInstanceType() {
PipelineContextKey contextKey = new PipelineContextKey("foo_db", InstanceType.PROXY);
String jobId = PipelineJobIdUtils.marshal(new MigrationJobId(contextKey, Collections.singletonList("foo_tbl:foo_ds.foo_tbl_0,foo_ds.foo_tbl_1")));
assertThat(PipelineJobIdUtils.parseContextKey(jobId).getDatabaseName(), is(""));
}
}

0 comments on commit d7b02a9

Please sign in to comment.