From c7df22d030abe18e2a58e45b3f2ab119eb00e268 Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Fri, 3 Nov 2023 10:44:44 +0800 Subject: [PATCH] Refactor DumperConfiguration (#28923) --- ...tion.java => BaseDumperConfiguration.java} | 16 +++------ .../IncrementalDumperConfiguration.java | 35 +++++++++++++++++++ .../ingest/InventoryDumperConfiguration.java | 5 ++- .../dumper/IncrementalDumperCreator.java | 6 ++-- ...IncrementalDumperConfigurationCreator.java | 4 +-- .../preparer/PipelineJobPreparerUtils.java | 4 +-- .../dumper/H2IncrementalDumperCreator.java | 4 +-- .../mysql/ingest/MySQLIncrementalDumper.java | 6 ++-- .../dumper/MySQLIncrementalDumperCreator.java | 6 ++-- .../ingest/MySQLIncrementalDumperTest.java | 10 +++--- .../opengauss/ingest/OpenGaussWALDumper.java | 6 ++-- .../OpenGaussIncrementalDumperCreator.java | 6 ++-- .../ingest/PostgreSQLWALDumper.java | 6 ++-- .../PostgreSQLIncrementalDumperCreator.java | 6 ++-- .../ingest/wal/WALEventConverter.java | 6 ++-- .../ingest/PostgreSQLWALDumperTest.java | 8 ++--- .../ingest/wal/WALEventConverterTest.java | 10 +++--- .../data/pipeline/cdc/api/impl/CDCJobAPI.java | 16 ++++----- .../cdc/config/task/CDCTaskConfiguration.java | 4 +-- .../cdc/core/prepare/CDCJobPreparer.java | 4 +-- .../migration/api/impl/MigrationJobAPI.java | 11 +++--- .../config/MigrationTaskConfiguration.java | 4 +-- ...IncrementalDumperConfigurationCreator.java | 10 +++--- .../prepare/MigrationJobPreparer.java | 4 +-- .../FixtureIncrementalDumperCreator.java | 4 +-- .../prepare/InventoryTaskSplitterTest.java | 14 ++++---- .../pipeline/core/task/InventoryTaskTest.java | 4 +-- 27 files changed, 123 insertions(+), 96 deletions(-) rename kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/{DumperConfiguration.java => BaseDumperConfiguration.java} (93%) create mode 100644 kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/IncrementalDumperConfiguration.java diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/BaseDumperConfiguration.java similarity index 93% rename from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java rename to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/BaseDumperConfiguration.java index 4d8c81f83a874..59b9ebd41fa21 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/BaseDumperConfiguration.java @@ -17,7 +17,6 @@ package org.apache.shardingsphere.data.pipeline.api.config.ingest; -import lombok.AccessLevel; import lombok.Getter; import lombok.Setter; import lombok.ToString; @@ -35,32 +34,25 @@ import java.util.stream.Collectors; /** - * Dumper configuration. + * Base dumper configuration. */ @Getter @Setter @ToString(exclude = {"dataSourceConfig", "tableNameSchemaNameMapping"}) -// TODO it should be final and not extends by sub-class -// TODO fields final -public class DumperConfiguration { - - private String jobId; +public abstract class BaseDumperConfiguration { private String dataSourceName; private PipelineDataSourceConfiguration dataSourceConfig; - private IngestPosition position; - private Map tableNameMap; private TableNameSchemaNameMapping tableNameSchemaNameMapping; // LinkedHashSet is required - @Getter(AccessLevel.PROTECTED) private Map> targetTableColumnsMap = new HashMap<>(); - private boolean decodeWithTX; + private IngestPosition position; /** * Get logic table name. @@ -100,7 +92,7 @@ public String getSchemaName(final LogicTableName logicTableName) { * Get schema name. * * @param actualTableName actual table name - * @return schema name. nullable + * @return schema name, can be nullable */ public String getSchemaName(final ActualTableName actualTableName) { return tableNameSchemaNameMapping.getSchemaName(getLogicTableName(actualTableName)); diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/IncrementalDumperConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/IncrementalDumperConfiguration.java new file mode 100644 index 0000000000000..326e25c9262b6 --- /dev/null +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/IncrementalDumperConfiguration.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.api.config.ingest; + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +/** + * Incremental dumper configuration. + */ +@Getter +@Setter +@ToString(callSuper = true) +public class IncrementalDumperConfiguration extends BaseDumperConfiguration { + + private String jobId; + + private boolean decodeWithTX; +} diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java index 6663e20af4a5e..792f60c8f3700 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java @@ -31,8 +31,7 @@ @Getter @Setter @ToString(callSuper = true) -// TODO fields final -public final class InventoryDumperConfiguration extends DumperConfiguration { +public final class InventoryDumperConfiguration extends BaseDumperConfiguration { private String actualTableName; @@ -52,7 +51,7 @@ public final class InventoryDumperConfiguration extends DumperConfiguration { private JobRateLimitAlgorithm rateLimitAlgorithm; - public InventoryDumperConfiguration(final DumperConfiguration dumperConfig) { + public InventoryDumperConfiguration(final BaseDumperConfiguration dumperConfig) { setDataSourceName(dumperConfig.getDataSourceName()); setDataSourceConfig(dumperConfig.getDataSourceConfig()); setTableNameMap(dumperConfig.getTableNameMap()); diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreator.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreator.java index 615a006ef9264..30705ecf7cbe1 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreator.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreator.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.data.pipeline.spi.ingest.dumper; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; @@ -34,13 +34,13 @@ public interface IncrementalDumperCreator extends DatabaseTypedSPI { /** * Create incremental dumper. * - * @param dumperConfig dumper configuration + * @param config incremental dumper configuration * @param position position * @param channel channel * @param metaDataLoader meta data loader * @return incremental dumper */ - IncrementalDumper createIncrementalDumper(DumperConfiguration dumperConfig, IngestPosition position, PipelineChannel channel, PipelineTableMetaDataLoader metaDataLoader); + IncrementalDumper createIncrementalDumper(IncrementalDumperConfiguration config, IngestPosition position, PipelineChannel channel, PipelineTableMetaDataLoader metaDataLoader); /** * Whether support incremental dump. diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ingest/IncrementalDumperConfigurationCreator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ingest/IncrementalDumperConfigurationCreator.java index 2934a8f142f34..f95815578dcf2 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ingest/IncrementalDumperConfigurationCreator.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ingest/IncrementalDumperConfigurationCreator.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.data.pipeline.common.config.ingest; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine; /** @@ -31,5 +31,5 @@ public interface IncrementalDumperConfigurationCreator { * @param jobDataNodeLine job data node line * @return dumper configuration */ - DumperConfiguration createDumperConfiguration(JobDataNodeLine jobDataNodeLine); + IncrementalDumperConfiguration createDumperConfiguration(JobDataNodeLine jobDataNodeLine); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java index 02f46a3992859..0cd8d13e3af11 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java @@ -20,7 +20,7 @@ import lombok.AccessLevel; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; @@ -123,7 +123,7 @@ public static void prepareTargetTables(final DatabaseType databaseType, final Pr * @return ingest position * @throws SQLException sql exception */ - public static IngestPosition getIncrementalPosition(final JobItemIncrementalTasksProgress initIncremental, final DumperConfiguration dumperConfig, + public static IngestPosition getIncrementalPosition(final JobItemIncrementalTasksProgress initIncremental, final IncrementalDumperConfiguration dumperConfig, final PipelineDataSourceManager dataSourceManager) throws SQLException { if (null != initIncremental) { Optional position = initIncremental.getIncrementalPosition(); diff --git a/kernel/data-pipeline/dialect/h2/src/main/java/org/apache/shardingsphere/data/pipeline/h2/ingest/dumper/H2IncrementalDumperCreator.java b/kernel/data-pipeline/dialect/h2/src/main/java/org/apache/shardingsphere/data/pipeline/h2/ingest/dumper/H2IncrementalDumperCreator.java index 879f0d7732e8c..f0fe498d04be7 100644 --- a/kernel/data-pipeline/dialect/h2/src/main/java/org/apache/shardingsphere/data/pipeline/h2/ingest/dumper/H2IncrementalDumperCreator.java +++ b/kernel/data-pipeline/dialect/h2/src/main/java/org/apache/shardingsphere/data/pipeline/h2/ingest/dumper/H2IncrementalDumperCreator.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.data.pipeline.h2.ingest.dumper; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; @@ -30,7 +30,7 @@ public final class H2IncrementalDumperCreator implements IncrementalDumperCreator { @Override - public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition position, + public IncrementalDumper createIncrementalDumper(final IncrementalDumperConfiguration config, final IngestPosition position, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) { throw new UnsupportedOperationException("H2 database can not support incremental dump."); } diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java index c41f2ae9e3314..8b65fa2252950 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java @@ -19,7 +19,7 @@ import com.google.common.base.Preconditions; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJdbcConfiguration; import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor; @@ -68,7 +68,7 @@ @Slf4j public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor implements IncrementalDumper { - private final DumperConfiguration dumperConfig; + private final IncrementalDumperConfiguration dumperConfig; private final BinlogPosition binlogPosition; @@ -80,7 +80,7 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl private final String catalog; - public MySQLIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition binlogPosition, + public MySQLIncrementalDumper(final IncrementalDumperConfiguration dumperConfig, final IngestPosition binlogPosition, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) { Preconditions.checkArgument(dumperConfig.getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration"); this.dumperConfig = dumperConfig; diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumperCreator.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumperCreator.java index 1e0f4a294b3c6..e359cd5b8baef 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumperCreator.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumperCreator.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest.dumper; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; @@ -31,9 +31,9 @@ public final class MySQLIncrementalDumperCreator implements IncrementalDumperCreator { @Override - public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition position, + public IncrementalDumper createIncrementalDumper(final IncrementalDumperConfiguration config, final IngestPosition position, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) { - return new MySQLIncrementalDumper(dumperConfig, position, channel, metaDataLoader); + return new MySQLIncrementalDumper(config, position, channel, metaDataLoader); } @Override diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java index dd956b53316e1..5cc6e395f374e 100644 --- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java +++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java @@ -19,7 +19,7 @@ import lombok.SneakyThrows; import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record; @@ -75,7 +75,7 @@ @SuppressWarnings("unchecked") class MySQLIncrementalDumperTest { - private DumperConfiguration dumperConfig; + private IncrementalDumperConfiguration dumperConfig; private MySQLIncrementalDumper incrementalDumper; @@ -93,8 +93,8 @@ void setUp() { when(metaDataLoader.getTableMetaData(any(), any())).thenReturn(pipelineTableMetaData); } - private DumperConfiguration mockDumperConfiguration() { - DumperConfiguration result = new DumperConfiguration(); + private IncrementalDumperConfiguration mockDumperConfiguration() { + IncrementalDumperConfiguration result = new IncrementalDumperConfiguration(); result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL", "root", "root")); result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))); result.setTableNameSchemaNameMapping(new TableNameSchemaNameMapping(Collections.emptyMap())); @@ -103,7 +103,7 @@ private DumperConfiguration mockDumperConfiguration() { } @SneakyThrows(SQLException.class) - private void initTableData(final DumperConfiguration dumperConfig) { + private void initTableData(final IncrementalDumperConfiguration dumperConfig) { try ( PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager(); PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()); diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java index 4fa6c8539f792..c4b4defb39059 100644 --- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java +++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java @@ -19,7 +19,7 @@ import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; @@ -58,7 +58,7 @@ @Slf4j public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implements IncrementalDumper { - private final DumperConfiguration dumperConfig; + private final IncrementalDumperConfiguration dumperConfig; private final AtomicReference walPosition; @@ -72,7 +72,7 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen private List rowEvents = new LinkedList<>(); - public OpenGaussWALDumper(final DumperConfiguration dumperConfig, final IngestPosition position, + public OpenGaussWALDumper(final IncrementalDumperConfiguration dumperConfig, final IngestPosition position, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) { ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()), () -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration")); diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/dumper/OpenGaussIncrementalDumperCreator.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/dumper/OpenGaussIncrementalDumperCreator.java index 0d11f2a6dc64d..b4c9bde4d41bd 100644 --- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/dumper/OpenGaussIncrementalDumperCreator.java +++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/dumper/OpenGaussIncrementalDumperCreator.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.data.pipeline.opengauss.ingest.dumper; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; @@ -31,9 +31,9 @@ public final class OpenGaussIncrementalDumperCreator implements IncrementalDumperCreator { @Override - public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition position, + public IncrementalDumper createIncrementalDumper(final IncrementalDumperConfiguration config, final IngestPosition position, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) { - return new OpenGaussWALDumper(dumperConfig, position, channel, metaDataLoader); + return new OpenGaussWALDumper(config, position, channel, metaDataLoader); } @Override diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java index c281fdc8a716d..f377684cd2e87 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java @@ -19,7 +19,7 @@ import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; @@ -60,7 +60,7 @@ @Slf4j public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor implements IncrementalDumper { - private final DumperConfiguration dumperConfig; + private final IncrementalDumperConfiguration dumperConfig; private final AtomicReference walPosition; @@ -74,7 +74,7 @@ public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor impleme private List rowEvents = new LinkedList<>(); - public PostgreSQLWALDumper(final DumperConfiguration dumperConfig, final IngestPosition position, + public PostgreSQLWALDumper(final IncrementalDumperConfiguration dumperConfig, final IngestPosition position, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) { ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()), () -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration")); diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/dumper/PostgreSQLIncrementalDumperCreator.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/dumper/PostgreSQLIncrementalDumperCreator.java index 6ef5ec6250e0a..50852c68caa3e 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/dumper/PostgreSQLIncrementalDumperCreator.java +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/dumper/PostgreSQLIncrementalDumperCreator.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest.dumper; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; @@ -31,9 +31,9 @@ public final class PostgreSQLIncrementalDumperCreator implements IncrementalDumperCreator { @Override - public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition position, + public IncrementalDumper createIncrementalDumper(final IncrementalDumperConfiguration config, final IngestPosition position, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) { - return new PostgreSQLWALDumper(dumperConfig, position, channel, metaDataLoader); + return new PostgreSQLWALDumper(config, position, channel, metaDataLoader); } @Override diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java index 5703ed68ce018..388c664780a13 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column; import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord; @@ -43,11 +43,11 @@ */ public final class WALEventConverter { - private final DumperConfiguration dumperConfig; + private final IncrementalDumperConfiguration dumperConfig; private final PipelineTableMetaDataLoader metaDataLoader; - public WALEventConverter(final DumperConfiguration dumperConfig, final PipelineTableMetaDataLoader metaDataLoader) { + public WALEventConverter(final IncrementalDumperConfiguration dumperConfig, final PipelineTableMetaDataLoader metaDataLoader) { this.dumperConfig = dumperConfig; this.metaDataLoader = metaDataLoader; } diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java index b01ef0ad6229c..5167148ddc26f 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java +++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java @@ -18,7 +18,7 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest; import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName; import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; @@ -72,7 +72,7 @@ class PostgreSQLWALDumperTest { private WALPosition position; - private DumperConfiguration dumperConfig; + private IncrementalDumperConfiguration dumperConfig; private PostgreSQLWALDumper walDumper; @@ -103,8 +103,8 @@ private void createTable(final String jdbcUrl, final String username, final Stri } } - private DumperConfiguration createDumperConfiguration(final String jdbcUrl, final String username, final String password) { - DumperConfiguration result = new DumperConfiguration(); + private IncrementalDumperConfiguration createDumperConfiguration(final String jdbcUrl, final String username, final String password) { + IncrementalDumperConfiguration result = new IncrementalDumperConfiguration(); result.setJobId("0101123456"); result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password)); result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order"))); diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java index 85850fa1daebf..29479237999a4 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java +++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java @@ -19,7 +19,7 @@ import lombok.SneakyThrows; import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord; @@ -71,7 +71,7 @@ class WALEventConverterTest { - private DumperConfiguration dumperConfig; + private IncrementalDumperConfiguration dumperConfig; private WALEventConverter walEventConverter; @@ -88,8 +88,8 @@ void setUp() { pipelineTableMetaData = new PipelineTableMetaData("t_order", mockOrderColumnsMetaDataMap(), Collections.emptyList()); } - private DumperConfiguration mockDumperConfiguration() { - DumperConfiguration result = new DumperConfiguration(); + private IncrementalDumperConfiguration mockDumperConfiguration() { + IncrementalDumperConfiguration result = new IncrementalDumperConfiguration(); result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root")); result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))); result.setTableNameSchemaNameMapping(new TableNameSchemaNameMapping(Collections.emptyMap())); @@ -97,7 +97,7 @@ private DumperConfiguration mockDumperConfiguration() { } @SneakyThrows(SQLException.class) - private void initTableData(final DumperConfiguration dumperConfig) { + private void initTableData(final IncrementalDumperConfiguration dumperConfig) { try ( PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager(); PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index ac88431241713..f88832aea96f1 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -22,7 +22,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.digest.DigestUtils; import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration; @@ -177,7 +177,7 @@ private void initIncrementalPosition(final CDCJobConfiguration jobConfig) { if (getJobItemProgress(jobId, i).isPresent()) { continue; } - DumperConfiguration dumperConfig = buildDumperConfiguration(jobConfig, i, getTableNameSchemaNameMapping(jobConfig.getSchemaTableNames())); + IncrementalDumperConfiguration dumperConfig = buildDumperConfiguration(jobConfig, i, getTableNameSchemaNameMapping(jobConfig.getSchemaTableNames())); InventoryIncrementalJobItemProgress jobItemProgress = getInventoryIncrementalJobItemProgress(jobConfig, pipelineDataSourceManager, dumperConfig); PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress( jobId, i, YamlEngine.marshal(getJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress))); @@ -189,11 +189,11 @@ private void initIncrementalPosition(final CDCJobConfiguration jobConfig) { private static InventoryIncrementalJobItemProgress getInventoryIncrementalJobItemProgress(final CDCJobConfiguration jobConfig, final PipelineDataSourceManager dataSourceManager, - final DumperConfiguration dumperConfig) throws SQLException { + final IncrementalDumperConfiguration incrementalDumperConfig) throws SQLException { InventoryIncrementalJobItemProgress result = new InventoryIncrementalJobItemProgress(); result.setSourceDatabaseType(jobConfig.getSourceDatabaseType()); - result.setDataSourceName(dumperConfig.getDataSourceName()); - IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(PipelineJobPreparerUtils.getIncrementalPosition(null, dumperConfig, dataSourceManager)); + result.setDataSourceName(incrementalDumperConfig.getDataSourceName()); + IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(PipelineJobPreparerUtils.getIncrementalPosition(null, incrementalDumperConfig, dataSourceManager)); result.setIncremental(new JobItemIncrementalTasksProgress(incrementalTaskProgress)); return result; } @@ -268,7 +268,7 @@ protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) { public CDCTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) { CDCJobConfiguration jobConfig = (CDCJobConfiguration) pipelineJobConfig; TableNameSchemaNameMapping tableNameSchemaNameMapping = getTableNameSchemaNameMapping(jobConfig.getSchemaTableNames()); - DumperConfiguration dumperConfig = buildDumperConfiguration(jobConfig, jobShardingItem, tableNameSchemaNameMapping); + IncrementalDumperConfiguration dumperConfig = buildDumperConfiguration(jobConfig, jobShardingItem, tableNameSchemaNameMapping); ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, pipelineProcessConfig, jobConfig.getSchemaTableNames(), tableNameSchemaNameMapping); CDCTaskConfiguration result = new CDCTaskConfiguration(dumperConfig, importerConfig); log.debug("buildTaskConfiguration, result={}", result); @@ -286,13 +286,13 @@ private TableNameSchemaNameMapping getTableNameSchemaNameMapping(final Collectio return new TableNameSchemaNameMapping(tableNameSchemaMap); } - private DumperConfiguration buildDumperConfiguration(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableNameSchemaNameMapping tableNameSchemaNameMapping) { + private IncrementalDumperConfiguration buildDumperConfiguration(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableNameSchemaNameMapping tableNameSchemaNameMapping) { JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem); Map tableNameMap = new LinkedHashMap<>(); dataNodeLine.getEntries().forEach(each -> each.getDataNodes().forEach(node -> tableNameMap.put(new ActualTableName(node.getTableName()), new LogicTableName(each.getLogicTableName())))); String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName(); StandardPipelineDataSourceConfiguration actualDataSourceConfig = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName); - DumperConfiguration result = new DumperConfiguration(); + IncrementalDumperConfiguration result = new IncrementalDumperConfiguration(); result.setJobId(jobConfig.getJobId()); result.setDataSourceName(dataSourceName); result.setDataSourceConfig(actualDataSourceConfig); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/task/CDCTaskConfiguration.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/task/CDCTaskConfiguration.java index ae4cac23dea6e..af862bc1a4b8c 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/task/CDCTaskConfiguration.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/task/CDCTaskConfiguration.java @@ -19,7 +19,7 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.PipelineTaskConfiguration; @@ -30,7 +30,7 @@ @Getter public final class CDCTaskConfiguration implements PipelineTaskConfiguration { - private final DumperConfiguration dumperConfig; + private final IncrementalDumperConfiguration dumperConfig; private final ImporterConfiguration importerConfig; } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java index f91a67dc1dff0..099c71139cc0e 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java @@ -18,7 +18,7 @@ package org.apache.shardingsphere.data.pipeline.cdc.core.prepare; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper; @@ -144,7 +144,7 @@ private boolean hasGlobalCSN(final DatabaseType databaseType) { private void initIncrementalTask(final CDCJobItemContext jobItemContext, final AtomicBoolean importerUsed, final List channelProgressPairs) { CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig(); - DumperConfiguration dumperConfig = taskConfig.getDumperConfig(); + IncrementalDumperConfiguration dumperConfig = taskConfig.getDumperConfig(); ImporterConfiguration importerConfig = taskConfig.getImporterConfig(); IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperConfig.getPosition(), jobItemContext.getInitProgress()); PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(importerConfig.getConcurrency(), jobItemContext.getJobProcessContext().getPipelineChannelCreator(), taskProgress); diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index 1d4a13fc9f316..b7a11dae97623 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -20,7 +20,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.digest.DigestUtils; import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; @@ -262,13 +262,14 @@ protected MigrationJobConfiguration getJobConfiguration(final JobConfigurationPO @Override public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) { MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) pipelineJobConfig; - DumperConfiguration dumperConfig = new MigrationIncrementalDumperConfigurationCreator(jobConfig).createDumperConfiguration(jobConfig.getJobShardingDataNodes().get(jobShardingItem)); - CreateTableConfiguration createTableConfig = buildCreateTableConfiguration(jobConfig, dumperConfig.getTableNameSchemaNameMapping()); + IncrementalDumperConfiguration incrementalDumperConfig = new MigrationIncrementalDumperConfigurationCreator( + jobConfig).createDumperConfiguration(jobConfig.getJobShardingDataNodes().get(jobShardingItem)); + CreateTableConfiguration createTableConfig = buildCreateTableConfiguration(jobConfig, incrementalDumperConfig.getTableNameSchemaNameMapping()); Set targetTableNames = jobConfig.getTargetTableNames().stream().map(LogicTableName::new).collect(Collectors.toSet()); Map> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap( ((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames); - ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, pipelineProcessConfig, shardingColumnsMap, dumperConfig.getTableNameSchemaNameMapping()); - MigrationTaskConfiguration result = new MigrationTaskConfiguration(dumperConfig.getDataSourceName(), createTableConfig, dumperConfig, importerConfig); + ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, pipelineProcessConfig, shardingColumnsMap, incrementalDumperConfig.getTableNameSchemaNameMapping()); + MigrationTaskConfiguration result = new MigrationTaskConfiguration(incrementalDumperConfig.getDataSourceName(), createTableConfig, incrementalDumperConfig, importerConfig); log.info("buildTaskConfiguration, result={}", result); return result; } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationTaskConfiguration.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationTaskConfiguration.java index edc9dbf2e85da..f250fafe4a27d 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationTaskConfiguration.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationTaskConfiguration.java @@ -20,7 +20,7 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.ToString; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.PipelineTaskConfiguration; @@ -37,7 +37,7 @@ public final class MigrationTaskConfiguration implements PipelineTaskConfigurati private final CreateTableConfiguration createTableConfig; - private final DumperConfiguration dumperConfig; + private final IncrementalDumperConfiguration dumperConfig; private final ImporterConfiguration importerConfig; } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperConfigurationCreator.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperConfigurationCreator.java index 4763f25aa040a..07cb0dfd60dbf 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperConfigurationCreator.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperConfigurationCreator.java @@ -19,7 +19,7 @@ import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName; import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; @@ -39,16 +39,16 @@ public final class MigrationIncrementalDumperConfigurationCreator implements Inc private final MigrationJobConfiguration jobConfig; @Override - public DumperConfiguration createDumperConfiguration(final JobDataNodeLine jobDataNodeLine) { + public IncrementalDumperConfiguration createDumperConfiguration(final JobDataNodeLine jobDataNodeLine) { Map tableNameMap = JobDataNodeLineConvertUtils.buildTableNameMap(jobDataNodeLine); TableNameSchemaNameMapping tableNameSchemaNameMapping = new TableNameSchemaNameMapping(jobConfig.getTargetTableSchemaMap()); String dataSourceName = jobDataNodeLine.getEntries().get(0).getDataNodes().get(0).getDataSourceName(); return buildDumperConfiguration(jobConfig.getJobId(), dataSourceName, jobConfig.getSources().get(dataSourceName), tableNameMap, tableNameSchemaNameMapping); } - private DumperConfiguration buildDumperConfiguration(final String jobId, final String dataSourceName, final PipelineDataSourceConfiguration sourceDataSource, - final Map tableNameMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) { - DumperConfiguration result = new DumperConfiguration(); + private IncrementalDumperConfiguration buildDumperConfiguration(final String jobId, final String dataSourceName, final PipelineDataSourceConfiguration sourceDataSource, + final Map tableNameMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) { + IncrementalDumperConfiguration result = new IncrementalDumperConfiguration(); result.setJobId(jobId); result.setDataSourceName(dataSourceName); result.setDataSourceConfig(sourceDataSource); diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java index 63c3858908c0f..ca5ab63968aee 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java @@ -18,7 +18,7 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration.prepare; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; @@ -185,7 +185,7 @@ private void initIncrementalTasks(final MigrationJobItemContext jobItemContext) MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig(); PipelineChannelCreator pipelineChannelCreator = jobItemContext.getJobProcessContext().getPipelineChannelCreator(); PipelineTableMetaDataLoader sourceMetaDataLoader = jobItemContext.getSourceMetaDataLoader(); - DumperConfiguration dumperConfig = taskConfig.getDumperConfig(); + IncrementalDumperConfiguration dumperConfig = taskConfig.getDumperConfig(); ImporterConfiguration importerConfig = taskConfig.getImporterConfig(); ExecuteEngine incrementalExecuteEngine = jobItemContext.getJobProcessContext().getIncrementalExecuteEngine(); IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperConfig.getPosition(), jobItemContext.getInitProgress()); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java index 8abb8b49287ec..297b49a4881a1 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.test.it.data.pipeline.core.fixture; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; @@ -30,7 +30,7 @@ public final class FixtureIncrementalDumperCreator implements IncrementalDumperCreator { @Override - public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition position, + public IncrementalDumper createIncrementalDumper(final IncrementalDumperConfiguration config, final IngestPosition position, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) { return new FixtureIncrementalDumper(); } diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java index 5212fcfa6d2d1..d029ac73f49d5 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.test.it.data.pipeline.core.prepare; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.ingest.BaseDumperConfiguration; import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration; import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager; @@ -143,7 +143,7 @@ void assertSplitInventoryDataWithoutPrimaryAndUniqueIndex() throws SQLException } } - private void initEmptyTablePrimaryEnvironment(final DumperConfiguration dumperConfig) throws SQLException { + private void initEmptyTablePrimaryEnvironment(final BaseDumperConfiguration dumperConfig) throws SQLException { DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()); try ( Connection connection = dataSource.getConnection(); @@ -153,7 +153,7 @@ private void initEmptyTablePrimaryEnvironment(final DumperConfiguration dumperCo } } - private void initIntPrimaryEnvironment(final DumperConfiguration dumperConfig) throws SQLException { + private void initIntPrimaryEnvironment(final BaseDumperConfiguration dumperConfig) throws SQLException { DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()); try ( Connection connection = dataSource.getConnection(); @@ -166,7 +166,7 @@ private void initIntPrimaryEnvironment(final DumperConfiguration dumperConfig) t } } - private void initCharPrimaryEnvironment(final DumperConfiguration dumperConfig) throws SQLException { + private void initCharPrimaryEnvironment(final BaseDumperConfiguration dumperConfig) throws SQLException { DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()); try ( Connection connection = dataSource.getConnection(); @@ -177,7 +177,7 @@ private void initCharPrimaryEnvironment(final DumperConfiguration dumperConfig) } } - private void initUnionPrimaryEnvironment(final DumperConfiguration dumperConfig) throws SQLException { + private void initUnionPrimaryEnvironment(final BaseDumperConfiguration dumperConfig) throws SQLException { DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()); try ( Connection connection = dataSource.getConnection(); @@ -188,7 +188,7 @@ private void initUnionPrimaryEnvironment(final DumperConfiguration dumperConfig) } } - private void initNoPrimaryEnvironment(final DumperConfiguration dumperConfig) throws SQLException { + private void initNoPrimaryEnvironment(final BaseDumperConfiguration dumperConfig) throws SQLException { DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()); try ( Connection connection = dataSource.getConnection(); @@ -199,7 +199,7 @@ private void initNoPrimaryEnvironment(final DumperConfiguration dumperConfig) th } } - private void initUniqueIndexOnNotNullColumnEnvironment(final DumperConfiguration dumperConfig) throws SQLException { + private void initUniqueIndexOnNotNullColumnEnvironment(final BaseDumperConfiguration dumperConfig) throws SQLException { DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()); try ( Connection connection = dataSource.getConnection(); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java index f56ea61452b42..82f09ad5cf8f1 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.test.it.data.pipeline.core.task; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; @@ -84,7 +84,7 @@ void assertGetProgress() throws SQLException, ExecutionException, InterruptedExc inventoryTask.close(); } - private void initTableData(final DumperConfiguration dumperConfig) throws SQLException { + private void initTableData(final IncrementalDumperConfiguration dumperConfig) throws SQLException { PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager(); try ( PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());