Skip to content

Commit

Permalink
Rename DataSourceCheckEngine.checkSourceDataSources() (#29475)
Browse files — browse the repository at this point in the history
* Rename DataSourceCheckEngine.checkSourceDataSources()

* Refactor DataSourceCheckEngine
  • Loading branch information
terrymanu authored Dec 20, 2023
1 parent 1cd2459 commit 8d3a882
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
Expand Down Expand Up @@ -68,7 +68,7 @@ public void checkConnection(final Collection<DataSource> dataSources) {
*
* @param dataSources to be checked source data source
*/
public void checkSourceDataSource(final Collection<DataSource> dataSources) {
public void checkSourceDataSources(final Collection<DataSource> dataSources) {
checkConnection(dataSources);
if (null == checker) {
return;
Expand All @@ -85,25 +85,31 @@ public void checkSourceDataSource(final Collection<DataSource> dataSources) {
*/
/**
 * Check target data sources: connectivity first, then verify every target table is empty.
 *
 * @param dataSources to be checked target data sources
 * @param importerConfig importer configuration supplying the qualified tables to verify
 */
public void checkTargetDataSources(final Collection<DataSource> dataSources, final ImporterConfiguration importerConfig) {
    checkConnection(dataSources);
    // Post-refactor (#29475): pass the whole ImporterConfiguration; the qualified
    // tables are derived inside checkEmptyTable via importerConfig.getQualifiedTables().
    checkEmptyTable(dataSources, importerConfig);
}

// TODO Merge schemaName and tableNames
private void checkEmptyTable(final Collection<DataSource> dataSources, final TableAndSchemaNameMapper tableAndSchemaNameMapper, final Collection<String> logicTableNames) {
private void checkEmptyTable(final Collection<DataSource> dataSources, final ImporterConfiguration importerConfig) {
try {
for (DataSource each : dataSources) {
for (String tableName : logicTableNames) {
ShardingSpherePreconditions.checkState(checkEmptyTable(each, tableAndSchemaNameMapper.getSchemaName(tableName), tableName),
() -> new PrepareJobWithTargetTableNotEmptyException(tableName));
for (CaseInsensitiveQualifiedTable qualifiedTable : importerConfig.getQualifiedTables()) {
ShardingSpherePreconditions.checkState(checkEmptyTable(each, qualifiedTable), () -> new PrepareJobWithTargetTableNotEmptyException(qualifiedTable.getTableName().toString()));
}
}
} catch (final SQLException ex) {
throw new PrepareJobWithInvalidConnectionException(ex);
}
}

private boolean checkEmptyTable(final DataSource dataSource, final String schemaName, final String tableName) throws SQLException {
String sql = sqlBuilder.buildCheckEmptySQL(schemaName, tableName);
/**
* Check whether empty table.
*
* @param dataSource data source
* @param qualifiedTable qualified table
* @return empty or not
* @throws SQLException if there's database operation failure
*/
public boolean checkEmptyTable(final DataSource dataSource, final CaseInsensitiveQualifiedTable qualifiedTable) throws SQLException {
String sql = sqlBuilder.buildCheckEmptySQL(qualifiedTable.getSchemaName().toString(), qualifiedTable.getTableName().toString());
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
import org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable;
import org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;

Expand Down Expand Up @@ -86,4 +87,13 @@ public Optional<String> findSchemaName(final String logicTableName) {
DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(dataSourceConfig.getDatabaseType()).getDialectDatabaseMetaData();
return dialectDatabaseMetaData.isSchemaAvailable() ? Optional.of(tableAndSchemaNameMapper.getSchemaName(logicTableName)) : Optional.empty();
}

/**
 * Get qualified tables, pairing each logic table name with its mapped schema name.
 *
 * @return qualified tables
 */
public Collection<CaseInsensitiveQualifiedTable> getQualifiedTables() {
    return getLogicTableNames().stream()
            .map(tableName -> new CaseInsensitiveQualifiedTable(tableAndSchemaNameMapper.getSchemaName(tableName), tableName))
            .collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void prepare(final MigrationJobItemContext jobItemContext) throws SQLExce
jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()),
() -> new UnsupportedSQLOperationException("Migration inventory dumper only support StandardPipelineDataSourceConfiguration"));
DatabaseType sourceDatabaseType = jobItemContext.getJobConfig().getSourceDatabaseType();
new DataSourceCheckEngine(sourceDatabaseType).checkSourceDataSource(Collections.singleton(jobItemContext.getSourceDataSource()));
new DataSourceCheckEngine(sourceDatabaseType).checkSourceDataSources(Collections.singleton(jobItemContext.getSourceDataSource()));
if (jobItemContext.isStopping()) {
PipelineJobRegistry.stop(jobItemContext.getJobId());
return;
Expand Down

0 comments on commit 8d3a882

Please sign in to comment.