diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
index 3d7c9b4fbcb0c..487555edcb51a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
@@ -26,9 +26,11 @@
 import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
 import org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo;
 import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
 
 import java.sql.SQLException;
 import java.util.Collection;
@@ -41,7 +43,30 @@
  */
 public interface InventoryIncrementalJobAPI extends PipelineJobAPI {
     
-    @Override
+    /**
+     * Get pipeline job info.
+     *
+     * @param jobId job ID
+     * @return pipeline job info
+     */
+    PipelineJobInfo getJobInfo(String jobId);
+    
+    /**
+     * Build task configuration.
+     *
+     * @param pipelineJobConfig pipeline job configuration
+     * @param jobShardingItem job sharding item
+     * @param pipelineProcessConfig pipeline process configuration
+     * @return task configuration
+     */
+    PipelineTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration pipelineJobConfig, int jobShardingItem, PipelineProcessConfiguration pipelineProcessConfig);
+    
+    /**
+     * Build pipeline process context.
+     *
+     * @param pipelineJobConfig pipeline job configuration
+     * @return pipeline process context
+     */
     InventoryIncrementalProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
     
     /**
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
index 40fb5d8a3f0b4..b823bba76bf0b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
@@ -18,14 +18,10 @@
 package org.apache.shardingsphere.data.pipeline.core.job.service;
 
 import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.common.context.PipelineProcessContext;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
@@ -38,24 +34,6 @@
 @SingletonSPI
 public interface PipelineJobAPI extends TypedSPI {
     
-    /**
-     * Build task configuration.
-     *
-     * @param pipelineJobConfig pipeline job configuration
-     * @param jobShardingItem job sharding item
-     * @param pipelineProcessConfig pipeline process configuration
-     * @return task configuration
-     */
-    PipelineTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration pipelineJobConfig, int jobShardingItem, PipelineProcessConfiguration pipelineProcessConfig);
-    
-    /**
-     * Build pipeline process context.
-     *
-     * @param pipelineJobConfig pipeline job configuration
-     * @return pipeline process context
-     */
-    PipelineProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
-    
     /**
      * Get job configuration.
     *
@@ -99,14 +77,6 @@ default Optional<String> getToBeStoppedPreviousJobType() {
         return Optional.empty();
     }
     
-    /**
-     * Get pipeline job info.
-     *
-     * @param jobId job ID
-     * @return pipeline job info
-     */
-    PipelineJobInfo getJobInfo(String jobId);
-    
     /**
      * Persist job item progress.
      *
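Note: with getJobInfo(), buildTaskConfiguration(), and buildPipelineProcessContext() moved off the PipelineJobAPI SPI and onto InventoryIncrementalJobAPI, generic callers holding a PipelineJobAPI must narrow the type before reading job info. A minimal caller-side sketch of that pattern, assuming the SPI is still resolved through TypedSPILoader and the job type string is at hand; the class and method names here are illustrative, not part of this patch:

import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;

import java.util.Optional;

// Illustrative helper, not part of the patch: resolves the job API by type and narrows it.
public final class JobInfoLookupSketch {
    
    public static Optional<PipelineJobInfo> findJobInfo(final String jobType, final String jobId) {
        PipelineJobAPI jobAPI = TypedSPILoader.getService(PipelineJobAPI.class, jobType);
        if (jobAPI instanceof InventoryIncrementalJobAPI) {
            // Only inventory/incremental job types (e.g. migration) expose getJobInfo after this change.
            return Optional.of(((InventoryIncrementalJobAPI) jobAPI).getJobInfo(jobId));
        }
        // Other job types (e.g. consistency check) no longer carry stub implementations.
        return Optional.empty();
    }
}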
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 47d8eecad2e43..47860f1400c53 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -39,6 +39,7 @@
 
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
@@ -178,7 +179,10 @@ public void drop(final String jobId) {
      * @return jobs info
      */
     public List<PipelineJobInfo> getPipelineJobInfos(final PipelineContextKey contextKey) {
-        return getJobBriefInfos(contextKey, pipelineJobAPI.getType()).map(each -> pipelineJobAPI.getJobInfo(each.getJobName())).collect(Collectors.toList());
+        if (pipelineJobAPI instanceof InventoryIncrementalJobAPI) {
+            return getJobBriefInfos(contextKey, pipelineJobAPI.getType()).map(each -> ((InventoryIncrementalJobAPI) pipelineJobAPI).getJobInfo(each.getJobName())).collect(Collectors.toList());
+        }
+        return Collections.emptyList();
     }
     
     private Stream<JobBriefInfo> getJobBriefInfos(final PipelineContextKey contextKey, final String jobType) {
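Note: the new instanceof guard means getPipelineJobInfos() quietly returns an empty list for any job API that is not inventory/incremental, instead of tripping over an UnsupportedOperationException stub. A rough test sketch of that contract, assuming PipelineJobManager keeps a public constructor taking a PipelineJobAPI (as it is constructed elsewhere in this code base); the test class name is illustrative:

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;

import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.junit.jupiter.api.Test;

// Illustrative test sketch, not part of the patch.
class PipelineJobManagerJobInfosSketchTest {
    
    @Test
    void assertEmptyJobInfosForNonInventoryIncrementalJobAPI() {
        // A plain PipelineJobAPI mock does not implement InventoryIncrementalJobAPI,
        // so the guard returns before the registry center or the context key is touched.
        PipelineJobManager jobManager = new PipelineJobManager(mock(PipelineJobAPI.class));
        assertThat(jobManager.getPipelineJobInfos(null).isEmpty(), is(true));
    }
}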
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index e2662e1340dfc..ac99a1917c8b8 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -19,17 +19,13 @@
 
 import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.common.context.PipelineProcessContext;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
 import org.apache.shardingsphere.data.pipeline.common.pojo.ConsistencyCheckJobItemInfo;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
@@ -42,7 +38,6 @@
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter;
@@ -122,19 +117,9 @@ private void verifyPipelineDatabaseType(final CreateConsistencyCheckJobParameter
         ShardingSpherePreconditions.checkState(supportedDatabaseTypes.contains(param.getTargetDatabaseType()), () -> new UnsupportedPipelineDatabaseTypeException(param.getTargetDatabaseType()));
     }
     
-    /**
-     * Get latest data consistency check result.
-     *
-     * @param parentJobId parent job id
-     * @return latest data consistency check result
-     */
-    public Map<String, TableDataConsistencyCheckResult> getLatestDataConsistencyCheckResult(final String parentJobId) {
-        GovernanceRepositoryAPI governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId));
-        Optional<String> latestCheckJobId = governanceRepositoryAPI.getLatestCheckJobId(parentJobId);
-        if (!latestCheckJobId.isPresent()) {
-            return Collections.emptyMap();
-        }
-        return governanceRepositoryAPI.getCheckJobResult(parentJobId, latestCheckJobId.get());
+    @Override
+    public boolean isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished() {
+        return true;
     }
     
     @Override
@@ -354,21 +339,6 @@ public ConsistencyCheckJobConfiguration getJobConfiguration(final JobConfigurati
         return new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
     }
     
-    @Override
-    public PipelineTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
-        throw new UnsupportedOperationException();
-    }
-    
-    @Override
-    public PipelineProcessContext buildPipelineProcessContext(final PipelineJobConfiguration pipelineJobConfig) {
-        throw new UnsupportedOperationException();
-    }
-    
-    @Override
-    public PipelineJobInfo getJobInfo(final String jobId) {
-        throw new UnsupportedOperationException();
-    }
-    
     @Override
     public Class getPipelineJobClass() {
         return ConsistencyCheckJob.class;
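Note: getLatestDataConsistencyCheckResult() is dropped from ConsistencyCheckJobAPI above, and this diff does not show where (or whether) the lookup is re-homed. Callers that still need the latest check result can reproduce the deleted body directly against GovernanceRepositoryAPI, roughly as below; the helper class is illustrative, and the PipelineJobIdUtils import location is assumed from the surrounding code base:

import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;

// Illustrative helper mirroring the deleted method body, not part of the patch.
public final class LatestCheckResultLookupSketch {
    
    public static Map<String, TableDataConsistencyCheckResult> getLatestCheckResult(final String parentJobId) {
        GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId));
        Optional<String> latestCheckJobId = repositoryAPI.getLatestCheckJobId(parentJobId);
        if (!latestCheckJobId.isPresent()) {
            return Collections.emptyMap();
        }
        return repositoryAPI.getCheckJobResult(parentJobId, latestCheckJobId.get());
    }
}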
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index cb6f6f6a35d8f..6e90b0b222498 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -73,21 +73,6 @@ void assertCreateJobConfig() {
         assertThat(sequence, is(expectedSequence));
     }
     
-    @Test
-    void assertGetLatestDataConsistencyCheckResult() {
-        MigrationJobConfiguration parentJobConfig = jobConfigSwapper.swapToObject(JobConfigurationBuilder.createYamlMigrationJobConfiguration());
-        String parentJobId = parentJobConfig.getJobId();
-        String checkJobId = checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null,
-                parentJobConfig.getSourceDatabaseType(), parentJobConfig.getTargetDatabaseType()));
-        GovernanceRepositoryAPI governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
-        governanceRepositoryAPI.persistLatestCheckJobId(parentJobId, checkJobId);
-        Map<String, TableDataConsistencyCheckResult> expectedCheckResult = Collections.singletonMap("t_order", new TableDataConsistencyCheckResult(true));
-        governanceRepositoryAPI.persistCheckJobResult(parentJobId, checkJobId, expectedCheckResult);
-        Map<String, TableDataConsistencyCheckResult> actualCheckResult = checkJobAPI.getLatestDataConsistencyCheckResult(parentJobId);
-        assertThat(actualCheckResult.size(), is(expectedCheckResult.size()));
-        assertThat(actualCheckResult.get("t_order").isMatched(), is(expectedCheckResult.get("t_order").isMatched()));
-    }
-    
     @Test
     void assertDropByParentJobId() {
         MigrationJobConfiguration parentJobConfig = jobConfigSwapper.swapToObject(JobConfigurationBuilder.createYamlMigrationJobConfiguration());