diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java similarity index 56% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java index e2cd721a675db..c935e7fac14b1 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java @@ -15,9 +15,8 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.core.preparer; +package org.apache.shardingsphere.data.pipeline.core.preparer.incremental; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration; @@ -34,67 +33,75 @@ import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties; import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper; -import javax.sql.DataSource; import java.sql.SQLException; import java.util.Optional; /** - * Pipeline job preparer. + * Incremental task position manager. */ -@RequiredArgsConstructor @Slf4j -public final class PipelineJobPreparer { +public final class IncrementalTaskPositionManager { private final DatabaseType databaseType; + private final PositionInitializer positionInitializer; + + public IncrementalTaskPositionManager(final DatabaseType databaseType) { + this.databaseType = databaseType; + positionInitializer = DatabaseTypedSPILoader.getService(PositionInitializer.class, databaseType); + } + /** - * Get incremental position. + * Get ingest position. * * @param initialProgress initial iob item incremental tasks progress * @param dumperContext incremental dumper context - * @param dataSourceManager data source manager + * @param dataSourceManager pipeline data source manager * @return ingest position * @throws SQLException SQL exception */ - public IngestPosition getIncrementalPosition(final JobItemIncrementalTasksProgress initialProgress, final IncrementalDumperContext dumperContext, - final PipelineDataSourceManager dataSourceManager) throws SQLException { + public IngestPosition getPosition(final JobItemIncrementalTasksProgress initialProgress, + final IncrementalDumperContext dumperContext, final PipelineDataSourceManager dataSourceManager) throws SQLException { if (null != initialProgress) { Optional position = initialProgress.getIncrementalPosition(); if (position.isPresent()) { return position.get(); } } - DataSource dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()); - return DatabaseTypedSPILoader.getService(PositionInitializer.class, databaseType).init(dataSource, dumperContext.getJobId()); + return positionInitializer.init(dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()), dumperContext.getJobId()); } /** - * Cleanup job preparer. + * Destroy ingest position. * * @param jobId pipeline job id - * @param pipelineDataSourceConfig pipeline data source config + * @param pipelineDataSourceConfig pipeline data source configuration * @throws SQLException SQL exception */ public void destroyPosition(final String jobId, final PipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException { - PositionInitializer positionInitializer = DatabaseTypedSPILoader.getService(PositionInitializer.class, databaseType); final long startTimeMillis = System.currentTimeMillis(); - log.info("Cleanup database type:{}, data source type:{}", databaseType.getType(), pipelineDataSourceConfig.getType()); + log.info("Cleanup position, database type: {}, pipeline data source type: {}", databaseType.getType(), pipelineDataSourceConfig.getType()); if (pipelineDataSourceConfig instanceof ShardingSpherePipelineDataSourceConfiguration) { - ShardingSpherePipelineDataSourceConfiguration dataSourceConfig = (ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig; - for (DataSourcePoolProperties each : new YamlDataSourceConfigurationSwapper().getDataSourcePoolPropertiesMap(dataSourceConfig.getRootConfig()).values()) { - try (PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(DataSourcePoolCreator.create(each), databaseType)) { - positionInitializer.destroy(dataSource, jobId); - } - } + destroyPosition(jobId, (ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig, positionInitializer); + } else if (pipelineDataSourceConfig instanceof StandardPipelineDataSourceConfiguration) { + destroyPosition(jobId, (StandardPipelineDataSourceConfiguration) pipelineDataSourceConfig, positionInitializer); } - if (pipelineDataSourceConfig instanceof StandardPipelineDataSourceConfiguration) { - StandardPipelineDataSourceConfiguration dataSourceConfig = (StandardPipelineDataSourceConfiguration) pipelineDataSourceConfig; - try ( - PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper( - DataSourcePoolCreator.create((DataSourcePoolProperties) dataSourceConfig.getDataSourceConfiguration()), databaseType)) { + log.info("destroyPosition cost {} ms", System.currentTimeMillis() - startTimeMillis); + } + + private void destroyPosition(final String jobId, final ShardingSpherePipelineDataSourceConfiguration pipelineDataSourceConfig, final PositionInitializer positionInitializer) throws SQLException { + for (DataSourcePoolProperties each : new YamlDataSourceConfigurationSwapper().getDataSourcePoolPropertiesMap(pipelineDataSourceConfig.getRootConfig()).values()) { + try (PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(DataSourcePoolCreator.create(each), databaseType)) { positionInitializer.destroy(dataSource, jobId); } } - log.info("destroyPosition cost {} ms", System.currentTimeMillis() - startTimeMillis); + } + + private void destroyPosition(final String jobId, final StandardPipelineDataSourceConfiguration pipelineDataSourceConfig, final PositionInitializer positionInitializer) throws SQLException { + try ( + PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper( + DataSourcePoolCreator.create((DataSourcePoolProperties) pipelineDataSourceConfig.getDataSourceConfiguration()), databaseType)) { + positionInitializer.destroy(dataSource, jobId); + } } } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java index a177080d86715..59b51ab86c9f9 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java @@ -52,7 +52,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; -import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparer; +import org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager; import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade; import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress; import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; @@ -201,8 +201,8 @@ private TransmissionJobItemProgress getTransmissionJobItemProgress(final CDCJobC TransmissionJobItemProgress result = new TransmissionJobItemProgress(); result.setSourceDatabaseType(jobConfig.getSourceDatabaseType()); result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName()); - IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(new PipelineJobPreparer( - incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType()).getIncrementalPosition(null, incrementalDumperContext, dataSourceManager)); + IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(new IncrementalTaskPositionManager( + incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType()).getPosition(null, incrementalDumperContext, dataSourceManager)); result.setIncremental(new JobItemIncrementalTasksProgress(incrementalTaskProgress)); return result; } @@ -259,7 +259,7 @@ private void cleanup(final CDCJobConfiguration jobConfig) { for (Entry> entry : jobConfig.getDataSourceConfig().getRootConfig().getDataSources().entrySet()) { try { StandardPipelineDataSourceConfiguration pipelineDataSourceConfig = new StandardPipelineDataSourceConfiguration(entry.getValue()); - new PipelineJobPreparer(pipelineDataSourceConfig.getDatabaseType()).destroyPosition(jobConfig.getJobId(), pipelineDataSourceConfig); + new IncrementalTaskPositionManager(pipelineDataSourceConfig.getDatabaseType()).destroyPosition(jobConfig.getJobId(), pipelineDataSourceConfig); } catch (final SQLException ex) { log.warn("job destroying failed, jobId={}, dataSourceName={}", jobConfig.getJobId(), entry.getKey(), ex); } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java index c9da76d64ba28..51f717ec8bfb2 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java @@ -29,7 +29,6 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException; import org.apache.shardingsphere.data.pipeline.core.importer.Importer; import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration; -import org.apache.shardingsphere.data.pipeline.core.importer.ImporterType; import org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper; @@ -41,7 +40,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress; import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; -import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparer; +import org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager; import org.apache.shardingsphere.data.pipeline.core.preparer.inventory.InventoryTaskSplitter; import org.apache.shardingsphere.data.pipeline.core.spi.ingest.dumper.IncrementalDumperCreator; import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask; @@ -106,7 +105,7 @@ private void initIncrementalPosition(final CDCJobItemContext jobItemContext) { JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental(); try { DatabaseType databaseType = taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType(); - IngestPosition position = new PipelineJobPreparer(databaseType).getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager()); + IngestPosition position = new IncrementalTaskPositionManager(databaseType).getPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager()); taskConfig.getDumperContext().getCommonContext().setPosition(position); } catch (final SQLException ex) { throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex); @@ -128,7 +127,7 @@ private void initInventoryTasks(final CDCJobItemContext jobItemContext, final At Dumper dumper = new InventoryDumper(each, channel, jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader()); Importer importer = importerUsed.get() ? null : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 100, TimeUnit.MILLISECONDS, jobItemContext.getSink(), - needSorting(ImporterType.INVENTORY, hasGlobalCSN(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType())), + needSorting(hasGlobalCSN(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType())), importerConfig.getRateLimitAlgorithm()); jobItemContext.getInventoryTasks().add(new CDCInventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), processContext.getInventoryDumperExecuteEngine(), processContext.getInventoryImporterExecuteEngine(), dumper, importer, position)); @@ -139,8 +138,8 @@ private void initInventoryTasks(final CDCJobItemContext jobItemContext, final At log.info("initInventoryTasks cost {} ms", System.currentTimeMillis() - startTimeMillis); } - private boolean needSorting(final ImporterType importerType, final boolean hasGlobalCSN) { - return ImporterType.INCREMENTAL == importerType && hasGlobalCSN; + private boolean needSorting(final boolean hasGlobalCSN) { + return hasGlobalCSN; } private boolean hasGlobalCSN(final DatabaseType databaseType) { diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java index c30eeb3aab52b..edb1755ac8724 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java @@ -46,7 +46,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressListener; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader; -import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparer; +import org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager; import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PipelineJobDataSourcePreparer; import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption; import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration; @@ -185,7 +185,7 @@ private void prepareIncremental(final MigrationJobItemContext jobItemContext) { JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental(); try { DatabaseType databaseType = taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType(); - IngestPosition position = new PipelineJobPreparer(databaseType).getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager()); + IngestPosition position = new IncrementalTaskPositionManager(databaseType).getPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager()); taskConfig.getDumperContext().getCommonContext().setPosition(position); } catch (final SQLException ex) { throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex); @@ -232,7 +232,7 @@ private Collection createImporters(final ImporterConfiguration importe public void cleanup(final MigrationJobConfiguration jobConfig) { for (Entry entry : jobConfig.getSources().entrySet()) { try { - new PipelineJobPreparer(entry.getValue().getDatabaseType()).destroyPosition(jobConfig.getJobId(), entry.getValue()); + new IncrementalTaskPositionManager(entry.getValue().getDatabaseType()).destroyPosition(jobConfig.getJobId(), entry.getValue()); } catch (final SQLException ex) { log.warn("job destroying failed, jobId={}, dataSourceName={}", jobConfig.getJobId(), entry.getKey(), ex); }