diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java
index 5ecfd94da..b78eea533 100644
--- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java
+++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java
@@ -120,6 +120,10 @@ public class SnowflakeSinkConnectorConfig {
   public static final boolean SNOWPIPE_FILE_CLEANER_FIX_ENABLED_DEFAULT = true;
   public static final int SNOWPIPE_FILE_CLEANER_THREADS_DEFAULT = 1;
 
+  public static final String SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED =
+      "snowflake.snowpipe.stageFileNameExtensionEnabled";
+  public static final boolean SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED_DEFAULT = true;
+
   // Whether to close streaming channels in parallel.
   public static final String SNOWPIPE_STREAMING_CLOSE_CHANNELS_IN_PARALLEL =
       "snowflake.streaming.closeChannelsInParallel.enabled";
diff --git a/src/main/java/com/snowflake/kafka/connector/config/ConnectorConfigDefinition.java b/src/main/java/com/snowflake/kafka/connector/config/ConnectorConfigDefinition.java
index a4314cd49..aeb003d12 100644
--- a/src/main/java/com/snowflake/kafka/connector/config/ConnectorConfigDefinition.java
+++ b/src/main/java/com/snowflake/kafka/connector/config/ConnectorConfigDefinition.java
@@ -349,6 +349,16 @@ public static ConfigDef getConfig() {
             ConfigDef.Importance.LOW,
             "Defines number of worker threads to associate with the cleaner task. By default there"
                 + " is one cleaner per topic's partition and they all share one worker thread")
+        .define(
+            SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED,
+            ConfigDef.Type.BOOLEAN,
+            SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED_DEFAULT,
+            ConfigDef.Importance.LOW,
+            "Defines whether stage file names should be prefixed with a hash of the source"
+                + " topic's name. This is required when multiple topics are configured to ingest"
+                + " data into a single table via the topic2table map. If disabled, there is a"
+                + " risk that files from different topics may collide with each other and be"
+                + " deleted before ingestion.")
         .define(
             SNOWPIPE_STREAMING_CLOSE_CHANNELS_IN_PARALLEL,
             ConfigDef.Type.BOOLEAN,
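Reviewer note: a minimal, self-contained sketch (not part of this change; the class name and variables are illustrative) of how the new property resolves through Kafka's ConfigDef. When the user never sets the property, parsing falls back to the declared default, so the fix stays enabled unless explicitly turned off.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;

public class StageFileNameFlagSketch {
  public static void main(String[] args) {
    ConfigDef def =
        new ConfigDef()
            .define(
                "snowflake.snowpipe.stageFileNameExtensionEnabled",
                ConfigDef.Type.BOOLEAN,
                true, // mirrors SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED_DEFAULT
                ConfigDef.Importance.LOW,
                "Prefix stage file names with a hash of the source topic name.");

    Map<String, String> userProps = new HashMap<>(); // user did not set the property
    Map<String, Object> parsed = def.parse(userProps);
    // Prints "true" - the fix is on unless the user explicitly disables it.
    System.out.println(parsed.get("snowflake.snowpipe.stageFileNameExtensionEnabled"));
  }
}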
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java b/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java
index 76158abe7..7711698d6 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java
@@ -106,7 +106,8 @@ static String filePrefix(String appName, String table, String topic, int partiti
       // 3. add 0x8000 (light up 15th bit as a marker)
       // 4. add partition id (which should in production use cases never reach a value above 5,000
       // partitions per topic).
-      // In theory - we would support 32767 partitions, which is more than
+      // In theory - we would support 32767 partitions, which is more than any reasonable value for
+      // a single topic
       byte[] bytes = topic.toUpperCase().getBytes(StandardCharsets.UTF_8);
       BigInteger hash = BigInteger.valueOf(Crc32C.compute(bytes, 0, bytes.length));
       partitionPart =
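The numbered comment in filePrefix describes the encoding that makes prefixes topic-unique. As a hedged illustration, this standalone sketch reproduces that encoding using java.util.zip.CRC32C in place of Kafka's Crc32C helper (assumed to compute the same CRC32-C value); the exact rendering of the prefix in the final file name may differ.

import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32C;

public class PartitionPartSketch {
  // Builds the extended "partition part": topic hash in the upper bits, a marker at
  // bit 15, and the partition id in the low bits.
  static BigInteger partitionPart(String topic, int partition) {
    byte[] bytes = topic.toUpperCase().getBytes(StandardCharsets.UTF_8);
    CRC32C crc = new CRC32C();
    crc.update(bytes, 0, bytes.length);
    return BigInteger.valueOf(crc.getValue()) // unsigned 32-bit hash of the topic name
        .shiftLeft(16) // make room for the marker bit and the partition id
        .add(BigInteger.valueOf(0x8000)) // light up bit 15 as the "topic hash present" marker
        .add(BigInteger.valueOf(partition)); // partition ids stay far below 32768 in practice
  }

  public static void main(String[] args) {
    // Same partition id, different topics - the resulting values (and hence the stage
    // file prefixes derived from them) no longer collide.
    System.out.println(partitionPart("topicA", 0).toString(16));
    System.out.println(partitionPart("topicB", 0).toString(16));
  }
}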
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceFactory.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceFactory.java
index ec07a0656..208059986 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceFactory.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceFactory.java
@@ -72,6 +72,19 @@ private SnowflakeSinkServiceBuilder(
       if (useStageFilesProcessor) {
         svc.enableStageFilesProcessor(threadCount);
       }
+
+      boolean extendedStageFileNameFix =
+          SnowflakeSinkConnectorConfig.SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED_DEFAULT;
+      if (connectorConfig != null
+          && connectorConfig.containsKey(
+              SnowflakeSinkConnectorConfig.SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED)) {
+        extendedStageFileNameFix =
+            Boolean.parseBoolean(
+                connectorConfig.get(
+                    SnowflakeSinkConnectorConfig
+                        .SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED));
+      }
+      svc.configureSingleTableLoadFromMultipleTopics(extendedStageFileNameFix);
     } else {
       this.service = new SnowflakeSinkServiceV2(conn, connectorConfig);
     }
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
index ce93c0928..fd4a16ce5 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
@@ -9,6 +9,7 @@
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.MetricRegistry;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
 import com.snowflake.kafka.connector.Utils;
 import com.snowflake.kafka.connector.config.TopicToTableModeExtractor;
@@ -27,12 +28,15 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicLong;
@@ -88,6 +92,13 @@ class SnowflakeSinkServiceV1 implements SnowflakeSinkService {
   private boolean useStageFilesProcessor = false;
   @Nullable private ScheduledExecutorService cleanerServiceExecutor;
 
+  // if enabled, the prefix for stage files for a given table will contain information about the
+  // source topic's hash code. This is required in scenarios when multiple topics are configured
+  // to ingest data into a single table.
+  private boolean enableStageFilePrefixExtension = false;
+
+  private final Set<String> perTableWarningNotifications = new HashSet<>();
+
   SnowflakeSinkServiceV1(SnowflakeConnectionService conn) {
     if (conn == null || conn.isClosed()) {
       throw SnowflakeErrors.ERROR_5010.getException();
@@ -135,6 +146,28 @@ public void startPartition(final String tableName, final TopicPartition topicPar
               conn,
               topicPartition.partition(),
               cleanerServiceExecutor));
+
+      if (enableStageFilePrefixExtension
+          && TopicToTableModeExtractor.determineTopic2TableMode(
+                  topic2TableMap, topicPartition.topic())
+              == TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE) {
+        // if snowflake.snowpipe.stageFileNameExtensionEnabled is enabled and the table is used by
+        // multiple topics, we may end up in a situation where data from different topics has
+        // landed in the same bucket - after enabling this fix, that data would stay on the stage
+        // forever - we want to inform the user about such a situation and list all the files
+        // that will no longer be processed by the connector.
+        String key = String.format("%s-%d", tableName, topicPartition.partition());
+        synchronized (perTableWarningNotifications) {
+          if (!perTableWarningNotifications.contains(key)) {
+            perTableWarningNotifications.add(key);
+            ForkJoinPool.commonPool()
+                .submit(
+                    () ->
+                        checkTableStageForObsoleteFiles(
+                            stageName, tableName, topicPartition.partition()));
+          }
+        }
+      }
     }
   }
 
@@ -373,6 +406,35 @@ protected static String getNameIndex(String topic, int partition) {
     return topic + "_" + partition;
   }
 
+  public void configureSingleTableLoadFromMultipleTopics(boolean fixEnabled) {
+    enableStageFilePrefixExtension = fixEnabled;
+  }
+
+  /**
+   * Util method - checks if there are stage files present matching the "old" file name format; if
+   * there are, it lists them and asks the user to delete them manually.
+   */
+  private void checkTableStageForObsoleteFiles(String stageName, String tableName, int partition) {
+    try {
+      String prefix = FileNameUtils.filePrefix(conn.getConnectorName(), tableName, null, partition);
+      List<String> stageFiles = conn.listStage(stageName, prefix);
+      if (!stageFiles.isEmpty()) {
+        LOGGER.warn(
+            "NOTE: For table {} there are {} files matching {} prefix.",
+            tableName,
+            stageFiles.size(),
+            prefix);
+        stageFiles.sort(String::compareToIgnoreCase);
+        LOGGER.warn("Please consider manually deleting these files:");
+        for (List<String> names : Lists.partition(stageFiles, 10)) {
+          LOGGER.warn(String.join(", ", names));
+        }
+      }
+    } catch (Exception err) {
+      LOGGER.warn("could not query stage - {}<{}>", err.getMessage(), err.getClass().getName());
+    }
+  }
+
   private class ServiceContext {
     private final String tableName;
     private final String stageName;
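The startPartition change above schedules at most one stage audit per table-partition pair, off the hot path. Here is a self-contained sketch of that "warn once, check in the background" pattern (class and method names are illustrative, not taken from the connector):

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ForkJoinPool;

public class WarnOnceSketch {
  private final Set<String> alreadyChecked = new HashSet<>();

  // Schedules `check` at most once per (table, partition) key; later calls are no-ops.
  // Set.add returns false when the key is already present, which condenses the
  // contains()+add() pair used in the connector code into a single call.
  void maybeScheduleCheck(String tableName, int partition, Runnable check) {
    String key = String.format("%s-%d", tableName, partition);
    synchronized (alreadyChecked) {
      if (alreadyChecked.add(key)) {
        ForkJoinPool.commonPool().submit(check);
      }
    }
  }
}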
@@ -437,8 +499,20 @@ private ServiceContext(
       // SNOW-1642799 = if multiple topics load data into single table, we need to ensure prefix is
       // unique per table - otherwise, file cleaners for different channels may run into race
       // condition
-      if (determineTopic2TableMode(topic2TableMap, topicName)
-          == TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE) {
+      TopicToTableModeExtractor.Topic2TableMode mode =
+          determineTopic2TableMode(topic2TableMap, topicName);
+      if (mode == TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE
+          && !enableStageFilePrefixExtension) {
+        LOGGER.warn(
+            "The table {} is used as ingestion target by multiple topics - including this one"
+                + " '{}'.\n"
+                + "To prevent potential data loss consider setting"
+                + " 'snowflake.snowpipe.stageFileNameExtensionEnabled' to true",
+            tableName,
+            topicName);
+      }
+      if (mode == TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE
+          && enableStageFilePrefixExtension) {
         this.prefix =
             FileNameUtils.filePrefix(conn.getConnectorName(), tableName, topicName, partition);
       } else {
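To make the MANY_TOPICS_SINGLE_TABLE branch above concrete, a hedged, self-contained sketch of how a topic2table map can be classified. It mirrors the intent of TopicToTableModeExtractor.determineTopic2TableMode as used in this diff; the real implementation may differ in details, and all names in the sketch are illustrative.

import java.util.Map;

public class Topic2TableModeSketch {
  enum Mode {
    SINGLE_TOPIC_SINGLE_TABLE,
    MANY_TOPICS_SINGLE_TABLE
  }

  // A topic is in MANY_TOPICS_SINGLE_TABLE mode when at least one other topic maps to
  // the same target table; a topic absent from the map defaults to a table of its own name.
  static Mode determineMode(Map<String, String> topic2Table, String topic) {
    String table = topic2Table.getOrDefault(topic, topic);
    long topicsForTable = topic2Table.values().stream().filter(table::equals).count();
    return topicsForTable > 1 ? Mode.MANY_TOPICS_SINGLE_TABLE : Mode.SINGLE_TOPIC_SINGLE_TABLE;
  }

  public static void main(String[] args) {
    Map<String, String> map = Map.of("topicA", "SALES", "topicB", "SALES");
    System.out.println(determineMode(map, "topicA")); // MANY_TOPICS_SINGLE_TABLE
  }
}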