Refactor PipelineJobPreparer (#29427)
* Refactor DefaultDataSourcePrepareOption

* Rename DialectDataSourcePrepareOption

* Move DialectDataSourcePrepareOption

* Add checker package

* Refactor DataSourceCheckEngine

* Refactor DataSourceCheckEngine

* Refactor InventoryTaskSplitter

* Move InventoryTaskSplitter

* Refactor CreateTableConfiguration

* Rename PipelineJobDataSourcePreparer

* Rename PipelineJobPreparer

* Refactor PipelineJobPreparer

* Refactor PipelineJobPreparer

* Refactor PipelineJobPreparer

* Refactor PipelineJobPreparer
terrymanu authored Dec 17, 2023
1 parent 436684d commit 56a2581
Showing 30 changed files with 110 additions and 115 deletions.
@@ -15,13 +15,12 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
package org.apache.shardingsphere.data.pipeline.core.checker;

import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
import org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;

@@ -37,13 +36,13 @@
*/
public final class DataSourceCheckEngine {

private final DatabaseType databaseType;
private final DialectDataSourceChecker checker;

private final DialectDataSourceChecker dialectDataSourceChecker;
private final PipelineCommonSQLBuilder sqlBuilder;

public DataSourceCheckEngine(final DatabaseType databaseType) {
this.databaseType = databaseType;
dialectDataSourceChecker = DatabaseTypedSPILoader.findService(DialectDataSourceChecker.class, databaseType).orElse(null);
checker = DatabaseTypedSPILoader.findService(DialectDataSourceChecker.class, databaseType).orElse(null);
sqlBuilder = new PipelineCommonSQLBuilder(databaseType);
}

/**
@@ -87,8 +86,7 @@ public void checkTargetTable(final Collection<? extends DataSource> dataSources,
}

private boolean checkEmpty(final DataSource dataSource, final String schemaName, final String tableName) throws SQLException {
PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(databaseType);
String sql = pipelineSQLBuilder.buildCheckEmptySQL(schemaName, tableName);
String sql = sqlBuilder.buildCheckEmptySQL(schemaName, tableName);
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql);
@@ -103,11 +101,11 @@ private boolean checkEmpty(final DataSource dataSource, final String schemaName,
* @param dataSources data sources
*/
public void checkPrivilege(final Collection<? extends DataSource> dataSources) {
if (null == dialectDataSourceChecker) {
if (null == checker) {
return;
}
for (DataSource each : dataSources) {
dialectDataSourceChecker.checkPrivilege(each);
checker.checkPrivilege(each);
}
}

@@ -117,11 +115,11 @@ public void checkPrivilege(final Collection<? extends DataSource> dataSources) {
* @param dataSources data sources
*/
public void checkVariable(final Collection<? extends DataSource> dataSources) {
if (null == dialectDataSourceChecker) {
if (null == checker) {
return;
}
for (DataSource each : dataSources) {
dialectDataSourceChecker.checkVariable(each);
checker.checkVariable(each);
}
}
}
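
Taken together, the new checker package gives pipeline jobs one entry point for validating data sources. A minimal usage sketch of the refactored engine, assuming a caller that already holds the database type and its data sources (the wrapper class below is illustrative, not part of this commit):

import org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngine;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;

import javax.sql.DataSource;
import java.util.Collection;

// Illustrative caller: validates privileges and variables on the source data sources before a job starts.
// Both checks are no-ops when no DialectDataSourceChecker is registered for the database type.
public final class SourceDataSourceCheckSketch {
    
    public static void check(final DatabaseType databaseType, final Collection<DataSource> sourceDataSources) {
        DataSourceCheckEngine checkEngine = new DataSourceCheckEngine(databaseType);
        checkEngine.checkPrivilege(sourceDataSources);
        checkEngine.checkVariable(sourceDataSources);
    }
}
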
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.spi.datasource;
package org.apache.shardingsphere.data.pipeline.core.checker;

import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
@@ -17,21 +17,20 @@

package org.apache.shardingsphere.data.pipeline.core.preparer;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngine;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourceCheckEngine;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PipelineJobDataSourcePreparer;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
import org.apache.shardingsphere.data.pipeline.core.spi.ingest.dumper.IncrementalDumperCreator;
@@ -41,7 +40,6 @@
import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;
import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
@@ -52,60 +50,56 @@
import java.util.Optional;

/**
* Pipeline job preparer utility class.
* Pipeline job preparer.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@RequiredArgsConstructor
@Slf4j
public final class PipelineJobPreparerUtils {
public final class PipelineJobPreparer {

private final DatabaseType databaseType;

/**
* Is incremental supported.
*
* @param databaseType database type
* @return true if supported, otherwise false
* @return support incremental or not
*/
public static boolean isIncrementalSupported(final DatabaseType databaseType) {
public boolean isIncrementalSupported() {
return DatabaseTypedSPILoader.findService(IncrementalDumperCreator.class, databaseType).map(IncrementalDumperCreator::isSupportIncrementalDump).orElse(false);
}

/**
* Prepare target schema.
*
* @param databaseType database type
* @param prepareTargetSchemasParam prepare target schemas parameter
* @param param prepare target schemas parameter
* @throws SQLException if prepare target schema fail
*/
public static void prepareTargetSchema(final DatabaseType databaseType, final PrepareTargetSchemasParameter prepareTargetSchemasParam) throws SQLException {
DataSourcePrepareOption option = DatabaseTypedSPILoader.findService(DataSourcePrepareOption.class, databaseType)
.orElseGet(() -> DatabaseTypedSPILoader.getService(DataSourcePrepareOption.class, null));
new DataSourcePreparer(option).prepareTargetSchemas(prepareTargetSchemasParam);
public void prepareTargetSchema(final PrepareTargetSchemasParameter param) throws SQLException {
DialectPipelineJobDataSourcePrepareOption option = DatabaseTypedSPILoader.findService(DialectPipelineJobDataSourcePrepareOption.class, databaseType)
.orElseGet(() -> DatabaseTypedSPILoader.getService(DialectPipelineJobDataSourcePrepareOption.class, null));
new PipelineJobDataSourcePreparer(option).prepareTargetSchemas(param);
}

/**
* Get SQL parser engine.
*
* @param metaData meta data
* @param targetDatabaseName target database name
* @return SQL parser engine
*/
public static SQLParserEngine getSQLParserEngine(final ShardingSphereMetaData metaData, final String targetDatabaseName) {
ShardingSphereDatabase database = metaData.getDatabase(targetDatabaseName);
DatabaseType databaseType = database.getProtocolType().getTrunkDatabaseType().orElse(database.getProtocolType());
public SQLParserEngine getSQLParserEngine(final ShardingSphereMetaData metaData) {
return metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class).getSQLParserEngine(databaseType);
}

/**
* Prepare target tables.
*
* @param databaseType database type
* @param prepareTargetTablesParam prepare target tables parameter
* @throws SQLException SQL exception
*/
public static void prepareTargetTables(final DatabaseType databaseType, final PrepareTargetTablesParameter prepareTargetTablesParam) throws SQLException {
DataSourcePrepareOption option = DatabaseTypedSPILoader.findService(DataSourcePrepareOption.class, databaseType)
.orElseGet(() -> DatabaseTypedSPILoader.getService(DataSourcePrepareOption.class, null));
public void prepareTargetTables(final PrepareTargetTablesParameter prepareTargetTablesParam) throws SQLException {
DialectPipelineJobDataSourcePrepareOption option = DatabaseTypedSPILoader.findService(DialectPipelineJobDataSourcePrepareOption.class, databaseType)
.orElseGet(() -> DatabaseTypedSPILoader.getService(DialectPipelineJobDataSourcePrepareOption.class, null));
long startTimeMillis = System.currentTimeMillis();
new DataSourcePreparer(option).prepareTargetTables(prepareTargetTablesParam);
new PipelineJobDataSourcePreparer(option).prepareTargetTables(prepareTargetTablesParam);
log.info("prepareTargetTables cost {} ms", System.currentTimeMillis() - startTimeMillis);
}

@@ -118,26 +112,24 @@ public static void prepareTargetTables(final DatabaseType databaseType, final Pr
* @return ingest position
* @throws SQLException sql exception
*/
public static IngestPosition getIncrementalPosition(final JobItemIncrementalTasksProgress initIncremental, final IncrementalDumperContext dumperContext,
final PipelineDataSourceManager dataSourceManager) throws SQLException {
public IngestPosition getIncrementalPosition(final JobItemIncrementalTasksProgress initIncremental, final IncrementalDumperContext dumperContext,
final PipelineDataSourceManager dataSourceManager) throws SQLException {
if (null != initIncremental) {
Optional<IngestPosition> position = initIncremental.getIncrementalPosition();
if (position.isPresent()) {
return position.get();
}
}
DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
DataSource dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
return DatabaseTypedSPILoader.getService(PositionInitializer.class, databaseType).init(dataSource, dumperContext.getJobId());
}

/**
* Check data source.
*
* @param databaseType database type
* @param dataSources data source
*/
public static void checkSourceDataSource(final DatabaseType databaseType, final Collection<? extends DataSource> dataSources) {
public void checkSourceDataSource(final Collection<? extends DataSource> dataSources) {
if (dataSources.isEmpty()) {
return;
}
@@ -150,11 +142,10 @@ public static void checkSourceDataSource(final DatabaseType databaseType, final
/**
* Check target data source.
*
* @param databaseType database type
* @param importerConfig importer config
* @param targetDataSources target data sources
*/
public static void checkTargetDataSource(final DatabaseType databaseType, final ImporterConfiguration importerConfig, final Collection<? extends DataSource> targetDataSources) {
public void checkTargetDataSource(final ImporterConfiguration importerConfig, final Collection<? extends DataSource> targetDataSources) {
if (null == targetDataSources || targetDataSources.isEmpty()) {
log.info("target data source is empty, skip check");
return;
@@ -167,12 +158,11 @@ public static void checkTargetDataSource(final DatabaseType databaseType, final
/**
* Cleanup job preparer.
*
* @param jobId job id
* @param jobId pipeline job id
* @param pipelineDataSourceConfig pipeline data source config
* @throws SQLException sql exception
* @throws SQLException SQL exception
*/
public static void destroyPosition(final String jobId, final PipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
DatabaseType databaseType = pipelineDataSourceConfig.getDatabaseType();
public void destroyPosition(final String jobId, final PipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
PositionInitializer positionInitializer = DatabaseTypedSPILoader.getService(PositionInitializer.class, databaseType);
final long startTimeMillis = System.currentTimeMillis();
log.info("Cleanup database type:{}, data source type:{}", databaseType.getType(), pipelineDataSourceConfig.getType());
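
For callers, the refactoring replaces the removed static PipelineJobPreparerUtils methods, each of which took a DatabaseType argument, with instance methods on a PipelineJobPreparer bound to one database type at construction. A minimal caller-side sketch under that assumption (the wrapper class and method below are illustrative, not code from this commit):

import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparer;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;

import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.Collection;

// Illustrative caller showing the migration from the removed static utility to the new instance API.
public final class PipelineJobPreparerUsageSketch {
    
    public static boolean prepare(final DatabaseType databaseType, final PrepareTargetSchemasParameter param,
                                  final Collection<DataSource> sourceDataSources) throws SQLException {
        // Before: PipelineJobPreparerUtils.checkSourceDataSource(databaseType, sourceDataSources);
        //         PipelineJobPreparerUtils.prepareTargetSchema(databaseType, param);
        PipelineJobPreparer preparer = new PipelineJobPreparer(databaseType);
        preparer.checkSourceDataSource(sourceDataSources);
        preparer.prepareTargetSchema(param);
        // The incremental-support check also moved from a static method to the instance.
        return preparer.isIncrementalSupported();
    }
}
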
@@ -23,7 +23,8 @@
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
import org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
@@ -42,17 +43,17 @@
import java.util.regex.Pattern;

/**
* Data source preparer.
* Pipeline job data source preparer.
*/
@RequiredArgsConstructor
@Slf4j
public final class DataSourcePreparer {
public final class PipelineJobDataSourcePreparer {

private static final Pattern PATTERN_CREATE_TABLE_IF_NOT_EXISTS = Pattern.compile("CREATE\\s+TABLE\\s+IF\\s+NOT\\s+EXISTS\\s+", Pattern.CASE_INSENSITIVE);

private static final Pattern PATTERN_CREATE_TABLE = Pattern.compile("CREATE\\s+TABLE\\s+", Pattern.CASE_INSENSITIVE);

private final DataSourcePrepareOption option;
private final DialectPipelineJobDataSourcePrepareOption option;

/**
* Prepare target schemas.
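
The two precompiled patterns above suggest how generated CREATE TABLE statements are normalized before execution. A self-contained sketch of that kind of rewrite, assuming the goal is an idempotent IF NOT EXISTS form (the class, method, and demo below are illustrative, not the preparer's actual code):

import java.util.regex.Pattern;

// Illustrative rewrite in the spirit of the two patterns above: make a CREATE TABLE statement
// idempotent by adding IF NOT EXISTS when it is not already present.
public final class CreateTableSqlRewriteSketch {
    
    private static final Pattern PATTERN_CREATE_TABLE_IF_NOT_EXISTS = Pattern.compile("CREATE\\s+TABLE\\s+IF\\s+NOT\\s+EXISTS\\s+", Pattern.CASE_INSENSITIVE);
    
    private static final Pattern PATTERN_CREATE_TABLE = Pattern.compile("CREATE\\s+TABLE\\s+", Pattern.CASE_INSENSITIVE);
    
    public static String addIfNotExists(final String createTableSQL) {
        if (PATTERN_CREATE_TABLE_IF_NOT_EXISTS.matcher(createTableSQL).find()) {
            return createTableSQL;
        }
        return PATTERN_CREATE_TABLE.matcher(createTableSQL).replaceFirst("CREATE TABLE IF NOT EXISTS ");
    }
    
    public static void main(final String[] args) {
        // Prints: CREATE TABLE IF NOT EXISTS t_order (id BIGINT)
        System.out.println(addIfNotExists("CREATE TABLE t_order (id BIGINT)"));
    }
}
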
@@ -15,12 +15,25 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
package org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option;

import java.util.Collection;
import java.util.Collections;

/**
* Default data source prepare option.
* Default pipeline job data source prepare option.
*/
public final class DefaultDataSourcePrepareOption implements DataSourcePrepareOption {
public final class DefaultPipelineJobDataSourcePrepareOption implements DialectPipelineJobDataSourcePrepareOption {

@Override
public boolean isSupportIfNotExistsOnCreateSchema() {
return true;
}

@Override
public Collection<String> getIgnoredExceptionMessages() {
return Collections.emptyList();
}

@Override
public boolean isDefault() {
@@ -15,35 +15,30 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
package org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option;

import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;

import java.util.Collection;
import java.util.Collections;

/**
* Data source prepare option.
* Dialect pipeline job data source prepare option.
*/
@SingletonSPI
public interface DataSourcePrepareOption extends DatabaseTypedSPI {
public interface DialectPipelineJobDataSourcePrepareOption extends DatabaseTypedSPI {

/**
* Is support if not exists on create schema SQL.
*
* @return supported or not
*/
default boolean isSupportIfNotExistsOnCreateSchema() {
return true;
}
boolean isSupportIfNotExistsOnCreateSchema();

/**
* Get ignored exception messages.
*
* @return ignored exception messages
*/
default Collection<String> getIgnoredExceptionMessages() {
return Collections.emptyList();
}
Collection<String> getIgnoredExceptionMessages();
}
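
With the default bodies removed, every dialect option now states its CREATE SCHEMA capability and its ignorable error messages explicitly, and a consumer can branch on both. A hedged sketch of such a consumer using plain JDBC (the real PipelineJobDataSourcePreparer builds its SQL through the pipeline SQL builder; the class and method names below are illustrative):

import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

// Illustrative consumer of the SPI: creates a target schema, tolerating dialect-declared ignorable failures.
public final class CreateSchemaSketch {
    
    public static void createSchema(final DataSource dataSource, final String schemaName,
                                    final DialectPipelineJobDataSourcePrepareOption option) throws SQLException {
        // schemaName is assumed to be a trusted, pre-validated identifier.
        String sql = option.isSupportIfNotExistsOnCreateSchema()
                ? "CREATE SCHEMA IF NOT EXISTS " + schemaName
                : "CREATE SCHEMA " + schemaName;
        try (Connection connection = dataSource.getConnection(); Statement statement = connection.createStatement()) {
            statement.execute(sql);
        } catch (final SQLException ex) {
            // Swallow failures whose message matches one of the dialect's ignorable messages.
            boolean ignorable = null != ex.getMessage()
                    && option.getIgnoredExceptionMessages().stream().anyMatch(ex.getMessage()::contains);
            if (!ignorable) {
                throw ex;
            }
        }
    }
}
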
@@ -15,11 +15,10 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.preparer;
package org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable;

Expand All @@ -28,7 +27,6 @@
*/
@RequiredArgsConstructor
@Getter
@ToString(exclude = {"sourceDataSourceConfig", "targetDataSourceConfig"})
public final class CreateTableConfiguration {

private final PipelineDataSourceConfiguration sourceDataSourceConfig;
@@ -19,7 +19,6 @@

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;

@@ -19,7 +19,6 @@

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;

@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.preparer;
package org.apache.shardingsphere.data.pipeline.core.preparer.inventory;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;