Skip to content

Commit

Permalink
Move PipelineJobAPI.getJobInfo() to InventoryIncrementalJobAPI (#29058)
Browse files Browse the repository at this point in the history
* Move PipelineJobAPI.getJobInfo() to InventoryIncrementalJobAPI

* Move PipelineJobAPI.getJobInfo() to InventoryIncrementalJobAPI

* Remove useless codes
  • Loading branch information
terrymanu authored Nov 16, 2023
1 parent 68a5d5f commit 1198efc
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;

import java.sql.SQLException;
import java.util.Collection;
Expand All @@ -41,7 +43,30 @@
*/
public interface InventoryIncrementalJobAPI extends PipelineJobAPI {

/**
 * Get pipeline job info.
 *
 * @param jobId job ID
 * @return pipeline job info
 */
PipelineJobInfo getJobInfo(String jobId);

/**
 * Build task configuration.
 *
 * <p>Builds the configuration for the tasks of one job sharding item, combining the job configuration
 * with the process configuration. Implemented per job type.</p>
 *
 * @param pipelineJobConfig pipeline job configuration
 * @param jobShardingItem job sharding item
 * @param pipelineProcessConfig pipeline process configuration
 * @return task configuration
 */
PipelineTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration pipelineJobConfig, int jobShardingItem, PipelineProcessConfiguration pipelineProcessConfig);

/**
 * Build pipeline process context.
 *
 * <p>Builds the inventory-incremental process context for the given job configuration.</p>
 *
 * @param pipelineJobConfig pipeline job configuration
 * @return pipeline process context
 */
InventoryIncrementalProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,10 @@
package org.apache.shardingsphere.data.pipeline.core.job.service;

import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineProcessContext;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
Expand All @@ -38,24 +34,6 @@
@SingletonSPI
public interface PipelineJobAPI extends TypedSPI {

/**
* Build task configuration.
*
* @param pipelineJobConfig pipeline job configuration
* @param jobShardingItem job sharding item
* @param pipelineProcessConfig pipeline process configuration
* @return task configuration
*/
PipelineTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration pipelineJobConfig, int jobShardingItem, PipelineProcessConfiguration pipelineProcessConfig);

/**
* Build pipeline process context.
*
* @param pipelineJobConfig pipeline job configuration
* @return pipeline process context
*/
PipelineProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);

/**
* Get job configuration.
*
Expand Down Expand Up @@ -99,14 +77,6 @@ default Optional<String> getToBeStoppedPreviousJobType() {
return Optional.empty();
}

/**
* Get pipeline job info.
*
* @param jobId job ID
* @return pipeline job info
*/
PipelineJobInfo getJobInfo(String jobId);

/**
* Persist job item progress.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -178,7 +179,10 @@ public void drop(final String jobId) {
* @return jobs info
*/
public List<PipelineJobInfo> getPipelineJobInfos(final PipelineContextKey contextKey) {
    // getJobInfo() lives on InventoryIncrementalJobAPI only, so guard the downcast;
    // other job API types expose no per-job info and yield an empty list.
    if (pipelineJobAPI instanceof InventoryIncrementalJobAPI) {
        return getJobBriefInfos(contextKey, pipelineJobAPI.getType()).map(each -> ((InventoryIncrementalJobAPI) pipelineJobAPI).getJobInfo(each.getJobName())).collect(Collectors.toList());
    }
    return Collections.emptyList();
}

private Stream<JobBriefInfo> getJobBriefInfos(final PipelineContextKey contextKey, final String jobType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,13 @@

import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineProcessContext;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
import org.apache.shardingsphere.data.pipeline.common.pojo.ConsistencyCheckJobItemInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
Expand All @@ -42,7 +38,6 @@
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter;
Expand Down Expand Up @@ -122,19 +117,9 @@ private void verifyPipelineDatabaseType(final CreateConsistencyCheckJobParameter
ShardingSpherePreconditions.checkState(supportedDatabaseTypes.contains(param.getTargetDatabaseType()), () -> new UnsupportedPipelineDatabaseTypeException(param.getTargetDatabaseType()));
}

/**
* Get latest data consistency check result.
*
* @param parentJobId parent job id
* @return latest data consistency check result
*/
public Map<String, TableDataConsistencyCheckResult> getLatestDataConsistencyCheckResult(final String parentJobId) {
GovernanceRepositoryAPI governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId));
Optional<String> latestCheckJobId = governanceRepositoryAPI.getLatestCheckJobId(parentJobId);
if (!latestCheckJobId.isPresent()) {
return Collections.emptyMap();
}
return governanceRepositoryAPI.getCheckJobResult(parentJobId, latestCheckJobId.get());
@Override
public boolean isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished() {
return true;
}

@Override
Expand Down Expand Up @@ -354,21 +339,6 @@ public ConsistencyCheckJobConfiguration getJobConfiguration(final JobConfigurati
return new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
}

// Intentionally unsupported for this job type: always throws. NOTE(review): presumably
// consistency check jobs have no sharded task configuration — confirm against callers.
@Override
public PipelineTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
    throw new UnsupportedOperationException();
}

// Intentionally unsupported for this job type: always throws. Callers must not invoke it.
@Override
public PipelineProcessContext buildPipelineProcessContext(final PipelineJobConfiguration pipelineJobConfig) {
    throw new UnsupportedOperationException();
}

// Intentionally unsupported for this job type: always throws. Callers must not invoke it.
@Override
public PipelineJobInfo getJobInfo(final String jobId) {
    throw new UnsupportedOperationException();
}

@Override
public Class<ConsistencyCheckJob> getPipelineJobClass() {
return ConsistencyCheckJob.class;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,6 @@ void assertCreateJobConfig() {
assertThat(sequence, is(expectedSequence));
}

@Test
void assertGetLatestDataConsistencyCheckResult() {
    // Create a parent migration job, then start a consistency check job against it.
    MigrationJobConfiguration jobConfig = jobConfigSwapper.swapToObject(JobConfigurationBuilder.createYamlMigrationJobConfiguration());
    String jobId = jobConfig.getJobId();
    String checkJobId = checkJobAPI.createJobAndStart(
            new CreateConsistencyCheckJobParameter(jobId, null, null, jobConfig.getSourceDatabaseType(), jobConfig.getTargetDatabaseType()));
    // Persist the latest check job id and a known check result into the governance repository.
    GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
    repositoryAPI.persistLatestCheckJobId(jobId, checkJobId);
    Map<String, TableDataConsistencyCheckResult> expected = Collections.singletonMap("t_order", new TableDataConsistencyCheckResult(true));
    repositoryAPI.persistCheckJobResult(jobId, checkJobId, expected);
    // The API should surface exactly the persisted latest result.
    Map<String, TableDataConsistencyCheckResult> actual = checkJobAPI.getLatestDataConsistencyCheckResult(jobId);
    assertThat(actual.size(), is(expected.size()));
    assertThat(actual.get("t_order").isMatched(), is(expected.get("t_order").isMatched()));
}

@Test
void assertDropByParentJobId() {
MigrationJobConfiguration parentJobConfig = jobConfigSwapper.swapToObject(JobConfigurationBuilder.createYamlMigrationJobConfiguration());
Expand Down

0 comments on commit 1198efc

Please sign in to comment.