Rename IncrementalTaskPositionManager (#29476)
* Rename DataSourceCheckEngine.checkSourceDataSources()

* Refactor DataSourceCheckEngine

* Rename IncrementalTaskPreparer

* Rename IncrementalTaskPositionManager

* Rename IncrementalTaskPositionManager
terrymanu authored Dec 20, 2023
1 parent 8d3a882 commit acca6b7
Showing 4 changed files with 47 additions and 41 deletions.
@@ -15,9 +15,8 @@
 * limitations under the License.
 */

-package org.apache.shardingsphere.data.pipeline.core.preparer;
+package org.apache.shardingsphere.data.pipeline.core.preparer.incremental;

-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
@@ -34,67 +33,75 @@
import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;

-import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.Optional;

/**
- * Pipeline job preparer.
+ * Incremental task position manager.
 */
-@RequiredArgsConstructor
@Slf4j
-public final class PipelineJobPreparer {
+public final class IncrementalTaskPositionManager {

    private final DatabaseType databaseType;

+    private final PositionInitializer positionInitializer;
+
+    public IncrementalTaskPositionManager(final DatabaseType databaseType) {
+        this.databaseType = databaseType;
+        positionInitializer = DatabaseTypedSPILoader.getService(PositionInitializer.class, databaseType);
+    }

    /**
-     * Get incremental position.
+     * Get ingest position.
     *
     * @param initialProgress initial job item incremental tasks progress
     * @param dumperContext incremental dumper context
-     * @param dataSourceManager data source manager
+     * @param dataSourceManager pipeline data source manager
     * @return ingest position
     * @throws SQLException SQL exception
     */
-    public IngestPosition getIncrementalPosition(final JobItemIncrementalTasksProgress initialProgress, final IncrementalDumperContext dumperContext,
-                                                 final PipelineDataSourceManager dataSourceManager) throws SQLException {
+    public IngestPosition getPosition(final JobItemIncrementalTasksProgress initialProgress,
+                                      final IncrementalDumperContext dumperContext, final PipelineDataSourceManager dataSourceManager) throws SQLException {
        if (null != initialProgress) {
            Optional<IngestPosition> position = initialProgress.getIncrementalPosition();
            if (position.isPresent()) {
                return position.get();
            }
        }
-        DataSource dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
-        return DatabaseTypedSPILoader.getService(PositionInitializer.class, databaseType).init(dataSource, dumperContext.getJobId());
+        return positionInitializer.init(dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()), dumperContext.getJobId());
    }

    /**
-     * Cleanup job preparer.
+     * Destroy ingest position.
     *
     * @param jobId pipeline job id
-     * @param pipelineDataSourceConfig pipeline data source config
+     * @param pipelineDataSourceConfig pipeline data source configuration
     * @throws SQLException SQL exception
     */
    public void destroyPosition(final String jobId, final PipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
-        PositionInitializer positionInitializer = DatabaseTypedSPILoader.getService(PositionInitializer.class, databaseType);
        final long startTimeMillis = System.currentTimeMillis();
-        log.info("Cleanup database type:{}, data source type:{}", databaseType.getType(), pipelineDataSourceConfig.getType());
+        log.info("Cleanup position, database type: {}, pipeline data source type: {}", databaseType.getType(), pipelineDataSourceConfig.getType());
        if (pipelineDataSourceConfig instanceof ShardingSpherePipelineDataSourceConfiguration) {
-            ShardingSpherePipelineDataSourceConfiguration dataSourceConfig = (ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig;
-            for (DataSourcePoolProperties each : new YamlDataSourceConfigurationSwapper().getDataSourcePoolPropertiesMap(dataSourceConfig.getRootConfig()).values()) {
-                try (PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(DataSourcePoolCreator.create(each), databaseType)) {
-                    positionInitializer.destroy(dataSource, jobId);
-                }
-            }
-        }
-        if (pipelineDataSourceConfig instanceof StandardPipelineDataSourceConfiguration) {
-            StandardPipelineDataSourceConfiguration dataSourceConfig = (StandardPipelineDataSourceConfiguration) pipelineDataSourceConfig;
-            try (
-                    PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(
-                            DataSourcePoolCreator.create((DataSourcePoolProperties) dataSourceConfig.getDataSourceConfiguration()), databaseType)) {
-                positionInitializer.destroy(dataSource, jobId);
-            }
+            destroyPosition(jobId, (ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig, positionInitializer);
+        } else if (pipelineDataSourceConfig instanceof StandardPipelineDataSourceConfiguration) {
+            destroyPosition(jobId, (StandardPipelineDataSourceConfiguration) pipelineDataSourceConfig, positionInitializer);
        }
        log.info("destroyPosition cost {} ms", System.currentTimeMillis() - startTimeMillis);
    }
+
+    private void destroyPosition(final String jobId, final ShardingSpherePipelineDataSourceConfiguration pipelineDataSourceConfig, final PositionInitializer positionInitializer) throws SQLException {
+        for (DataSourcePoolProperties each : new YamlDataSourceConfigurationSwapper().getDataSourcePoolPropertiesMap(pipelineDataSourceConfig.getRootConfig()).values()) {
+            try (PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(DataSourcePoolCreator.create(each), databaseType)) {
+                positionInitializer.destroy(dataSource, jobId);
+            }
+        }
+    }
+
+    private void destroyPosition(final String jobId, final StandardPipelineDataSourceConfiguration pipelineDataSourceConfig, final PositionInitializer positionInitializer) throws SQLException {
+        try (
+                PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(
+                        DataSourcePoolCreator.create((DataSourcePoolProperties) pipelineDataSourceConfig.getDataSourceConfiguration()), databaseType)) {
+            positionInitializer.destroy(dataSource, jobId);
+        }
+    }
}
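Taken together, the renamed class is now the single entry point for bootstrapping and tearing down incremental ingest positions. A minimal usage sketch mirroring the call sites later in this commit; jobId, dumperContext, and dataSourceManager are placeholder names assumed to exist:

    DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
    IncrementalTaskPositionManager positionManager = new IncrementalTaskPositionManager(databaseType);
    // Reuses the position recorded in earlier job progress if present; passing null forces
    // initialization through the dialect's PositionInitializer SPI.
    IngestPosition position = positionManager.getPosition(null, dumperContext, dataSourceManager);
    // On job cleanup, releases whatever dialect resource backs the position.
    positionManager.destroyPosition(jobId, dumperContext.getCommonContext().getDataSourceConfig());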
@@ -52,7 +52,7 @@
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparer;
+import org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -201,8 +201,8 @@ private TransmissionJobItemProgress getTransmissionJobItemProgress(final CDCJobC
        TransmissionJobItemProgress result = new TransmissionJobItemProgress();
        result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
        result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName());
-        IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(new PipelineJobPreparer(
-                incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType()).getIncrementalPosition(null, incrementalDumperContext, dataSourceManager));
+        IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(new IncrementalTaskPositionManager(
+                incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType()).getPosition(null, incrementalDumperContext, dataSourceManager));
        result.setIncremental(new JobItemIncrementalTasksProgress(incrementalTaskProgress));
        return result;
    }
@@ -259,7 +259,7 @@ private void cleanup(final CDCJobConfiguration jobConfig) {
        for (Entry<String, Map<String, Object>> entry : jobConfig.getDataSourceConfig().getRootConfig().getDataSources().entrySet()) {
            try {
                StandardPipelineDataSourceConfiguration pipelineDataSourceConfig = new StandardPipelineDataSourceConfiguration(entry.getValue());
-                new PipelineJobPreparer(pipelineDataSourceConfig.getDatabaseType()).destroyPosition(jobConfig.getJobId(), pipelineDataSourceConfig);
+                new IncrementalTaskPositionManager(pipelineDataSourceConfig.getDatabaseType()).destroyPosition(jobConfig.getJobId(), pipelineDataSourceConfig);
            } catch (final SQLException ex) {
                log.warn("job destroying failed, jobId={}, dataSourceName={}", jobConfig.getJobId(), entry.getKey(), ex);
            }
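Note that cleanup() catches SQLException per data source and only logs a warning, so a failed position destroy on one source does not abort cleanup of the remaining sources.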
@@ -29,7 +29,6 @@
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.importer.ImporterType;
import org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper;
@@ -41,7 +40,7 @@
import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparer;
+import org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
import org.apache.shardingsphere.data.pipeline.core.preparer.inventory.InventoryTaskSplitter;
import org.apache.shardingsphere.data.pipeline.core.spi.ingest.dumper.IncrementalDumperCreator;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
@@ -106,7 +105,7 @@ private void initIncrementalPosition(final CDCJobItemContext jobItemContext) {
        JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental();
        try {
            DatabaseType databaseType = taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType();
-            IngestPosition position = new PipelineJobPreparer(databaseType).getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager());
+            IngestPosition position = new IncrementalTaskPositionManager(databaseType).getPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager());
            taskConfig.getDumperContext().getCommonContext().setPosition(position);
        } catch (final SQLException ex) {
            throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
@@ -128,7 +127,7 @@ private void initInventoryTasks(final CDCJobItemContext jobItemContext, final At
            Dumper dumper = new InventoryDumper(each, channel, jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader());
            Importer importer = importerUsed.get() ? null
                    : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 100, TimeUnit.MILLISECONDS, jobItemContext.getSink(),
-                            needSorting(ImporterType.INVENTORY, hasGlobalCSN(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType())),
+                            needSorting(hasGlobalCSN(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType())),
                            importerConfig.getRateLimitAlgorithm());
            jobItemContext.getInventoryTasks().add(new CDCInventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), processContext.getInventoryDumperExecuteEngine(),
                    processContext.getInventoryImporterExecuteEngine(), dumper, importer, position));
@@ -139,8 +138,8 @@
        log.info("initInventoryTasks cost {} ms", System.currentTimeMillis() - startTimeMillis);
    }

-    private boolean needSorting(final ImporterType importerType, final boolean hasGlobalCSN) {
-        return ImporterType.INCREMENTAL == importerType && hasGlobalCSN;
+    private boolean needSorting(final boolean hasGlobalCSN) {
+        return hasGlobalCSN;
    }

    private boolean hasGlobalCSN(final DatabaseType databaseType) {
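One behavioral consequence of dropping the ImporterType parameter is visible at the inventory call site above: the old predicate needSorting(ImporterType.INVENTORY, hasGlobalCSN(...)) could never return true, since it required ImporterType.INCREMENTAL. A side-by-side sketch of the two predicates; databaseType here is a placeholder:

    // Before: always false at the inventory call site, regardless of dialect support.
    boolean before = ImporterType.INCREMENTAL == ImporterType.INVENTORY && hasGlobalCSN(databaseType);
    // After: inventory importers sort whenever the source dialect exposes a global CSN.
    boolean after = hasGlobalCSN(databaseType);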
@@ -46,7 +46,7 @@
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparer;
+import org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PipelineJobDataSourcePreparer;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
@@ -185,7 +185,7 @@ private void prepareIncremental(final MigrationJobItemContext jobItemContext) {
        JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental();
        try {
            DatabaseType databaseType = taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType();
-            IngestPosition position = new PipelineJobPreparer(databaseType).getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager());
+            IngestPosition position = new IncrementalTaskPositionManager(databaseType).getPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager());
            taskConfig.getDumperContext().getCommonContext().setPosition(position);
        } catch (final SQLException ex) {
            throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
@@ -232,7 +232,7 @@ private Collection<Importer> createImporters(final ImporterConfiguration importe
    public void cleanup(final MigrationJobConfiguration jobConfig) {
        for (Entry<String, PipelineDataSourceConfiguration> entry : jobConfig.getSources().entrySet()) {
            try {
-                new PipelineJobPreparer(entry.getValue().getDatabaseType()).destroyPosition(jobConfig.getJobId(), entry.getValue());
+                new IncrementalTaskPositionManager(entry.getValue().getDatabaseType()).destroyPosition(jobConfig.getJobId(), entry.getValue());
            } catch (final SQLException ex) {
                log.warn("job destroying failed, jobId={}, dataSourceName={}", jobConfig.getJobId(), entry.getKey(), ex);
            }
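The CDC and migration preparers now bootstrap the incremental position with an identical three-line pattern (see the initIncrementalPosition and prepareIncremental hunks above). A hypothetical shared helper, built only from calls shown in this diff and not part of this commit, could factor that out:

    private static IngestPosition resolveIncrementalPosition(final JobItemIncrementalTasksProgress initIncremental,
                                                             final IncrementalDumperContext dumperContext,
                                                             final PipelineDataSourceManager dataSourceManager) throws SQLException {
        // Same steps as both call sites: derive the dialect, then delegate to the manager.
        DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
        return new IncrementalTaskPositionManager(databaseType).getPosition(initIncremental, dumperContext, dataSourceManager);
    }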
