Skip to content

Commit

Permalink
Refactor AbstractSeparablePipelineJob (#29363)
Browse files Browse the repository at this point in the history
* Refactor AbstractSeparablePipelineJob

* Refactor AbstractSeparablePipelineJob

* Refactor AbstractSeparablePipelineJob

* Refactor AbstractSeparablePipelineJob
  • Loading branch information
terrymanu authored Dec 11, 2023
1 parent ab93554 commit 5142e6e
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,15 @@
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;

Expand All @@ -44,7 +50,7 @@ public abstract class AbstractSeparablePipelineJob<T extends PipelineJobItemCont

private final PipelineJobRunnerManager jobRunnerManager;

public AbstractSeparablePipelineJob() {
protected AbstractSeparablePipelineJob() {
this(new PipelineJobRunnerManager());
}

Expand All @@ -57,8 +63,13 @@ public final void execute(final ShardingContext shardingContext) {
log.info("Stopping true, ignore");
return;
}
PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
PipelineJobConfigurationManager jobConfigManager = new PipelineJobConfigurationManager(jobType);
PipelineJobConfiguration jobConfig = jobConfigManager.getJobConfiguration(jobId);
PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
TransmissionJobItemProgress jobItemProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem).orElse(null);
try {
execute(buildJobItemContext(shardingContext));
execute(buildJobItemContext(jobConfig, shardingItem, jobItemProgress));
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
Expand All @@ -68,19 +79,19 @@ public final void execute(final ShardingContext shardingContext) {
}

private void execute(final T jobItemContext) {
String jobId = jobItemContext.getJobId();
int shardingItem = jobItemContext.getShardingItem();
PipelineTasksRunner tasksRunner = buildTasksRunner(jobItemContext);
if (!jobRunnerManager.addTasksRunner(shardingItem, tasksRunner)) {
return;
}
String jobId = jobItemContext.getJobId();
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().clean(jobId, shardingItem);
prepare(jobItemContext);
log.info("Start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem);
tasksRunner.start();
}

protected abstract T buildJobItemContext(ShardingContext shardingContext);
protected abstract T buildJobItemContext(PipelineJobConfiguration jobConfig, int shardingItem, PipelineJobItemProgress jobItemProgress);

protected abstract PipelineTasksRunner buildTasksRunner(T jobItemContext);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,22 @@

import org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.swapper.YamlConsistencyCheckJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.task.ConsistencyCheckTasksRunner;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;

import java.util.Optional;

/**
* Consistency check job.
*/
public final class ConsistencyCheckJob extends AbstractSeparablePipelineJob<ConsistencyCheckJobItemContext> {

@Override
public ConsistencyCheckJobItemContext buildJobItemContext(final ShardingContext shardingContext) {
ConsistencyCheckJobConfiguration jobConfig = new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager = new PipelineJobItemManager<>(new ConsistencyCheckJobType().getYamlJobItemProgressSwapper());
Optional<ConsistencyCheckJobItemProgress> jobItemProgress = jobItemManager.getProgress(jobConfig.getJobId(), shardingContext.getShardingItem());
return new ConsistencyCheckJobItemContext(jobConfig, shardingContext.getShardingItem(), JobStatus.RUNNING, jobItemProgress.orElse(null));
public ConsistencyCheckJobItemContext buildJobItemContext(final PipelineJobConfiguration jobConfig, final int shardingItem, final PipelineJobItemProgress jobItemProgress) {
return new ConsistencyCheckJobItemContext((ConsistencyCheckJobConfiguration) jobConfig, shardingItem, JobStatus.RUNNING, (ConsistencyCheckJobItemProgress) jobItemProgress);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
import org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
Expand All @@ -41,19 +42,16 @@
import org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
import org.apache.shardingsphere.data.pipeline.scenario.migration.ingest.dumper.MigrationIncrementalDumperContextCreator;
import org.apache.shardingsphere.data.pipeline.scenario.migration.preparer.MigrationJobPreparer;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.datanode.DataNode;

import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

Expand All @@ -63,22 +61,18 @@
@Slf4j
public final class MigrationJob extends AbstractSeparablePipelineJob<MigrationJobItemContext> {

private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(new MigrationJobType().getYamlJobItemProgressSwapper());

private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();

private final MigrationJobPreparer jobPreparer = new MigrationJobPreparer();

@Override
protected MigrationJobItemContext buildJobItemContext(final ShardingContext shardingContext) {
MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
int shardingItem = shardingContext.getShardingItem();
Optional<TransmissionJobItemProgress> initProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem);
protected MigrationJobItemContext buildJobItemContext(final PipelineJobConfiguration jobConfig, final int shardingItem, final PipelineJobItemProgress jobItemProgress) {
PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), "MIGRATION"));
TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig);
MigrationTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
return new MigrationJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, getJobRunnerManager().getDataSourceManager());
MigrationTaskConfiguration taskConfig = buildTaskConfiguration((MigrationJobConfiguration) jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
return new MigrationJobItemContext((MigrationJobConfiguration) jobConfig,
shardingItem, (TransmissionJobItemProgress) jobItemProgress, jobProcessContext, taskConfig, getJobRunnerManager().getDataSourceManager());
}

private MigrationTaskConfiguration buildTaskConfiguration(final MigrationJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@

import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.config.YamlConsistencyCheckJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.config.YamlConsistencyCheckJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.config.YamlConsistencyCheckJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.swapper.YamlConsistencyCheckJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
Expand All @@ -36,6 +40,7 @@

import java.util.Collections;
import java.util.Map;
import java.util.Optional;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
Expand All @@ -55,8 +60,10 @@ void assertBuildPipelineJobItemContext() {
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(checkJobId, 0,
YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition)));
ConsistencyCheckJob consistencyCheckJob = new ConsistencyCheckJob();
ConsistencyCheckJobItemContext actual = consistencyCheckJob.buildJobItemContext(
new ShardingContext(checkJobId, "", 1, YamlEngine.marshal(createYamlConsistencyCheckJobConfiguration(checkJobId)), 0, ""));
ConsistencyCheckJobConfiguration jobConfig = new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(createYamlConsistencyCheckJobConfiguration(checkJobId));
PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager = new PipelineJobItemManager<>(new ConsistencyCheckJobType().getYamlJobItemProgressSwapper());
Optional<ConsistencyCheckJobItemProgress> jobItemProgress = jobItemManager.getProgress(jobConfig.getJobId(), 0);
ConsistencyCheckJobItemContext actual = consistencyCheckJob.buildJobItemContext(jobConfig, 0, jobItemProgress.orElse(null));
assertThat(actual.getProgressContext().getSourceTableCheckPositions(), is(expectTableCheckPosition));
assertThat(actual.getProgressContext().getTargetTableCheckPositions(), is(expectTableCheckPosition));
}
Expand Down

0 comments on commit 5142e6e

Please sign in to comment.