
Commit

updates after review
sfc-gh-gjachimko committed Oct 1, 2024
1 parent 74032d2 commit 7897e9f
Showing 5 changed files with 105 additions and 3 deletions.
@@ -120,6 +120,10 @@ public class SnowflakeSinkConnectorConfig {
public static final boolean SNOWPIPE_FILE_CLEANER_FIX_ENABLED_DEFAULT = true;
public static final int SNOWPIPE_FILE_CLEANER_THREADS_DEFAULT = 1;

public static final String SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED =
"snowflake.snowpipe.stageFileNameExtensionEnabled";
public static final boolean SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED_DEFAULT = true;

// Whether to close streaming channels in parallel.
public static final String SNOWPIPE_STREAMING_CLOSE_CHANNELS_IN_PARALLEL =
"snowflake.streaming.closeChannelsInParallel.enabled";
@@ -349,6 +349,16 @@ public static ConfigDef getConfig() {
ConfigDef.Importance.LOW,
"Defines number of worker threads to associate with the cleaner task. By default there"
+ " is one cleaner per topic's partition and they all share one worker thread")
.define(
SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED,
ConfigDef.Type.BOOLEAN,
SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED_DEFAULT,
ConfigDef.Importance.LOW,
"Defines whether stage file names should be prefixed with source topic's name hash."
+ " This is required in scenarios, when there are multiple topics configured to"
+ " ingest data into a single table via topic2table map. If disabled, there is a"
+ " risk that files from various topics may collide with each other and be deleted"
+ " before ingestion.")
.define(
SNOWPIPE_STREAMING_CLOSE_CHANNELS_IN_PARALLEL,
ConfigDef.Type.BOOLEAN,
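For context, a minimal sketch of connector settings that exercise the new option: two topics routed into one table through the topic2table map, with the fix enabled explicitly. Topic and table names are hypothetical, and the snowflake.topic2table.map key is assumed from the connector's documented configuration rather than shown in this diff:

import java.util.HashMap;
import java.util.Map;

// Hypothetical configuration: two topics ingesting into a single table.
Map<String, String> config = new HashMap<>();
config.put("topics", "orders_eu,orders_us"); // assumed topic names
config.put("snowflake.topic2table.map", "orders_eu:ORDERS,orders_us:ORDERS");
// The option introduced by this commit; true is also the default.
config.put("snowflake.snowpipe.stageFileNameExtensionEnabled", "true");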
@@ -106,7 +106,8 @@ static String filePrefix(String appName, String table, String topic, int partition) {
// 3. add 0x8000 (light up 15th bit as a marker)
// 4. add partition id (which in production use cases should never exceed 5,000
// partitions per topic).
// In theory - we would support 32767 partitions, which is more than any reasonable value for
// a single topic
byte[] bytes = topic.toUpperCase().getBytes(StandardCharsets.UTF_8);
BigInteger hash = BigInteger.valueOf(Crc32C.compute(bytes, 0, bytes.length));
partitionPart =
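A self-contained sketch of the encoding those comments describe. The combining step after partitionPart = falls outside the visible hunk, so the final expression is an assumption based on steps 3 and 4; java.util.zip.CRC32C stands in here for Kafka's Crc32C helper used by the actual code:

import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32C;

class StageFilePrefixSketch {
  static BigInteger topicPartitionPart(String topic, int partition) {
    // CRC32C hash of the upper-cased topic name (mirrors the visible code)
    byte[] bytes = topic.toUpperCase().getBytes(StandardCharsets.UTF_8);
    CRC32C crc = new CRC32C();
    crc.update(bytes, 0, bytes.length);
    return BigInteger.valueOf(crc.getValue())
        .shiftLeft(16) // make room for the marker bit and the partition id
        .add(BigInteger.valueOf(0x8000L + partition)); // set bit 15 as a marker, add partition id (assumed combine)
  }
}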
@@ -72,6 +72,19 @@ private SnowflakeSinkServiceBuilder(
if (useStageFilesProcessor) {
svc.enableStageFilesProcessor(threadCount);
}

boolean extendedStageFileNameFix =
SnowflakeSinkConnectorConfig.SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED_DEFAULT;
if (connectorConfig != null
&& connectorConfig.containsKey(
SnowflakeSinkConnectorConfig.SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED)) {
extendedStageFileNameFix =
Boolean.parseBoolean(
connectorConfig.get(
SnowflakeSinkConnectorConfig
.SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED));
}
svc.configureSingleTableLoadFromMultipleTopics(extendedStageFileNameFix);
} else {
this.service = new SnowflakeSinkServiceV2(conn, connectorConfig);
}
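Aside: assuming connectorConfig is a non-null Map<String, String>, the containsKey/get pair above collapses into a single getOrDefault call; a behaviorally equivalent sketch:

boolean extendedStageFileNameFix =
    Boolean.parseBoolean(
        connectorConfig.getOrDefault(
            SnowflakeSinkConnectorConfig.SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED,
            String.valueOf(
                SnowflakeSinkConnectorConfig
                    .SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED_DEFAULT)));
svc.configureSingleTableLoadFromMultipleTopics(extendedStageFileNameFix);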
@@ -9,6 +9,7 @@
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.config.TopicToTableModeExtractor;
@@ -27,12 +28,15 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
@@ -88,6 +92,13 @@ class SnowflakeSinkServiceV1 implements SnowflakeSinkService {
private boolean useStageFilesProcessor = false;
@Nullable private ScheduledExecutorService cleanerServiceExecutor;

// if enabled, the prefix for stage files for a given table will contain information about the
// source topic's hash code. This is required in scenarios when multiple topics are configured
// to ingest data into a single table.
private boolean enableStageFilePrefixExtension = false;

private final Set<String> perTableWarningNotifications = new HashSet<>();

SnowflakeSinkServiceV1(SnowflakeConnectionService conn) {
if (conn == null || conn.isClosed()) {
throw SnowflakeErrors.ERROR_5010.getException();
@@ -135,6 +146,28 @@ public void startPartition(final String tableName, final TopicPartition topicPartition) {
conn,
topicPartition.partition(),
cleanerServiceExecutor));

if (enableStageFilePrefixExtension
&& TopicToTableModeExtractor.determineTopic2TableMode(
topic2TableMap, topicPartition.topic())
== TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE) {
// if snowflake.snowpipe.stageFileNameExtensionEnabled is enabled and the table is used by
// multiple topics, data from different topics may have ended up in the same bucket - after
// enabling this fix, that data would stay on the stage forever. We want to inform the user
// about this situation, so we list all files that will no longer be processed by the
// connector.
String key = String.format("%s-%d", tableName, topicPartition.partition());
synchronized (perTableWarningNotifications) {
if (!perTableWarningNotifications.contains(key)) {
perTableWarningNotifications.add(key);
ForkJoinPool.commonPool()
.submit(
() ->
checkTableStageForObsoleteFiles(
stageName, tableName, topicPartition.partition()));
}
}
}
}
}
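Design note: Set#add returns false when the element is already present, so the contains/add pair above could be folded into a single call while keeping the same synchronization; an equivalent sketch:

synchronized (perTableWarningNotifications) {
  if (perTableWarningNotifications.add(key)) { // add() is false if key was already present
    ForkJoinPool.commonPool()
        .submit(
            () ->
                checkTableStageForObsoleteFiles(
                    stageName, tableName, topicPartition.partition()));
  }
}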

@@ -373,6 +406,35 @@ protected static String getNameIndex(String topic, int partition) {
return topic + "_" + partition;
}

public void configureSingleTableLoadFromMultipleTopics(boolean fixEnabled) {
enableStageFilePrefixExtension = fixEnabled;
}

/**
* Util method; checks whether there are stage files present matching the "old" file name
* format. If there are, it lists them and asks the user to delete them manually.
*/
private void checkTableStageForObsoleteFiles(String stageName, String tableName, int partition) {
try {
String prefix = FileNameUtils.filePrefix(conn.getConnectorName(), tableName, null, partition);
List<String> stageFiles = conn.listStage(stageName, prefix);
if (!stageFiles.isEmpty()) {
LOGGER.warn(
"NOTE: For table {} there are {} files matching {} prefix.",
tableName,
stageFiles.size(),
prefix);
stageFiles.sort(String::compareToIgnoreCase);
LOGGER.warn("Please consider manually deleting these files:");
for (List<String> names : Lists.partition(stageFiles, 10)) {
LOGGER.warn(String.join(", ", names));
}
}
} catch (Exception err) {
LOGGER.warn("could not query stage - {}<{}>", err.getMessage(), err.getClass().getName());
}
}

private class ServiceContext {
private final String tableName;
private final String stageName;
@@ -437,8 +499,20 @@ private ServiceContext(
// SNOW-1642799 = if multiple topics load data into a single table, we need to ensure the
// prefix is unique per table - otherwise, file cleaners for different channels may run into a
// race condition
TopicToTableModeExtractor.Topic2TableMode mode =
determineTopic2TableMode(topic2TableMap, topicName);
if (mode == TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE
&& !enableStageFilePrefixExtension) {
LOGGER.warn(
"The table {} is used as an ingestion target by multiple topics - including this one"
+ " '{}'.\n"
+ "To prevent potential data loss consider setting"
+ " 'snowflake.snowpipe.stageFileNameExtensionEnabled' to true",
tableName,
topicName);
}
}
if (mode == TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE
&& enableStageFilePrefixExtension) {
this.prefix =
FileNameUtils.filePrefix(conn.getConnectorName(), tableName, topicName, partition);
} else {
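To make the two branches concrete: per the filePrefix signature shown earlier, the only difference is whether the source topic is passed along, which is also why checkTableStageForObsoleteFiles above probes the stage with a null topic:

// Fix enabled + MANY_TOPICS_SINGLE_TABLE: the topic name feeds the prefix hash
String extendedPrefix =
    FileNameUtils.filePrefix(conn.getConnectorName(), tableName, topicName, partition);
// Legacy form: no topic component - matches files written before the fix
String legacyPrefix =
    FileNameUtils.filePrefix(conn.getConnectorName(), tableName, null, partition);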
