Skip to content

Commit

Permalink
Move PrepareTargetSchemasParameter and PrepareTargetTablesParameter (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Dec 14, 2023
1 parent 53d1b23 commit d69b3f1
Show file tree
Hide file tree
Showing 11 changed files with 26 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourceCheckEngine;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PrepareTargetSchemasParameter;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PrepareTargetTablesParameter;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
import org.apache.shardingsphere.data.pipeline.core.spi.ingest.dumper.IncrementalDumperCreator;
import org.apache.shardingsphere.data.pipeline.core.spi.ingest.position.PositionInitializer;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
Expand Down Expand Up @@ -55,11 +55,10 @@ public void prepareTargetSchemas(final PrepareTargetSchemasParameter param) thro
if (!dialectDatabaseMetaData.isSchemaAvailable()) {
return;
}
Collection<CreateTableConfiguration> createTableConfigs = param.getCreateTableConfigurations();
String defaultSchema = dialectDatabaseMetaData.getDefaultSchema().orElse(null);
PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(targetDatabaseType);
Collection<String> createdSchemaNames = new HashSet<>();
for (CreateTableConfiguration each : createTableConfigs) {
for (CreateTableConfiguration each : param.getCreateTableConfigurations()) {
String targetSchemaName = each.getTargetName().getSchemaName().toString();
if (null == targetSchemaName || targetSchemaName.equalsIgnoreCase(defaultSchema) || createdSchemaNames.contains(targetSchemaName)) {
continue;
Expand All @@ -73,38 +72,21 @@ public void prepareTargetSchemas(final PrepareTargetSchemasParameter param) thro
}

private void executeCreateSchema(final PipelineDataSourceManager dataSourceManager, final PipelineDataSourceConfiguration targetDataSourceConfig, final String sql) throws SQLException {
log.info("prepareTargetSchemas, sql={}", sql);
try (Connection connection = getCachedDataSource(dataSourceManager, targetDataSourceConfig).getConnection()) {
try (Statement statement = connection.createStatement()) {
statement.execute(sql);
}
log.info("Prepare target schemas SQL: {}", sql);
try (
Connection connection = dataSourceManager.getDataSource(targetDataSourceConfig).getConnection();
Statement statement = connection.createStatement()) {
statement.execute(sql);
}
}

protected final PipelineDataSourceWrapper getCachedDataSource(final PipelineDataSourceManager dataSourceManager, final PipelineDataSourceConfiguration dataSourceConfig) {
return dataSourceManager.getDataSource(dataSourceConfig);
}

/**
* Execute target table SQL.
*
* @param targetConnection target connection
* @param sql SQL
* @throws SQLException SQL exception
*/
protected void executeTargetTableSQL(final Connection targetConnection, final String sql) throws SQLException {
log.info("execute target table sql: {}", sql);
log.info("Execute target table SQL: {}", sql);
try (Statement statement = targetConnection.createStatement()) {
statement.execute(sql);
}
}

/**
* Add if not exists for create table SQL.
*
* @param createTableSQL create table SQL
* @return create table if not existed SQL
*/
protected final String addIfNotExistsForCreateTableSQL(final String createTableSQL) {
if (PATTERN_CREATE_TABLE_IF_NOT_EXISTS.matcher(createTableSQL).find()) {
return createTableSQL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;

import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
package org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
package org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;

import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
import org.junit.jupiter.api.Test;

import java.sql.Connection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.AbstractDataSourcePreparer;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PrepareTargetTablesParameter;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;

import java.sql.Connection;
import java.sql.SQLException;
Expand All @@ -35,7 +35,7 @@ public void prepareTargetTables(final PrepareTargetTablesParameter param) throws
PipelineDataSourceManager dataSourceManager = param.getDataSourceManager();
for (CreateTableConfiguration each : param.getCreateTableConfigurations()) {
String createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, param.getSqlParserEngine());
try (Connection targetConnection = getCachedDataSource(dataSourceManager, each.getTargetDataSourceConfig()).getConnection()) {
try (Connection targetConnection = dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection()) {
executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(createTargetTableSQL));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.AbstractDataSourcePreparer;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PrepareTargetSchemasParameter;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PrepareTargetTablesParameter;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;

import java.sql.Connection;
import java.sql.SQLException;
Expand Down Expand Up @@ -52,7 +52,7 @@ public void prepareTargetTables(final PrepareTargetTablesParameter param) throws
PipelineDataSourceManager dataSourceManager = param.getDataSourceManager();
for (CreateTableConfiguration each : param.getCreateTableConfigurations()) {
String createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, param.getSqlParserEngine());
try (Connection targetConnection = getCachedDataSource(dataSourceManager, each.getTargetDataSourceConfig()).getConnection()) {
try (Connection targetConnection = dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection()) {
for (String sql : Splitter.on(";").trimResults().omitEmptyStrings().splitToList(createTargetTableSQL)) {
executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(sql));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.AbstractDataSourcePreparer;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PrepareTargetTablesParameter;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;

import java.sql.Connection;
import java.sql.SQLException;
Expand All @@ -36,7 +36,7 @@ public void prepareTargetTables(final PrepareTargetTablesParameter param) throws
PipelineDataSourceManager dataSourceManager = param.getDataSourceManager();
for (CreateTableConfiguration each : param.getCreateTableConfigurations()) {
String createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, param.getSqlParserEngine());
try (Connection targetConnection = getCachedDataSource(dataSourceManager, each.getTargetDataSourceConfig()).getConnection()) {
try (Connection targetConnection = dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection()) {
for (String sql : Splitter.on(";").trimResults().omitEmptyStrings().splitToList(createTargetTableSQL)) {
executeTargetTableSQL(targetConnection, sql);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter;
import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PrepareTargetSchemasParameter;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PrepareTargetTablesParameter;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.AbstractDataSourcePreparer;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PrepareTargetTablesParameter;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;

import java.sql.Connection;
import java.sql.SQLException;
Expand All @@ -35,7 +35,7 @@ public void prepareTargetTables(final PrepareTargetTablesParameter param) throws
PipelineDataSourceManager dataSourceManager = param.getDataSourceManager();
for (CreateTableConfiguration each : param.getCreateTableConfigurations()) {
String createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, param.getSqlParserEngine());
try (Connection targetConnection = getCachedDataSource(dataSourceManager, each.getTargetDataSourceConfig()).getConnection()) {
try (Connection targetConnection = dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection()) {
executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(createTargetTableSQL));
}
}
Expand Down

0 comments on commit d69b3f1

Please sign in to comment.