Refactor DumperCommonContext (#28948)
* Refactor DumperCommonContext

* Refactor IncrementalDumperContext
terrymanu authored Nov 5, 2023
1 parent 0d9880a commit 3f85b14
Showing 8 changed files with 36 additions and 63 deletions.
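Across all eight files the change follows a single pattern: the dumper contexts stop being mutable beans populated through @Setter calls and become (mostly) immutable objects whose required state is injected through a Lombok @RequiredArgsConstructor-generated constructor. A minimal sketch of the pattern, using a hypothetical ExampleContext rather than a class from this commit:

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;

// Before the refactor, callers wrote new ExampleContext() followed by setter calls.
// After it, all required state is passed once, at construction time.
@RequiredArgsConstructor
@Getter
public final class ExampleContext {
    
    private final String name;          // required; initialized by the generated constructor
    
    @Setter
    private String mutableState;        // deliberately left mutable, like IngestPosition below
}

Callers then construct in one expression, new ExampleContext("ds_0"), and only the genuinely mutable field keeps a setter.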
DumperCommonContext.java

@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context;
 
 import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import lombok.ToString;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
@@ -28,18 +29,19 @@
 /**
  * Dumper common context.
  */
+@RequiredArgsConstructor
 @Getter
 @Setter
 @ToString(exclude = {"dataSourceConfig", "tableAndSchemaNameMapper"})
 public final class DumperCommonContext {
 
-    private String dataSourceName;
+    private final String dataSourceName;
 
-    private PipelineDataSourceConfiguration dataSourceConfig;
+    private final PipelineDataSourceConfiguration dataSourceConfig;
 
-    private ActualAndLogicTableNameMapper tableNameMapper;
+    private final ActualAndLogicTableNameMapper tableNameMapper;
 
-    private TableAndSchemaNameMapper tableAndSchemaNameMapper;
+    private final TableAndSchemaNameMapper tableAndSchemaNameMapper;
 
     private IngestPosition position;
 }
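Note that position is the one field left non-final and the class keeps @Setter, presumably because the ingest position advances while dumping and so cannot be fixed at construction time; the Lombok-generated constructor therefore covers only the four final fields above.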
IncrementalDumperContext.java

@@ -19,21 +19,19 @@
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import lombok.Setter;
 import lombok.ToString;
 
 /**
  * Incremental dumper context.
  */
 @RequiredArgsConstructor
 @Getter
-@Setter
 @ToString
 public final class IncrementalDumperContext {
 
     private final DumperCommonContext commonContext;
 
-    private String jobId;
+    private final String jobId;
 
-    private boolean decodeWithTX;
+    private final boolean decodeWithTX;
 }
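With jobId and decodeWithTX now final alongside commonContext, the existing @RequiredArgsConstructor picks them up, and Lombok generates roughly the following constructor (a sketch of the generated code, not a line from this diff):

public IncrementalDumperContext(DumperCommonContext commonContext, String jobId, boolean decodeWithTX) {
    this.commonContext = commonContext;
    this.jobId = jobId;
    this.decodeWithTX = decodeWithTX;
}

This is why every call site below changes from a no-arg constructor plus setter calls to a single three-argument construction.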
InventoryDumperContext.java

@@ -54,11 +54,8 @@ public final class InventoryDumperContext {
     private JobRateLimitAlgorithm rateLimitAlgorithm;
 
     public InventoryDumperContext(final DumperCommonContext commonContext) {
-        this.commonContext = new DumperCommonContext();
-        this.commonContext.setDataSourceName(commonContext.getDataSourceName());
-        this.commonContext.setDataSourceConfig(commonContext.getDataSourceConfig());
-        this.commonContext.setTableNameMapper(commonContext.getTableNameMapper());
-        this.commonContext.setTableAndSchemaNameMapper(commonContext.getTableAndSchemaNameMapper());
+        this.commonContext = new DumperCommonContext(
+                commonContext.getDataSourceName(), commonContext.getDataSourceConfig(), commonContext.getTableNameMapper(), commonContext.getTableAndSchemaNameMapper());
     }
 
     /**
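The copy constructor's behavior is unchanged: it still builds a fresh DumperCommonContext from the source context's four mapped fields, just through the generated all-args constructor instead of four setter calls, and, as before, the mutable position is not copied.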
@@ -97,11 +97,11 @@ void setUp() throws SQLException {
     }
 
     private IncrementalDumperContext createDumperContext() {
-        DumperCommonContext commonContext = new DumperCommonContext();
-        commonContext.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test", "root", "root"));
-        commonContext.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))));
-        commonContext.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap()));
-        return new IncrementalDumperContext(commonContext);
+        DumperCommonContext commonContext = new DumperCommonContext(null,
+                new StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test", "root", "root"),
+                new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))),
+                new TableAndSchemaNameMapper(Collections.emptyMap()));
+        return new IncrementalDumperContext(commonContext, null, false);
     }
 
     private void initTableData(final IncrementalDumperContext dumperContext) throws SQLException {
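Note the explicit nulls in this and the following test fixtures: fields the old setter-based code simply never set (dataSourceName here, jobId in some fixtures) are now passed as null, preserving the values the unset bean properties previously defaulted to.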
@@ -107,13 +107,11 @@ private void createTable(final String jdbcUrl, final String username, final Stri
     }
 
     private IncrementalDumperContext createDumperContext(final String jdbcUrl, final String username, final String password) {
-        DumperCommonContext commonContext = new DumperCommonContext();
-        commonContext.setDataSourceConfig(new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password));
-        commonContext.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order"))));
-        commonContext.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap()));
-        IncrementalDumperContext result = new IncrementalDumperContext(commonContext);
-        result.setJobId("0101123456");
-        return result;
+        DumperCommonContext commonContext = new DumperCommonContext(null,
+                new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password),
+                new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order"))),
+                new TableAndSchemaNameMapper(Collections.emptyMap()));
+        return new IncrementalDumperContext(commonContext, "0101123456", false);
     }
 
     @AfterEach
@@ -86,11 +86,11 @@ void setUp() throws SQLException {
     }
 
     private IncrementalDumperContext mockDumperContext() {
-        DumperCommonContext commonContext = new DumperCommonContext();
-        commonContext.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root"));
-        commonContext.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))));
-        commonContext.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap()));
-        return new IncrementalDumperContext(commonContext);
+        DumperCommonContext commonContext = new DumperCommonContext(null,
+                new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root"),
+                new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))),
+                new TableAndSchemaNameMapper(Collections.emptyMap()));
+        return new IncrementalDumperContext(commonContext, null, false);
     }
 
     private void initTableData(final IncrementalDumperContext dumperContext) throws SQLException {
@@ -279,19 +279,13 @@ public CDCTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguratio
 
     private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
         JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem);
-        Map<ActualTableName, LogicTableName> tableNameMap = new LinkedHashMap<>();
-        dataNodeLine.getEntries().forEach(each -> each.getDataNodes().forEach(node -> tableNameMap.put(new ActualTableName(node.getTableName()), new LogicTableName(each.getLogicTableName()))));
         String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
         StandardPipelineDataSourceConfiguration actualDataSourceConfig = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
-        DumperCommonContext commonContext = new DumperCommonContext();
-        commonContext.setDataSourceName(dataSourceName);
-        commonContext.setDataSourceConfig(actualDataSourceConfig);
-        commonContext.setTableNameMapper(new ActualAndLogicTableNameMapper(tableNameMap));
-        commonContext.setTableAndSchemaNameMapper(tableAndSchemaNameMapper);
-        IncrementalDumperContext result = new IncrementalDumperContext(commonContext);
-        result.setJobId(jobConfig.getJobId());
-        result.setDecodeWithTX(jobConfig.isDecodeWithTX());
-        return result;
+        Map<ActualTableName, LogicTableName> tableNameMap = new LinkedHashMap<>();
+        dataNodeLine.getEntries().forEach(each -> each.getDataNodes().forEach(node -> tableNameMap.put(new ActualTableName(node.getTableName()), new LogicTableName(each.getLogicTableName()))));
+        return new IncrementalDumperContext(
+                new DumperCommonContext(dataSourceName, actualDataSourceConfig, new ActualAndLogicTableNameMapper(tableNameMap), tableAndSchemaNameMapper),
+                jobConfig.getJobId(), jobConfig.isDecodeWithTX());
     }
 
     private ImporterConfiguration buildImporterConfiguration(final CDCJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig, final Collection<String> schemaTableNames,
MigrationIncrementalDumperContextCreator.java

@@ -19,21 +19,16 @@
 
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.DumperCommonContext;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
-import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.common.config.ingest.IncrementalDumperContextCreator;
 import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 
-import java.util.Map;
-
 /**
- * Migration incremental dumper configuration creator.
+ * Migration incremental dumper context creator.
  */
 @RequiredArgsConstructor
 public final class MigrationIncrementalDumperContextCreator implements IncrementalDumperContextCreator {
@@ -42,21 +37,10 @@ public final class MigrationIncrementalDumperContextCreator implements Increment
 
     @Override
     public IncrementalDumperContext createDumperContext(final JobDataNodeLine jobDataNodeLine) {
-        Map<ActualTableName, LogicTableName> tableNameMap = JobDataNodeLineConvertUtils.buildTableNameMap(jobDataNodeLine);
-        TableAndSchemaNameMapper tableAndSchemaNameMapper = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
         String dataSourceName = jobDataNodeLine.getEntries().get(0).getDataNodes().get(0).getDataSourceName();
-        return buildDumperContext(jobConfig.getJobId(), dataSourceName, jobConfig.getSources().get(dataSourceName), tableNameMap, tableAndSchemaNameMapper);
-    }
-
-    private IncrementalDumperContext buildDumperContext(final String jobId, final String dataSourceName, final PipelineDataSourceConfiguration sourceDataSource,
-                                                        final Map<ActualTableName, LogicTableName> tableNameMap, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
-        DumperCommonContext commonContext = new DumperCommonContext();
-        commonContext.setDataSourceName(dataSourceName);
-        commonContext.setDataSourceConfig(sourceDataSource);
-        commonContext.setTableNameMapper(new ActualAndLogicTableNameMapper(tableNameMap));
-        commonContext.setTableAndSchemaNameMapper(tableAndSchemaNameMapper);
-        IncrementalDumperContext result = new IncrementalDumperContext(commonContext);
-        result.setJobId(jobId);
-        return result;
+        ActualAndLogicTableNameMapper tableNameMapper = new ActualAndLogicTableNameMapper(JobDataNodeLineConvertUtils.buildTableNameMap(jobDataNodeLine));
+        TableAndSchemaNameMapper tableAndSchemaNameMapper = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
+        return new IncrementalDumperContext(
+                new DumperCommonContext(dataSourceName, jobConfig.getSources().get(dataSourceName), tableNameMapper, tableAndSchemaNameMapper), jobConfig.getJobId(), false);
     }
 }
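Two details here: the hard-coded false for decodeWithTX matches the old behavior, since the removed buildDumperContext helper never called setDecodeWithTX and the boolean therefore defaulted to false; and the javadoc change from "configuration creator" to "context creator" brings the comment in line with the class name.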
