Skip to content

Commit

Permalink
Refactor DumperConfiguration (#28923)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Nov 3, 2023
1 parent 58b2a2a commit c7df22d
Show file tree
Hide file tree
Showing 27 changed files with 123 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.shardingsphere.data.pipeline.api.config.ingest;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
Expand All @@ -35,32 +34,25 @@
import java.util.stream.Collectors;

/**
* Dumper configuration.
* Base dumper configuration.
*/
@Getter
@Setter
@ToString(exclude = {"dataSourceConfig", "tableNameSchemaNameMapping"})
// TODO it should be final and not extends by sub-class
// TODO fields final
public class DumperConfiguration {

private String jobId;
public abstract class BaseDumperConfiguration {

private String dataSourceName;

private PipelineDataSourceConfiguration dataSourceConfig;

private IngestPosition position;

private Map<ActualTableName, LogicTableName> tableNameMap;

private TableNameSchemaNameMapping tableNameSchemaNameMapping;

// LinkedHashSet is required
@Getter(AccessLevel.PROTECTED)
private Map<LogicTableName, Collection<ColumnName>> targetTableColumnsMap = new HashMap<>();

private boolean decodeWithTX;
private IngestPosition position;

/**
* Get logic table name.
Expand Down Expand Up @@ -100,7 +92,7 @@ public String getSchemaName(final LogicTableName logicTableName) {
* Get schema name.
*
* @param actualTableName actual table name
* @return schema name. nullable
* @return schema name, can be nullable
*/
public String getSchemaName(final ActualTableName actualTableName) {
return tableNameSchemaNameMapping.getSchemaName(getLogicTableName(actualTableName));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.api.config.ingest;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

/**
* Incremental dumper configuration.
*/
@Getter
@Setter
@ToString(callSuper = true)
public class IncrementalDumperConfiguration extends BaseDumperConfiguration {

private String jobId;

private boolean decodeWithTX;
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@
@Getter
@Setter
@ToString(callSuper = true)
// TODO fields final
public final class InventoryDumperConfiguration extends DumperConfiguration {
public final class InventoryDumperConfiguration extends BaseDumperConfiguration {

private String actualTableName;

Expand All @@ -52,7 +51,7 @@ public final class InventoryDumperConfiguration extends DumperConfiguration {

private JobRateLimitAlgorithm rateLimitAlgorithm;

public InventoryDumperConfiguration(final DumperConfiguration dumperConfig) {
public InventoryDumperConfiguration(final BaseDumperConfiguration dumperConfig) {
setDataSourceName(dumperConfig.getDataSourceName());
setDataSourceConfig(dumperConfig.getDataSourceConfig());
setTableNameMap(dumperConfig.getTableNameMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.shardingsphere.data.pipeline.spi.ingest.dumper;

import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
Expand All @@ -34,13 +34,13 @@ public interface IncrementalDumperCreator extends DatabaseTypedSPI {
/**
* Create incremental dumper.
*
* @param dumperConfig dumper configuration
* @param config incremental dumper configuration
* @param position position
* @param channel channel
* @param metaDataLoader meta data loader
* @return incremental dumper
*/
IncrementalDumper createIncrementalDumper(DumperConfiguration dumperConfig, IngestPosition position, PipelineChannel channel, PipelineTableMetaDataLoader metaDataLoader);
IncrementalDumper createIncrementalDumper(IncrementalDumperConfiguration config, IngestPosition position, PipelineChannel channel, PipelineTableMetaDataLoader metaDataLoader);

/**
* Whether support incremental dump.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.shardingsphere.data.pipeline.common.config.ingest;

import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;

/**
Expand All @@ -31,5 +31,5 @@ public interface IncrementalDumperConfigurationCreator {
* @param jobDataNodeLine job data node line
* @return dumper configuration
*/
DumperConfiguration createDumperConfiguration(JobDataNodeLine jobDataNodeLine);
IncrementalDumperConfiguration createDumperConfiguration(JobDataNodeLine jobDataNodeLine);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
Expand Down Expand Up @@ -123,7 +123,7 @@ public static void prepareTargetTables(final DatabaseType databaseType, final Pr
* @return ingest position
* @throws SQLException sql exception
*/
public static IngestPosition getIncrementalPosition(final JobItemIncrementalTasksProgress initIncremental, final DumperConfiguration dumperConfig,
public static IngestPosition getIncrementalPosition(final JobItemIncrementalTasksProgress initIncremental, final IncrementalDumperConfiguration dumperConfig,
final PipelineDataSourceManager dataSourceManager) throws SQLException {
if (null != initIncremental) {
Optional<IngestPosition> position = initIncremental.getIncrementalPosition();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.shardingsphere.data.pipeline.h2.ingest.dumper;

import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
Expand All @@ -30,7 +30,7 @@
public final class H2IncrementalDumperCreator implements IncrementalDumperCreator {

@Override
public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition position,
public IncrementalDumper createIncrementalDumper(final IncrementalDumperConfiguration config, final IngestPosition position,
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
throw new UnsupportedOperationException("H2 database can not support incremental dump.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJdbcConfiguration;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
Expand Down Expand Up @@ -68,7 +68,7 @@
@Slf4j
public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor implements IncrementalDumper {

private final DumperConfiguration dumperConfig;
private final IncrementalDumperConfiguration dumperConfig;

private final BinlogPosition binlogPosition;

Expand All @@ -80,7 +80,7 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl

private final String catalog;

public MySQLIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition binlogPosition,
public MySQLIncrementalDumper(final IncrementalDumperConfiguration dumperConfig, final IngestPosition binlogPosition,
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
Preconditions.checkArgument(dumperConfig.getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration");
this.dumperConfig = dumperConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.shardingsphere.data.pipeline.mysql.ingest.dumper;

import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
Expand All @@ -31,9 +31,9 @@
public final class MySQLIncrementalDumperCreator implements IncrementalDumperCreator {

@Override
public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition position,
public IncrementalDumper createIncrementalDumper(final IncrementalDumperConfiguration config, final IngestPosition position,
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
return new MySQLIncrementalDumper(dumperConfig, position, channel, metaDataLoader);
return new MySQLIncrementalDumper(config, position, channel, metaDataLoader);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
Expand Down Expand Up @@ -75,7 +75,7 @@
@SuppressWarnings("unchecked")
class MySQLIncrementalDumperTest {

private DumperConfiguration dumperConfig;
private IncrementalDumperConfiguration dumperConfig;

private MySQLIncrementalDumper incrementalDumper;

Expand All @@ -93,8 +93,8 @@ void setUp() {
when(metaDataLoader.getTableMetaData(any(), any())).thenReturn(pipelineTableMetaData);
}

private DumperConfiguration mockDumperConfiguration() {
DumperConfiguration result = new DumperConfiguration();
private IncrementalDumperConfiguration mockDumperConfiguration() {
IncrementalDumperConfiguration result = new IncrementalDumperConfiguration();
result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL", "root", "root"));
result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")));
result.setTableNameSchemaNameMapping(new TableNameSchemaNameMapping(Collections.emptyMap()));
Expand All @@ -103,7 +103,7 @@ private DumperConfiguration mockDumperConfiguration() {
}

@SneakyThrows(SQLException.class)
private void initTableData(final DumperConfiguration dumperConfig) {
private void initTableData(final IncrementalDumperConfiguration dumperConfig) {
try (
PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
Expand Down Expand Up @@ -58,7 +58,7 @@
@Slf4j
public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implements IncrementalDumper {

private final DumperConfiguration dumperConfig;
private final IncrementalDumperConfiguration dumperConfig;

private final AtomicReference<WALPosition> walPosition;

Expand All @@ -72,7 +72,7 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen

private List<AbstractRowEvent> rowEvents = new LinkedList<>();

public OpenGaussWALDumper(final DumperConfiguration dumperConfig, final IngestPosition position,
public OpenGaussWALDumper(final IncrementalDumperConfiguration dumperConfig, final IngestPosition position,
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
() -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.shardingsphere.data.pipeline.opengauss.ingest.dumper;

import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
Expand All @@ -31,9 +31,9 @@
public final class OpenGaussIncrementalDumperCreator implements IncrementalDumperCreator {

@Override
public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition position,
public IncrementalDumper createIncrementalDumper(final IncrementalDumperConfiguration config, final IngestPosition position,
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
return new OpenGaussWALDumper(dumperConfig, position, channel, metaDataLoader);
return new OpenGaussWALDumper(config, position, channel, metaDataLoader);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
Expand Down Expand Up @@ -60,7 +60,7 @@
@Slf4j
public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor implements IncrementalDumper {

private final DumperConfiguration dumperConfig;
private final IncrementalDumperConfiguration dumperConfig;

private final AtomicReference<WALPosition> walPosition;

Expand All @@ -74,7 +74,7 @@ public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor impleme

private List<AbstractRowEvent> rowEvents = new LinkedList<>();

public PostgreSQLWALDumper(final DumperConfiguration dumperConfig, final IngestPosition position,
public PostgreSQLWALDumper(final IncrementalDumperConfiguration dumperConfig, final IngestPosition position,
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
() -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.shardingsphere.data.pipeline.postgresql.ingest.dumper;

import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
Expand All @@ -31,9 +31,9 @@
public final class PostgreSQLIncrementalDumperCreator implements IncrementalDumperCreator {

@Override
public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition position,
public IncrementalDumper createIncrementalDumper(final IncrementalDumperConfiguration config, final IngestPosition position,
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
return new PostgreSQLWALDumper(dumperConfig, position, channel, metaDataLoader);
return new PostgreSQLWALDumper(config, position, channel, metaDataLoader);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal;

import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
Expand All @@ -43,11 +43,11 @@
*/
public final class WALEventConverter {

private final DumperConfiguration dumperConfig;
private final IncrementalDumperConfiguration dumperConfig;

private final PipelineTableMetaDataLoader metaDataLoader;

public WALEventConverter(final DumperConfiguration dumperConfig, final PipelineTableMetaDataLoader metaDataLoader) {
public WALEventConverter(final IncrementalDumperConfiguration dumperConfig, final PipelineTableMetaDataLoader metaDataLoader) {
this.dumperConfig = dumperConfig;
this.metaDataLoader = metaDataLoader;
}
Expand Down
Loading

0 comments on commit c7df22d

Please sign in to comment.