From 0dbfa913aa4e0f5eef1d42075cd1d9d71882722b Mon Sep 17 00:00:00 2001 From: Jay Patel Date: Wed, 1 May 2024 10:41:01 -0700 Subject: [PATCH 1/4] SNOW-1357377 Add request Id in all streaming ingest APIs --- .../net/snowflake/ingest/connection/RequestBuilder.java | 8 +++++++- .../internal/SnowflakeStreamingIngestClientInternal.java | 1 - 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java b/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java index 55929400d..24974280a 100644 --- a/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java +++ b/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java @@ -683,7 +683,13 @@ public HttpPost generateStreamingIngestPostRequest( URI uri = null; try { uri = - new URIBuilder().setScheme(scheme).setHost(host).setPort(port).setPath(endPoint).build(); + new URIBuilder() + .setScheme(scheme) + .setHost(host) + .setPort(port) + .setPath(endPoint) + .setParameter(REQUEST_ID, UUID.randomUUID().toString()) + .build(); } catch (URISyntaxException e) { throw new SFException(e, ErrorCode.BUILD_REQUEST_FAILURE, message); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 2990b49d8..75eb4f717 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -493,7 +493,6 @@ ChannelsStatusResponse getChannelsStatus( .collect(Collectors.toList()); request.setChannels(requestDTOs); request.setRole(this.role); - request.setRequestId(this.flushService.getClientPrefix() + "_" + counter.getAndIncrement()); String payload = objectMapper.writeValueAsString(request); From f6c00715f0d22cf07c39b46d6447d53b4248860e Mon Sep 17 00:00:00 2001 From: Jay Patel Date: Fri, 7 Jun 2024 14:05:26 -0700 Subject: [PATCH 2/4] Fix tests and log requestId --- .../ingest/connection/RequestBuilder.java | 19 ++++++++++++------- .../SnowflakeStreamingIngestChannelTest.java | 17 +++++++++-------- .../SnowflakeStreamingIngestClientTest.java | 12 ++++++++---- 3 files changed, 29 insertions(+), 19 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java b/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java index 24974280a..506d4e221 100644 --- a/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java +++ b/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java @@ -678,18 +678,23 @@ public HttpGet generateHistoryRangeRequest( */ public HttpPost generateStreamingIngestPostRequest( String payload, String endPoint, String message) { - LOGGER.debug("Generate Snowpipe streaming request: endpoint={}, payload={}", endPoint, payload); + final String requestId = UUID.randomUUID().toString(); + LOGGER.debug( + "Generate Snowpipe streaming request: endpoint={}, payload={}, requestId={}", + endPoint, + payload, + requestId); // Make the corresponding URI URI uri = null; try { uri = new URIBuilder() - .setScheme(scheme) - .setHost(host) - .setPort(port) - .setPath(endPoint) - .setParameter(REQUEST_ID, UUID.randomUUID().toString()) - .build(); + .setScheme(scheme) + .setHost(host) + .setPort(port) + .setPath(endPoint) + .setParameter(REQUEST_ID, requestId) + .build(); } catch (URISyntaxException e) { throw new SFException(e, ErrorCode.BUILD_REQUEST_FAILURE, message); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index 87e3f8f11..f025fc89f 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -1,12 +1,7 @@ package net.snowflake.ingest.streaming.internal; import static java.time.ZoneOffset.UTC; -import static net.snowflake.ingest.utils.Constants.ACCOUNT_URL; -import static net.snowflake.ingest.utils.Constants.OPEN_CHANNEL_ENDPOINT; -import static net.snowflake.ingest.utils.Constants.PRIVATE_KEY; -import static net.snowflake.ingest.utils.Constants.RESPONSE_SUCCESS; -import static net.snowflake.ingest.utils.Constants.ROLE; -import static net.snowflake.ingest.utils.Constants.USER; +import static net.snowflake.ingest.utils.Constants.*; import static org.mockito.ArgumentMatchers.argThat; import java.security.KeyPair; @@ -313,8 +308,14 @@ public void testOpenChannelPostRequest() throws Exception { requestBuilder.generateStreamingIngestPostRequest( payload, OPEN_CHANNEL_ENDPOINT, "open channel"); - Assert.assertEquals( - String.format("%s%s", urlStr, OPEN_CHANNEL_ENDPOINT), request.getRequestLine().getUri()); + String expectedUrlPattern = + String.format("%s%s", urlStr, OPEN_CHANNEL_ENDPOINT) + "(\\?requestId=[a-f0-9\\-]{36})?"; + + Assert.assertTrue( + String.format( + "Expected URL to match pattern: %s but was: %s", + expectedUrlPattern, request.getRequestLine().getUri()), + request.getRequestLine().getUri().matches(expectedUrlPattern)); Assert.assertNotNull(request.getFirstHeader(HttpHeaders.USER_AGENT)); Assert.assertNotNull(request.getFirstHeader(HttpHeaders.AUTHORIZATION)); Assert.assertEquals("POST", request.getMethod()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index 1693e1520..5b24bcc7f 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -368,7 +368,6 @@ public void testGetChannelsStatusWithRequest() throws Exception { ChannelsStatusRequest.ChannelStatusRequestDTO dto = new ChannelsStatusRequest.ChannelStatusRequestDTO(channel); ChannelsStatusRequest request = new ChannelsStatusRequest(); - request.setRequestId("null_0"); request.setChannels(Collections.singletonList(dto)); ChannelsStatusResponse result = client.getChannelsStatus(Collections.singletonList(channel)); Assert.assertEquals(response.getMessage(), result.getMessage()); @@ -547,9 +546,15 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception { HttpPost request = requestBuilder.generateStreamingIngestPostRequest( payload, REGISTER_BLOB_ENDPOINT, "register blob"); + String expectedUrlPattern = + String.format("%s%s", urlStr, REGISTER_BLOB_ENDPOINT) + "(\\?requestId=[a-f0-9\\-]{36})?"; + + Assert.assertTrue( + String.format( + "Expected URL to match pattern: %s but was: %s", + expectedUrlPattern, request.getRequestLine().getUri()), + request.getRequestLine().getUri().matches(expectedUrlPattern)); - Assert.assertEquals( - String.format("%s%s", urlStr, REGISTER_BLOB_ENDPOINT), request.getRequestLine().getUri()); Assert.assertNotNull(request.getFirstHeader(HttpHeaders.USER_AGENT)); Assert.assertNotNull(request.getFirstHeader(HttpHeaders.AUTHORIZATION)); Assert.assertEquals("POST", request.getMethod()); @@ -1432,7 +1437,6 @@ public void testGetLatestCommittedOffsetTokens() throws Exception { ChannelsStatusRequest.ChannelStatusRequestDTO dto = new ChannelsStatusRequest.ChannelStatusRequestDTO(channel); ChannelsStatusRequest request = new ChannelsStatusRequest(); - request.setRequestId("null_0"); request.setChannels(Collections.singletonList(dto)); Map result = client.getLatestCommittedOffsetTokens(Collections.singletonList(channel)); From ba47eb2cd64eb84efe21bc8a0641e2868b9fc968 Mon Sep 17 00:00:00 2001 From: Jay Patel Date: Tue, 11 Jun 2024 15:48:39 -0700 Subject: [PATCH 3/4] Format --- .../internal/SnowflakeStreamingIngestChannelTest.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index f025fc89f..3666d5a1a 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -309,13 +309,13 @@ public void testOpenChannelPostRequest() throws Exception { payload, OPEN_CHANNEL_ENDPOINT, "open channel"); String expectedUrlPattern = - String.format("%s%s", urlStr, OPEN_CHANNEL_ENDPOINT) + "(\\?requestId=[a-f0-9\\-]{36})?"; + String.format("%s%s", urlStr, OPEN_CHANNEL_ENDPOINT) + "(\\?requestId=[a-f0-9\\-]{36})?"; Assert.assertTrue( - String.format( - "Expected URL to match pattern: %s but was: %s", - expectedUrlPattern, request.getRequestLine().getUri()), - request.getRequestLine().getUri().matches(expectedUrlPattern)); + String.format( + "Expected URL to match pattern: %s but was: %s", + expectedUrlPattern, request.getRequestLine().getUri()), + request.getRequestLine().getUri().matches(expectedUrlPattern)); Assert.assertNotNull(request.getFirstHeader(HttpHeaders.USER_AGENT)); Assert.assertNotNull(request.getFirstHeader(HttpHeaders.AUTHORIZATION)); Assert.assertEquals("POST", request.getMethod()); From ce91a9af8c2b4dfd24b11f1e8c58c505a94d505b Mon Sep 17 00:00:00 2001 From: Jay Patel Date: Wed, 12 Jun 2024 16:15:13 -0700 Subject: [PATCH 4/4] Remove request_id and no star imports --- .../streaming/internal/ChannelsStatusRequest.java | 13 ------------- .../SnowflakeStreamingIngestChannelTest.java | 7 ++++++- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusRequest.java index b98782ab9..c72c62a4b 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusRequest.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusRequest.java @@ -61,20 +61,12 @@ Long getClientSequencer() { } } - // Optional Request ID. Used for diagnostic purposes. - private String requestId; - // Channels in request private List channels; // Snowflake role used by client private String role; - @JsonProperty("request_id") - String getRequestId() { - return requestId; - } - @JsonProperty("role") public String getRole() { return role; @@ -85,11 +77,6 @@ public void setRole(String role) { this.role = role; } - @JsonProperty("request_id") - void setRequestId(String requestId) { - this.requestId = requestId; - } - @JsonProperty("channels") void setChannels(List channels) { this.channels = channels; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index 3666d5a1a..4ddc61ece 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -1,7 +1,12 @@ package net.snowflake.ingest.streaming.internal; import static java.time.ZoneOffset.UTC; -import static net.snowflake.ingest.utils.Constants.*; +import static net.snowflake.ingest.utils.Constants.ACCOUNT_URL; +import static net.snowflake.ingest.utils.Constants.OPEN_CHANNEL_ENDPOINT; +import static net.snowflake.ingest.utils.Constants.PRIVATE_KEY; +import static net.snowflake.ingest.utils.Constants.RESPONSE_SUCCESS; +import static net.snowflake.ingest.utils.Constants.ROLE; +import static net.snowflake.ingest.utils.Constants.USER; import static org.mockito.ArgumentMatchers.argThat; import java.security.KeyPair;