diff --git a/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java b/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java index 55929400d..506d4e221 100644 --- a/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java +++ b/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java @@ -678,12 +678,23 @@ public HttpGet generateHistoryRangeRequest( */ public HttpPost generateStreamingIngestPostRequest( String payload, String endPoint, String message) { - LOGGER.debug("Generate Snowpipe streaming request: endpoint={}, payload={}", endPoint, payload); + final String requestId = UUID.randomUUID().toString(); + LOGGER.debug( + "Generate Snowpipe streaming request: endpoint={}, payload={}, requestId={}", + endPoint, + payload, + requestId); // Make the corresponding URI URI uri = null; try { uri = - new URIBuilder().setScheme(scheme).setHost(host).setPort(port).setPath(endPoint).build(); + new URIBuilder() + .setScheme(scheme) + .setHost(host) + .setPort(port) + .setPath(endPoint) + .setParameter(REQUEST_ID, requestId) + .build(); } catch (URISyntaxException e) { throw new SFException(e, ErrorCode.BUILD_REQUEST_FAILURE, message); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusRequest.java index b98782ab9..c72c62a4b 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusRequest.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusRequest.java @@ -61,20 +61,12 @@ Long getClientSequencer() { } } - // Optional Request ID. Used for diagnostic purposes. - private String requestId; - // Channels in request private List channels; // Snowflake role used by client private String role; - @JsonProperty("request_id") - String getRequestId() { - return requestId; - } - @JsonProperty("role") public String getRole() { return role; @@ -85,11 +77,6 @@ public void setRole(String role) { this.role = role; } - @JsonProperty("request_id") - void setRequestId(String requestId) { - this.requestId = requestId; - } - @JsonProperty("channels") void setChannels(List channels) { this.channels = channels; diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 2990b49d8..75eb4f717 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -493,7 +493,6 @@ ChannelsStatusResponse getChannelsStatus( .collect(Collectors.toList()); request.setChannels(requestDTOs); request.setRole(this.role); - request.setRequestId(this.flushService.getClientPrefix() + "_" + counter.getAndIncrement()); String payload = objectMapper.writeValueAsString(request); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index 87e3f8f11..4ddc61ece 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -313,8 +313,14 @@ public void testOpenChannelPostRequest() throws Exception { requestBuilder.generateStreamingIngestPostRequest( payload, OPEN_CHANNEL_ENDPOINT, "open channel"); - Assert.assertEquals( - String.format("%s%s", urlStr, OPEN_CHANNEL_ENDPOINT), request.getRequestLine().getUri()); + String expectedUrlPattern = + String.format("%s%s", urlStr, OPEN_CHANNEL_ENDPOINT) + "(\\?requestId=[a-f0-9\\-]{36})?"; + + Assert.assertTrue( + String.format( + "Expected URL to match pattern: %s but was: %s", + expectedUrlPattern, request.getRequestLine().getUri()), + request.getRequestLine().getUri().matches(expectedUrlPattern)); Assert.assertNotNull(request.getFirstHeader(HttpHeaders.USER_AGENT)); Assert.assertNotNull(request.getFirstHeader(HttpHeaders.AUTHORIZATION)); Assert.assertEquals("POST", request.getMethod()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index 1693e1520..5b24bcc7f 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -368,7 +368,6 @@ public void testGetChannelsStatusWithRequest() throws Exception { ChannelsStatusRequest.ChannelStatusRequestDTO dto = new ChannelsStatusRequest.ChannelStatusRequestDTO(channel); ChannelsStatusRequest request = new ChannelsStatusRequest(); - request.setRequestId("null_0"); request.setChannels(Collections.singletonList(dto)); ChannelsStatusResponse result = client.getChannelsStatus(Collections.singletonList(channel)); Assert.assertEquals(response.getMessage(), result.getMessage()); @@ -547,9 +546,15 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception { HttpPost request = requestBuilder.generateStreamingIngestPostRequest( payload, REGISTER_BLOB_ENDPOINT, "register blob"); + String expectedUrlPattern = + String.format("%s%s", urlStr, REGISTER_BLOB_ENDPOINT) + "(\\?requestId=[a-f0-9\\-]{36})?"; + + Assert.assertTrue( + String.format( + "Expected URL to match pattern: %s but was: %s", + expectedUrlPattern, request.getRequestLine().getUri()), + request.getRequestLine().getUri().matches(expectedUrlPattern)); - Assert.assertEquals( - String.format("%s%s", urlStr, REGISTER_BLOB_ENDPOINT), request.getRequestLine().getUri()); Assert.assertNotNull(request.getFirstHeader(HttpHeaders.USER_AGENT)); Assert.assertNotNull(request.getFirstHeader(HttpHeaders.AUTHORIZATION)); Assert.assertEquals("POST", request.getMethod()); @@ -1432,7 +1437,6 @@ public void testGetLatestCommittedOffsetTokens() throws Exception { ChannelsStatusRequest.ChannelStatusRequestDTO dto = new ChannelsStatusRequest.ChannelStatusRequestDTO(channel); ChannelsStatusRequest request = new ChannelsStatusRequest(); - request.setRequestId("null_0"); request.setChannels(Collections.singletonList(dto)); Map result = client.getLatestCommittedOffsetTokens(Collections.singletonList(channel));