Skip to content

Commit

Permalink
Add more generic types of AbstractSeparablePipelineJob (#29366)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Dec 11, 2023
1 parent 5142e6e commit 37fb5a8
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
Expand All @@ -41,12 +40,14 @@
/**
* Abstract separable pipeline job.
*
* @param <T> type of pipeline job item context
* @param <T> type of pipeline job configuration
* @param <I> type of pipeline job item context
* @param <P> type of pipeline job item progress
*/
@RequiredArgsConstructor
@Getter
@Slf4j
public abstract class AbstractSeparablePipelineJob<T extends PipelineJobItemContext> implements PipelineJob {
public abstract class AbstractSeparablePipelineJob<T extends PipelineJobConfiguration, I extends PipelineJobItemContext, P extends PipelineJobItemProgress> implements PipelineJob {

private final PipelineJobRunnerManager jobRunnerManager;

Expand All @@ -65,9 +66,9 @@ public final void execute(final ShardingContext shardingContext) {
}
PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
PipelineJobConfigurationManager jobConfigManager = new PipelineJobConfigurationManager(jobType);
PipelineJobConfiguration jobConfig = jobConfigManager.getJobConfiguration(jobId);
PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
TransmissionJobItemProgress jobItemProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem).orElse(null);
T jobConfig = jobConfigManager.getJobConfiguration(jobId);
PipelineJobItemManager<P> jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
P jobItemProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem).orElse(null);
try {
execute(buildJobItemContext(jobConfig, shardingItem, jobItemProgress));
// CHECKSTYLE:OFF
Expand All @@ -78,7 +79,7 @@ public final void execute(final ShardingContext shardingContext) {
}
}

private void execute(final T jobItemContext) {
private void execute(final I jobItemContext) {
int shardingItem = jobItemContext.getShardingItem();
PipelineTasksRunner tasksRunner = buildTasksRunner(jobItemContext);
if (!jobRunnerManager.addTasksRunner(shardingItem, tasksRunner)) {
Expand All @@ -91,11 +92,11 @@ private void execute(final T jobItemContext) {
tasksRunner.start();
}

protected abstract T buildJobItemContext(PipelineJobConfiguration jobConfig, int shardingItem, PipelineJobItemProgress jobItemProgress);
protected abstract I buildJobItemContext(T jobConfig, int shardingItem, P jobItemProgress);

protected abstract PipelineTasksRunner buildTasksRunner(T jobItemContext);
protected abstract PipelineTasksRunner buildTasksRunner(I jobItemContext);

protected final void prepare(final T jobItemContext) {
protected final void prepare(final I jobItemContext) {
try {
doPrepare(jobItemContext);
// CHECKSTYLE:OFF
Expand All @@ -105,7 +106,7 @@ protected final void prepare(final T jobItemContext) {
}
}

protected abstract void doPrepare(T jobItemContext) throws SQLException;
protected abstract void doPrepare(I jobItemContext) throws SQLException;

private void processFailed(final String jobId, final int shardingItem, final Exception ex) {
log.error("Job execution failed, {}-{}", jobId, shardingItem, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@

import org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
Expand All @@ -30,11 +28,11 @@
/**
* Consistency check job.
*/
public final class ConsistencyCheckJob extends AbstractSeparablePipelineJob<ConsistencyCheckJobItemContext> {
public final class ConsistencyCheckJob extends AbstractSeparablePipelineJob<ConsistencyCheckJobConfiguration, ConsistencyCheckJobItemContext, ConsistencyCheckJobItemProgress> {

@Override
public ConsistencyCheckJobItemContext buildJobItemContext(final PipelineJobConfiguration jobConfig, final int shardingItem, final PipelineJobItemProgress jobItemProgress) {
return new ConsistencyCheckJobItemContext((ConsistencyCheckJobConfiguration) jobConfig, shardingItem, JobStatus.RUNNING, (ConsistencyCheckJobItemProgress) jobItemProgress);
public ConsistencyCheckJobItemContext buildJobItemContext(final ConsistencyCheckJobConfiguration jobConfig, final int shardingItem, final ConsistencyCheckJobItemProgress jobItemProgress) {
return new ConsistencyCheckJobItemContext(jobConfig, shardingItem, JobStatus.RUNNING, jobItemProgress);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
Expand Down Expand Up @@ -59,20 +57,19 @@
* Migration job.
*/
@Slf4j
public final class MigrationJob extends AbstractSeparablePipelineJob<MigrationJobItemContext> {
public final class MigrationJob extends AbstractSeparablePipelineJob<MigrationJobConfiguration, MigrationJobItemContext, TransmissionJobItemProgress> {

private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();

private final MigrationJobPreparer jobPreparer = new MigrationJobPreparer();

@Override
protected MigrationJobItemContext buildJobItemContext(final PipelineJobConfiguration jobConfig, final int shardingItem, final PipelineJobItemProgress jobItemProgress) {
protected MigrationJobItemContext buildJobItemContext(final MigrationJobConfiguration jobConfig, final int shardingItem, final TransmissionJobItemProgress jobItemProgress) {
PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), "MIGRATION"));
TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig);
MigrationTaskConfiguration taskConfig = buildTaskConfiguration((MigrationJobConfiguration) jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
return new MigrationJobItemContext((MigrationJobConfiguration) jobConfig,
shardingItem, (TransmissionJobItemProgress) jobItemProgress, jobProcessContext, taskConfig, getJobRunnerManager().getDataSourceManager());
MigrationTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
return new MigrationJobItemContext(jobConfig, shardingItem, jobItemProgress, jobProcessContext, taskConfig, getJobRunnerManager().getDataSourceManager());
}

private MigrationTaskConfiguration buildTaskConfiguration(final MigrationJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
Expand Down

0 comments on commit 37fb5a8

Please sign in to comment.