Skip to content

Commit

Permalink
Refactor MigrationJob (#29362)
Browse files Browse the repository at this point in the history
* Refactor MigrationJob
  • Loading branch information
terrymanu authored Dec 11, 2023
1 parent bc68578 commit ab93554
Showing 1 changed file with 10 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ public final class MigrationJob extends AbstractSeparablePipelineJob<MigrationJo

// Loads/persists the pipeline process configuration for this job; presumably stateless — TODO confirm thread-safety.
private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();

// Shared by all sharding items
private final MigrationJobPreparer jobPreparer = new MigrationJobPreparer();

@Override
Expand Down Expand Up @@ -95,17 +94,20 @@ private MigrationTaskConfiguration buildTaskConfiguration(final MigrationJobConf
/**
 * Builds one create-table configuration per logic table of the migration job.
 *
 * <p>Fix: the previous body retained the inlined schema/data-node computation alongside the
 * delegation to {@code getCreateTableConfiguration}, so every entry was added to the result
 * twice (once from the inline {@code result.add(createTableConfig)} and once from the helper
 * call). The inline duplicate is removed; the helper is the single source of truth.
 *
 * @param jobConfig migration job configuration supplying tables, sources and target
 * @param mapper maps logic table names to their source schema names
 * @return create-table configurations, one per entry in the job's first data nodes
 */
private Collection<CreateTableConfiguration> buildCreateTableConfigurations(final MigrationJobConfiguration jobConfig, final TableAndSchemaNameMapper mapper) {
    Collection<CreateTableConfiguration> result = new LinkedList<>();
    for (JobDataNodeEntry each : jobConfig.getTablesFirstDataNodes().getEntries()) {
        result.add(getCreateTableConfiguration(jobConfig, mapper, each));
    }
    return result;
}

/**
 * Creates the create-table configuration for a single logic table entry.
 *
 * <p>The source side is resolved from the entry's first data node; the target schema is only
 * carried over when the target database dialect supports schemas, otherwise it is left null.
 *
 * @param jobConfig migration job configuration supplying sources and target
 * @param mapper maps logic table names to their source schema names
 * @param jobDataNodeEntry logic table entry whose first data node identifies the source table
 * @return create-table configuration pairing the source table with its target counterpart
 */
private CreateTableConfiguration getCreateTableConfiguration(final MigrationJobConfiguration jobConfig, final TableAndSchemaNameMapper mapper, final JobDataNodeEntry jobDataNodeEntry) {
    String schemaName = mapper.getSchemaName(jobDataNodeEntry.getLogicTableName());
    boolean schemaAvailable = new DatabaseTypeRegistry(jobConfig.getTargetDatabaseType()).getDialectDatabaseMetaData().isSchemaAvailable();
    DataNode firstDataNode = jobDataNodeEntry.getDataNodes().get(0);
    CaseInsensitiveQualifiedTable sourceTable = new CaseInsensitiveQualifiedTable(schemaName, firstDataNode.getTableName());
    CaseInsensitiveQualifiedTable targetTable = new CaseInsensitiveQualifiedTable(schemaAvailable ? schemaName : null, jobDataNodeEntry.getLogicTableName());
    PipelineDataSourceConfiguration sourceDataSourceConfig = jobConfig.getSources().get(firstDataNode.getDataSourceName());
    return new CreateTableConfiguration(sourceDataSourceConfig, sourceTable, jobConfig.getTarget(), targetTable);
}

private ImporterConfiguration buildImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig,
final Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap, final TableAndSchemaNameMapper mapper) {
int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
Expand Down

0 comments on commit ab93554

Please sign in to comment.