Skip to content

Commit

Permalink
Revert accidental streaming JMX changes
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-rcheng committed Jul 25, 2023
1 parent f496ee8 commit 85dc5d3
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
*/
public class SnowflakeSinkConnectorConfig {

public static final String NAME = Utils.NAME;
static final String NAME = Utils.NAME;
public static final String TOPICS = "topics";

// Connector config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,7 @@ private void createStreamingChannelForTopicPartition(
this.sinkTaskContext,
this.conn,
this.recordService,
this.conn.getTelemetryClient(),
this.enableCustomJMXMonitoring));
this.conn.getTelemetryClient()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,16 @@
import static java.time.temporal.ChronoUnit.SECONDS;
import static org.apache.kafka.common.record.TimestampType.NO_TIMESTAMP_TYPE;

import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.dlq.KafkaRecordErrorReporter;
import com.snowflake.kafka.connector.internal.BufferThreshold;
import com.snowflake.kafka.connector.internal.KCLogger;
import com.snowflake.kafka.connector.internal.PartitionBuffer;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
import com.snowflake.kafka.connector.internal.metrics.MetricsJmxReporter;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
import com.snowflake.kafka.connector.records.RecordService;
import com.snowflake.kafka.connector.records.SnowflakeJsonSchema;
Expand Down Expand Up @@ -175,15 +172,6 @@ public class TopicPartitionChannel {
// Reference to the Snowflake connection service
private final SnowflakeConnectionService conn;

// telemetry
private final SnowflakeTelemetryChannelStatus channelStatus;
// non null
private final MetricRegistry metricRegistry;

// Wrapper on Metric registry instance which will hold all registered metrics for this pipe
private final MetricsJmxReporter metricsJmxReporter;
private final boolean enableCustomJMXMonitoring;

/**
* Used to send telemetry to Snowflake. Currently, TelemetryClient created from a Snowflake
* Connection Object, i.e. not a session-less Client
Expand Down Expand Up @@ -212,8 +200,7 @@ public TopicPartitionChannel(
sinkTaskContext,
null, /* Null Connection */
new RecordService(null /* Null Telemetry Service*/),
null,
false);
null);
}

/**
Expand All @@ -230,8 +217,6 @@ public TopicPartitionChannel(
* @param recordService record service for processing incoming offsets from Kafka
* @param telemetryService Telemetry Service which includes the Telemetry Client, sends Json data
* to Snowflake
* @param enableCustomJMXMonitoring whether or not we enable Mbean for required classes and emit
* JMX metrics for monitoring
*/
public TopicPartitionChannel(
SnowflakeStreamingIngestClient streamingIngestClient,
Expand All @@ -244,8 +229,7 @@ public TopicPartitionChannel(
SinkTaskContext sinkTaskContext,
SnowflakeConnectionService conn,
RecordService recordService,
SnowflakeTelemetryService telemetryService,
boolean enableCustomJMXMonitoring) {
SnowflakeTelemetryService telemetryService) {
this.streamingIngestClient = Preconditions.checkNotNull(streamingIngestClient);
Preconditions.checkState(!streamingIngestClient.isClosed());
this.topicPartition = Preconditions.checkNotNull(topicPartition);
Expand Down Expand Up @@ -292,25 +276,6 @@ public TopicPartitionChannel(
+ " correct offset instead",
this.getChannelName());
}

// jmx
this.enableCustomJMXMonitoring = enableCustomJMXMonitoring;
this.metricRegistry = new MetricRegistry();
this.metricsJmxReporter =
new MetricsJmxReporter(
this.metricRegistry,
this.sfConnectorConfig.getOrDefault(
SnowflakeSinkConnectorConfig.NAME, "noConnectorNameFound"));

// telemetry
this.channelStatus =
new SnowflakeTelemetryChannelStatus(
this.tableName,
this.topicPartition.topic(),
this.topicPartition.partition(),
this.channelName,
this.enableCustomJMXMonitoring,
this.metricsJmxReporter);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,7 @@ public void testCloseChannelException() throws Exception {
mockSinkTaskContext,
mockSnowflakeConnectionService,
new RecordService(mockTelemetryService),
mockTelemetryService,
false);
mockTelemetryService);

topicPartitionChannel.closeChannel();
}
Expand Down Expand Up @@ -489,8 +488,7 @@ public void testInsertRowsWithSchemaEvolution() throws Exception {
mockSinkTaskContext,
conn,
new RecordService(),
mockTelemetryService,
false);
mockTelemetryService);

final int noOfRecords = 3;
List<SinkRecord> records =
Expand Down Expand Up @@ -614,8 +612,7 @@ public void testInsertRows_ValidationResponseHasErrors_NoErrorTolerance() throws
mockSinkTaskContext,
mockSnowflakeConnectionService,
new RecordService(mockTelemetryService),
mockTelemetryService,
false);
mockTelemetryService);

List<SinkRecord> records = TestUtils.createJsonStringSinkRecords(0, 1, TOPIC, PARTITION);

Expand Down

0 comments on commit 85dc5d3

Please sign in to comment.