diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java similarity index 83% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java index 6b8565fb5dd6b..89dbd1d2b7577 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java @@ -15,13 +15,12 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.core.preparer.datasource; +package org.apache.shardingsphere.data.pipeline.core.checker; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder; import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException; import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException; -import org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader; @@ -37,13 +36,13 @@ */ public final class DataSourceCheckEngine { - private final DatabaseType databaseType; + private final DialectDataSourceChecker checker; - private final DialectDataSourceChecker dialectDataSourceChecker; + private final PipelineCommonSQLBuilder sqlBuilder; public DataSourceCheckEngine(final DatabaseType databaseType) { - this.databaseType = databaseType; - dialectDataSourceChecker = DatabaseTypedSPILoader.findService(DialectDataSourceChecker.class, databaseType).orElse(null); + checker = DatabaseTypedSPILoader.findService(DialectDataSourceChecker.class, databaseType).orElse(null); + sqlBuilder = new PipelineCommonSQLBuilder(databaseType); } /** @@ -87,8 +86,7 @@ public void checkTargetTable(final Collection dataSources, } private boolean checkEmpty(final DataSource dataSource, final String schemaName, final String tableName) throws SQLException { - PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(databaseType); - String sql = pipelineSQLBuilder.buildCheckEmptySQL(schemaName, tableName); + String sql = sqlBuilder.buildCheckEmptySQL(schemaName, tableName); try ( Connection connection = dataSource.getConnection(); PreparedStatement preparedStatement = connection.prepareStatement(sql); @@ -103,11 +101,11 @@ private boolean checkEmpty(final DataSource dataSource, final String schemaName, * @param dataSources data sources */ public void checkPrivilege(final Collection dataSources) { - if (null == dialectDataSourceChecker) { + if (null == checker) { return; } for (DataSource each : dataSources) { - dialectDataSourceChecker.checkPrivilege(each); + checker.checkPrivilege(each); } } @@ -117,11 +115,11 @@ public void checkPrivilege(final Collection dataSources) { * @param dataSources data sources */ public void checkVariable(final Collection dataSources) { - if (null == dialectDataSourceChecker) { + if (null == checker) { return; } for (DataSource each : dataSources) { - dialectDataSourceChecker.checkVariable(each); + checker.checkVariable(each); } } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/datasource/DialectDataSourceChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DialectDataSourceChecker.java similarity index 95% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/datasource/DialectDataSourceChecker.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DialectDataSourceChecker.java index 0f9bca5cea9b8..946f6b53e712f 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/datasource/DialectDataSourceChecker.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DialectDataSourceChecker.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.core.spi.datasource; +package org.apache.shardingsphere.data.pipeline.core.checker; import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI; import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java similarity index 71% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java index 8be1d3e1e3d2c..51d3a5e17d17b 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java @@ -17,21 +17,20 @@ package org.apache.shardingsphere.data.pipeline.core.preparer; -import lombok.AccessLevel; -import lombok.NoArgsConstructor; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration; +import org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngine; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper; import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition; import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress; -import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourceCheckEngine; -import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer; -import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption; +import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PipelineJobDataSourcePreparer; +import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption; import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter; import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter; import org.apache.shardingsphere.data.pipeline.core.spi.ingest.dumper.IncrementalDumperCreator; @@ -41,7 +40,6 @@ import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator; import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties; import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData; -import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase; import org.apache.shardingsphere.infra.parser.SQLParserEngine; import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper; import org.apache.shardingsphere.parser.rule.SQLParserRule; @@ -52,60 +50,56 @@ import java.util.Optional; /** - * Pipeline job preparer utility class. + * Pipeline job preparer. */ -@NoArgsConstructor(access = AccessLevel.PRIVATE) +@RequiredArgsConstructor @Slf4j -public final class PipelineJobPreparerUtils { +public final class PipelineJobPreparer { + + private final DatabaseType databaseType; /** * Is incremental supported. * - * @param databaseType database type - * @return true if supported, otherwise false + * @return support incremental or not */ - public static boolean isIncrementalSupported(final DatabaseType databaseType) { + public boolean isIncrementalSupported() { return DatabaseTypedSPILoader.findService(IncrementalDumperCreator.class, databaseType).map(IncrementalDumperCreator::isSupportIncrementalDump).orElse(false); } /** * Prepare target schema. * - * @param databaseType database type - * @param prepareTargetSchemasParam prepare target schemas parameter + * @param param prepare target schemas parameter * @throws SQLException if prepare target schema fail */ - public static void prepareTargetSchema(final DatabaseType databaseType, final PrepareTargetSchemasParameter prepareTargetSchemasParam) throws SQLException { - DataSourcePrepareOption option = DatabaseTypedSPILoader.findService(DataSourcePrepareOption.class, databaseType) - .orElseGet(() -> DatabaseTypedSPILoader.getService(DataSourcePrepareOption.class, null)); - new DataSourcePreparer(option).prepareTargetSchemas(prepareTargetSchemasParam); + public void prepareTargetSchema(final PrepareTargetSchemasParameter param) throws SQLException { + DialectPipelineJobDataSourcePrepareOption option = DatabaseTypedSPILoader.findService(DialectPipelineJobDataSourcePrepareOption.class, databaseType) + .orElseGet(() -> DatabaseTypedSPILoader.getService(DialectPipelineJobDataSourcePrepareOption.class, null)); + new PipelineJobDataSourcePreparer(option).prepareTargetSchemas(param); } /** * Get SQL parser engine. * * @param metaData meta data - * @param targetDatabaseName target database name * @return SQL parser engine */ - public static SQLParserEngine getSQLParserEngine(final ShardingSphereMetaData metaData, final String targetDatabaseName) { - ShardingSphereDatabase database = metaData.getDatabase(targetDatabaseName); - DatabaseType databaseType = database.getProtocolType().getTrunkDatabaseType().orElse(database.getProtocolType()); + public SQLParserEngine getSQLParserEngine(final ShardingSphereMetaData metaData) { return metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class).getSQLParserEngine(databaseType); } /** * Prepare target tables. * - * @param databaseType database type * @param prepareTargetTablesParam prepare target tables parameter * @throws SQLException SQL exception */ - public static void prepareTargetTables(final DatabaseType databaseType, final PrepareTargetTablesParameter prepareTargetTablesParam) throws SQLException { - DataSourcePrepareOption option = DatabaseTypedSPILoader.findService(DataSourcePrepareOption.class, databaseType) - .orElseGet(() -> DatabaseTypedSPILoader.getService(DataSourcePrepareOption.class, null)); + public void prepareTargetTables(final PrepareTargetTablesParameter prepareTargetTablesParam) throws SQLException { + DialectPipelineJobDataSourcePrepareOption option = DatabaseTypedSPILoader.findService(DialectPipelineJobDataSourcePrepareOption.class, databaseType) + .orElseGet(() -> DatabaseTypedSPILoader.getService(DialectPipelineJobDataSourcePrepareOption.class, null)); long startTimeMillis = System.currentTimeMillis(); - new DataSourcePreparer(option).prepareTargetTables(prepareTargetTablesParam); + new PipelineJobDataSourcePreparer(option).prepareTargetTables(prepareTargetTablesParam); log.info("prepareTargetTables cost {} ms", System.currentTimeMillis() - startTimeMillis); } @@ -118,15 +112,14 @@ public static void prepareTargetTables(final DatabaseType databaseType, final Pr * @return ingest position * @throws SQLException sql exception */ - public static IngestPosition getIncrementalPosition(final JobItemIncrementalTasksProgress initIncremental, final IncrementalDumperContext dumperContext, - final PipelineDataSourceManager dataSourceManager) throws SQLException { + public IngestPosition getIncrementalPosition(final JobItemIncrementalTasksProgress initIncremental, final IncrementalDumperContext dumperContext, + final PipelineDataSourceManager dataSourceManager) throws SQLException { if (null != initIncremental) { Optional position = initIncremental.getIncrementalPosition(); if (position.isPresent()) { return position.get(); } } - DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType(); DataSource dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()); return DatabaseTypedSPILoader.getService(PositionInitializer.class, databaseType).init(dataSource, dumperContext.getJobId()); } @@ -134,10 +127,9 @@ public static IngestPosition getIncrementalPosition(final JobItemIncrementalTask /** * Check data source. * - * @param databaseType database type * @param dataSources data source */ - public static void checkSourceDataSource(final DatabaseType databaseType, final Collection dataSources) { + public void checkSourceDataSource(final Collection dataSources) { if (dataSources.isEmpty()) { return; } @@ -150,11 +142,10 @@ public static void checkSourceDataSource(final DatabaseType databaseType, final /** * Check target data source. * - * @param databaseType database type * @param importerConfig importer config * @param targetDataSources target data sources */ - public static void checkTargetDataSource(final DatabaseType databaseType, final ImporterConfiguration importerConfig, final Collection targetDataSources) { + public void checkTargetDataSource(final ImporterConfiguration importerConfig, final Collection targetDataSources) { if (null == targetDataSources || targetDataSources.isEmpty()) { log.info("target data source is empty, skip check"); return; @@ -167,12 +158,11 @@ public static void checkTargetDataSource(final DatabaseType databaseType, final /** * Cleanup job preparer. * - * @param jobId job id + * @param jobId pipeline job id * @param pipelineDataSourceConfig pipeline data source config - * @throws SQLException sql exception + * @throws SQLException SQL exception */ - public static void destroyPosition(final String jobId, final PipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException { - DatabaseType databaseType = pipelineDataSourceConfig.getDatabaseType(); + public void destroyPosition(final String jobId, final PipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException { PositionInitializer positionInitializer = DatabaseTypedSPILoader.getService(PositionInitializer.class, databaseType); final long startTimeMillis = System.currentTimeMillis(); log.info("Cleanup database type:{}, data source type:{}", databaseType.getType(), pipelineDataSourceConfig.getType()); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePreparer.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java similarity index 95% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePreparer.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java index d738edeba8e28..3f9ae7a8bc0bc 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePreparer.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java @@ -23,7 +23,8 @@ import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator; -import org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration; +import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption; +import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration; import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter; import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter; import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder; @@ -42,17 +43,17 @@ import java.util.regex.Pattern; /** - * Data source preparer. + * Pipeline job data source preparer. */ @RequiredArgsConstructor @Slf4j -public final class DataSourcePreparer { +public final class PipelineJobDataSourcePreparer { private static final Pattern PATTERN_CREATE_TABLE_IF_NOT_EXISTS = Pattern.compile("CREATE\\s+TABLE\\s+IF\\s+NOT\\s+EXISTS\\s+", Pattern.CASE_INSENSITIVE); private static final Pattern PATTERN_CREATE_TABLE = Pattern.compile("CREATE\\s+TABLE\\s+", Pattern.CASE_INSENSITIVE); - private final DataSourcePrepareOption option; + private final DialectPipelineJobDataSourcePrepareOption option; /** * Prepare target schemas. diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DefaultDataSourcePrepareOption.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/option/DefaultPipelineJobDataSourcePrepareOption.java similarity index 66% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DefaultDataSourcePrepareOption.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/option/DefaultPipelineJobDataSourcePrepareOption.java index fd22b23e244a5..34a42ed258e0a 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DefaultDataSourcePrepareOption.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/option/DefaultPipelineJobDataSourcePrepareOption.java @@ -15,12 +15,25 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.core.preparer.datasource; +package org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option; + +import java.util.Collection; +import java.util.Collections; /** - * Default data source prepare option. + * Default pipeline job data source prepare option. */ -public final class DefaultDataSourcePrepareOption implements DataSourcePrepareOption { +public final class DefaultPipelineJobDataSourcePrepareOption implements DialectPipelineJobDataSourcePrepareOption { + + @Override + public boolean isSupportIfNotExistsOnCreateSchema() { + return true; + } + + @Override + public Collection getIgnoredExceptionMessages() { + return Collections.emptyList(); + } @Override public boolean isDefault() { diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePrepareOption.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/option/DialectPipelineJobDataSourcePrepareOption.java similarity index 79% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePrepareOption.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/option/DialectPipelineJobDataSourcePrepareOption.java index 52bc1962bcd69..70a0d7944ac0c 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePrepareOption.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/option/DialectPipelineJobDataSourcePrepareOption.java @@ -15,35 +15,30 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.core.preparer.datasource; +package org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option; import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI; import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI; import java.util.Collection; -import java.util.Collections; /** - * Data source prepare option. + * Dialect pipeline job data source prepare option. */ @SingletonSPI -public interface DataSourcePrepareOption extends DatabaseTypedSPI { +public interface DialectPipelineJobDataSourcePrepareOption extends DatabaseTypedSPI { /** * Is support if not exists on create schema SQL. * * @return supported or not */ - default boolean isSupportIfNotExistsOnCreateSchema() { - return true; - } + boolean isSupportIfNotExistsOnCreateSchema(); /** * Get ignored exception messages. * * @return ignored exception messages */ - default Collection getIgnoredExceptionMessages() { - return Collections.emptyList(); - } + Collection getIgnoredExceptionMessages(); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/CreateTableConfiguration.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/CreateTableConfiguration.java similarity index 90% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/CreateTableConfiguration.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/CreateTableConfiguration.java index 2f7d284e10308..737e16159bfeb 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/CreateTableConfiguration.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/CreateTableConfiguration.java @@ -15,11 +15,10 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.core.preparer; +package org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param; import lombok.Getter; import lombok.RequiredArgsConstructor; -import lombok.ToString; import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable; @@ -28,7 +27,6 @@ */ @RequiredArgsConstructor @Getter -@ToString(exclude = {"sourceDataSourceConfig", "targetDataSourceConfig"}) public final class CreateTableConfiguration { private final PipelineDataSourceConfiguration sourceDataSourceConfig; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetSchemasParameter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetSchemasParameter.java index 7b4cf8e8d3621..a4ae2cc394286 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetSchemasParameter.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetSchemasParameter.java @@ -19,7 +19,6 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java index 348c931eb0ac7..2648729988698 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java @@ -19,7 +19,6 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.infra.parser.SQLParserEngine; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryRecordsCountCalculator.java similarity index 98% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryRecordsCountCalculator.java index f910490fe565a..894d5d55f91ff 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryRecordsCountCalculator.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.core.preparer; +package org.apache.shardingsphere.data.pipeline.core.preparer.inventory; import lombok.AccessLevel; import lombok.NoArgsConstructor; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java similarity index 99% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java index 74d1bf53d049d..95d9c9e4eaa3f 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.core.preparer; +package org.apache.shardingsphere.data.pipeline.core.preparer.inventory; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -60,7 +60,7 @@ import java.util.concurrent.atomic.AtomicReference; /** - * Inventory data task splitter. + * Inventory task splitter. */ @RequiredArgsConstructor @Slf4j diff --git a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption similarity index 94% rename from kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption rename to kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption index 6acc331887724..9a154a9272ce5 100644 --- a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption +++ b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption @@ -15,4 +15,4 @@ # limitations under the License. # -org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DefaultDataSourcePrepareOption +org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DefaultPipelineJobDataSourcePrepareOption diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java index 3f8f36c97a81d..f117cf933dd5c 100644 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java @@ -17,6 +17,7 @@ package org.apache.shardingsphere.data.pipeline.core.preparer.datasource; +import org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngine; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException; import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException; diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceChecker.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceChecker.java index 2bc39c2ecd4a7..36132ba276933 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceChecker.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceChecker.java @@ -20,7 +20,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithCheckPrivilegeFailedException; import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidSourceDataSourceException; import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithoutEnoughPrivilegeException; -import org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker; +import org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker; import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; import javax.sql.DataSource; diff --git a/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker b/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker similarity index 100% rename from kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker rename to kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/check/datasource/OpenGaussDataSourceChecker.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/check/datasource/OpenGaussDataSourceChecker.java index eae8d6e6672cd..f9819c42f4614 100644 --- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/check/datasource/OpenGaussDataSourceChecker.java +++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/check/datasource/OpenGaussDataSourceChecker.java @@ -20,7 +20,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithCheckPrivilegeFailedException; import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithoutEnoughPrivilegeException; import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithoutUserException; -import org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker; +import org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker; import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; import javax.sql.DataSource; diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePrepareOption.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussPipelineJobDataSourcePrepareOption.java similarity index 85% rename from kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePrepareOption.java rename to kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussPipelineJobDataSourcePrepareOption.java index 846a041b4d25e..2771c5d935372 100644 --- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePrepareOption.java +++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussPipelineJobDataSourcePrepareOption.java @@ -18,16 +18,16 @@ package org.apache.shardingsphere.data.pipeline.opengauss.prepare.datasource; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption; +import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption; import java.util.Arrays; import java.util.Collection; /** - * Data source prepare option for openGauss. + * Pipeline job data source prepare option for openGauss. */ @Slf4j -public final class OpenGaussDataSourcePrepareOption implements DataSourcePrepareOption { +public final class OpenGaussPipelineJobDataSourcePrepareOption implements DialectPipelineJobDataSourcePrepareOption { @Override public boolean isSupportIfNotExistsOnCreateSchema() { diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker b/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker similarity index 100% rename from kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker rename to kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption b/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption similarity index 94% rename from kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption rename to kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption index 72923bbe6654b..e1fd88a4da647 100644 --- a/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption +++ b/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption @@ -15,4 +15,4 @@ # limitations under the License. # -org.apache.shardingsphere.data.pipeline.opengauss.prepare.datasource.OpenGaussDataSourcePrepareOption +org.apache.shardingsphere.data.pipeline.opengauss.prepare.datasource.OpenGaussPipelineJobDataSourcePrepareOption diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java index c8e0014666341..693afc16076d8 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java @@ -21,7 +21,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithCheckPrivilegeFailedException; import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithoutEnoughPrivilegeException; import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithoutUserException; -import org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker; +import org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker; import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; import javax.sql.DataSource; diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker b/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker similarity index 100% rename from kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker rename to kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java index 8474fcfe2e9b9..04ef66dcac762 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java @@ -52,7 +52,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; -import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils; +import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparer; import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade; import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress; import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; @@ -202,7 +202,8 @@ private static TransmissionJobItemProgress getTransmissionJobItemProgress(final TransmissionJobItemProgress result = new TransmissionJobItemProgress(); result.setSourceDatabaseType(jobConfig.getSourceDatabaseType()); result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName()); - IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(PipelineJobPreparerUtils.getIncrementalPosition(null, incrementalDumperContext, dataSourceManager)); + IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(new PipelineJobPreparer( + incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType()).getIncrementalPosition(null, incrementalDumperContext, dataSourceManager)); result.setIncremental(new JobItemIncrementalTasksProgress(incrementalTaskProgress)); return result; } @@ -258,7 +259,8 @@ public void drop(final String jobId) { private void cleanup(final CDCJobConfiguration jobConfig) { for (Entry> entry : jobConfig.getDataSourceConfig().getRootConfig().getDataSources().entrySet()) { try { - PipelineJobPreparerUtils.destroyPosition(jobConfig.getJobId(), new StandardPipelineDataSourceConfiguration(entry.getValue())); + StandardPipelineDataSourceConfiguration pipelineDataSourceConfig = new StandardPipelineDataSourceConfiguration(entry.getValue()); + new PipelineJobPreparer(pipelineDataSourceConfig.getDatabaseType()).destroyPosition(jobConfig.getJobId(), pipelineDataSourceConfig); } catch (final SQLException ex) { log.warn("job destroying failed, jobId={}, dataSourceName={}", jobConfig.getJobId(), entry.getKey(), ex); } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java index 82e1330c395f5..9b991c3e8e175 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java @@ -41,8 +41,8 @@ import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress; import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; -import org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter; -import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils; +import org.apache.shardingsphere.data.pipeline.core.preparer.inventory.InventoryTaskSplitter; +import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparer; import org.apache.shardingsphere.data.pipeline.core.spi.ingest.dumper.IncrementalDumperCreator; import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask; import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils; @@ -106,8 +106,8 @@ private void initIncrementalPosition(final CDCJobItemContext jobItemContext) { CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig(); JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental(); try { - taskConfig.getDumperContext().getCommonContext().setPosition( - PipelineJobPreparerUtils.getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager())); + taskConfig.getDumperContext().getCommonContext().setPosition(new PipelineJobPreparer(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType()) + .getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager())); } catch (final SQLException ex) { throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex); } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java index af98652a27924..7b276da874920 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java @@ -30,7 +30,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration; import org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier; import org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable; -import org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration; +import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration; import org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm; import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner; import org.apache.shardingsphere.data.pipeline.core.task.runner.TransmissionTasksRunner; @@ -45,7 +45,6 @@ import java.sql.SQLException; import java.util.Collection; -import java.util.LinkedList; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -76,11 +75,7 @@ private MigrationTaskConfiguration buildTaskConfiguration(final MigrationJobConf } private Collection buildCreateTableConfigurations(final MigrationJobConfiguration jobConfig, final TableAndSchemaNameMapper mapper) { - Collection result = new LinkedList<>(); - for (JobDataNodeEntry each : jobConfig.getTablesFirstDataNodes().getEntries()) { - result.add(getCreateTableConfiguration(jobConfig, mapper, each)); - } - return result; + return jobConfig.getTablesFirstDataNodes().getEntries().stream().map(each -> getCreateTableConfiguration(jobConfig, mapper, each)).collect(Collectors.toList()); } private CreateTableConfiguration getCreateTableConfiguration(final MigrationJobConfiguration jobConfig, final TableAndSchemaNameMapper mapper, final JobDataNodeEntry jobDataNodeEntry) { diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationTaskConfiguration.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationTaskConfiguration.java index 07aba7d6dd91e..d36dd904cdee0 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationTaskConfiguration.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationTaskConfiguration.java @@ -20,7 +20,7 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.ToString; -import org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration; +import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration; import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration; import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext; diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java index 69c4780ab64cd..cf01823a9774d 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java @@ -20,7 +20,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration; +import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration; import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration; import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; @@ -47,8 +47,8 @@ import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; -import org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter; -import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils; +import org.apache.shardingsphere.data.pipeline.core.preparer.inventory.InventoryTaskSplitter; +import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparer; import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter; import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter; import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask; @@ -95,7 +95,8 @@ public void prepare(final MigrationJobItemContext jobItemContext) throws SQLExce ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals( jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()), () -> new UnsupportedSQLOperationException("Migration inventory dumper only support StandardPipelineDataSourceConfiguration")); - PipelineJobPreparerUtils.checkSourceDataSource(jobItemContext.getJobConfig().getSourceDatabaseType(), Collections.singleton(jobItemContext.getSourceDataSource())); + PipelineJobPreparer preparer = new PipelineJobPreparer(jobItemContext.getJobConfig().getSourceDatabaseType()); + preparer.checkSourceDataSource(Collections.singleton(jobItemContext.getSourceDataSource())); if (jobItemContext.isStopping()) { PipelineJobRegistry.stop(jobItemContext.getJobId()); return; @@ -105,7 +106,7 @@ public void prepare(final MigrationJobItemContext jobItemContext) throws SQLExce PipelineJobRegistry.stop(jobItemContext.getJobId()); return; } - boolean isIncrementalSupported = PipelineJobPreparerUtils.isIncrementalSupported(jobItemContext.getJobConfig().getSourceDatabaseType()); + boolean isIncrementalSupported = preparer.isIncrementalSupported(); if (isIncrementalSupported) { prepareIncremental(jobItemContext); } @@ -157,8 +158,8 @@ private void prepareAndCheckTarget(final MigrationJobItemContext jobItemContext) TransmissionJobItemProgress initProgress = jobItemContext.getInitProgress(); if (null == initProgress) { PipelineDataSourceWrapper targetDataSource = jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig()); - PipelineJobPreparerUtils.checkTargetDataSource( - jobItemContext.getJobConfig().getTargetDatabaseType(), jobItemContext.getTaskConfig().getImporterConfig(), Collections.singleton(targetDataSource)); + new PipelineJobPreparer(jobItemContext.getJobConfig().getTargetDatabaseType()).checkTargetDataSource( + jobItemContext.getTaskConfig().getImporterConfig(), Collections.singleton(targetDataSource)); } } @@ -167,18 +168,21 @@ private void prepareTarget(final MigrationJobItemContext jobItemContext) throws Collection createTableConfigs = jobItemContext.getTaskConfig().getCreateTableConfigurations(); PipelineDataSourceManager dataSourceManager = jobItemContext.getDataSourceManager(); PrepareTargetSchemasParameter prepareTargetSchemasParam = new PrepareTargetSchemasParameter(jobItemContext.getJobConfig().getTargetDatabaseType(), createTableConfigs, dataSourceManager); - PipelineJobPreparerUtils.prepareTargetSchema(jobItemContext.getJobConfig().getTargetDatabaseType(), prepareTargetSchemasParam); + PipelineJobPreparer targetDataSourcePreparer = new PipelineJobPreparer(jobItemContext.getJobConfig().getTargetDatabaseType()); + targetDataSourcePreparer.prepareTargetSchema(prepareTargetSchemasParam); ShardingSphereMetaData metaData = PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())).getContextManager().getMetaDataContexts().getMetaData(); - SQLParserEngine sqlParserEngine = PipelineJobPreparerUtils.getSQLParserEngine(metaData, jobConfig.getTargetDatabaseName()); - PipelineJobPreparerUtils.prepareTargetTables(jobItemContext.getJobConfig().getTargetDatabaseType(), new PrepareTargetTablesParameter(createTableConfigs, dataSourceManager, sqlParserEngine)); + SQLParserEngine sqlParserEngine = new PipelineJobPreparer( + metaData.getDatabase(jobConfig.getTargetDatabaseName()).getProtocolType().getTrunkDatabaseType().orElse(metaData.getDatabase(jobConfig.getTargetDatabaseName()).getProtocolType())) + .getSQLParserEngine(metaData); + targetDataSourcePreparer.prepareTargetTables(new PrepareTargetTablesParameter(createTableConfigs, dataSourceManager, sqlParserEngine)); } private void prepareIncremental(final MigrationJobItemContext jobItemContext) { MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig(); JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental(); try { - taskConfig.getDumperContext().getCommonContext().setPosition( - PipelineJobPreparerUtils.getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager())); + taskConfig.getDumperContext().getCommonContext().setPosition(new PipelineJobPreparer(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType()) + .getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager())); } catch (final SQLException ex) { throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex); } @@ -224,7 +228,7 @@ private Collection createImporters(final ImporterConfiguration importe public void cleanup(final MigrationJobConfiguration jobConfig) { for (Entry entry : jobConfig.getSources().entrySet()) { try { - PipelineJobPreparerUtils.destroyPosition(jobConfig.getJobId(), entry.getValue()); + new PipelineJobPreparer(entry.getValue().getDatabaseType()).destroyPosition(jobConfig.getJobId(), entry.getValue()); } catch (final SQLException ex) { log.warn("job destroying failed, jobId={}, dataSourceName={}", jobConfig.getJobId(), entry.getKey(), ex); } diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureDataSourceChecker.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureDataSourceChecker.java index e80538695abfc..23018527cf3c1 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureDataSourceChecker.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureDataSourceChecker.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.test.it.data.pipeline.core.fixture; -import org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker; +import org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker; import javax.sql.DataSource; diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java index 473baa149d580..9581f5452bb49 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java @@ -25,7 +25,7 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.position.pk.type.IntegerPrimaryKeyPosition; import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils; import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader; -import org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter; +import org.apache.shardingsphere.data.pipeline.core.preparer.inventory.InventoryTaskSplitter; import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext; diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java index 939ff8881a790..970e4ad26253d 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java @@ -38,7 +38,7 @@ import org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier; import org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable; import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData; -import org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration; +import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration; import org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm; import org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; diff --git a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker similarity index 100% rename from test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker rename to test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker