Skip to content

Commit

Permalink
SNOW-1357377 Add request Id in all streaming ingest APIs (#759)
Browse files Browse the repository at this point in the history
* SNOW-1357377 Add request Id in all streaming ingest APIs

* Fix tests and log requestId

* Format

* Remove request_id and no star imports
  • Loading branch information
sfc-gh-japatel authored Jun 13, 2024
1 parent bbf9291 commit e233920
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 22 deletions.
15 changes: 13 additions & 2 deletions src/main/java/net/snowflake/ingest/connection/RequestBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -678,12 +678,23 @@ public HttpGet generateHistoryRangeRequest(
*/
public HttpPost generateStreamingIngestPostRequest(
String payload, String endPoint, String message) {
LOGGER.debug("Generate Snowpipe streaming request: endpoint={}, payload={}", endPoint, payload);
final String requestId = UUID.randomUUID().toString();
LOGGER.debug(
"Generate Snowpipe streaming request: endpoint={}, payload={}, requestId={}",
endPoint,
payload,
requestId);
// Make the corresponding URI
URI uri = null;
try {
uri =
new URIBuilder().setScheme(scheme).setHost(host).setPort(port).setPath(endPoint).build();
new URIBuilder()
.setScheme(scheme)
.setHost(host)
.setPort(port)
.setPath(endPoint)
.setParameter(REQUEST_ID, requestId)
.build();
} catch (URISyntaxException e) {
throw new SFException(e, ErrorCode.BUILD_REQUEST_FAILURE, message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,12 @@ Long getClientSequencer() {
}
}

// Optional Request ID. Used for diagnostic purposes.
private String requestId;

// Channels in request
private List<ChannelStatusRequestDTO> channels;

// Snowflake role used by client
private String role;

@JsonProperty("request_id")
String getRequestId() {
return requestId;
}

@JsonProperty("role")
public String getRole() {
return role;
Expand All @@ -85,11 +77,6 @@ public void setRole(String role) {
this.role = role;
}

@JsonProperty("request_id")
void setRequestId(String requestId) {
this.requestId = requestId;
}

@JsonProperty("channels")
void setChannels(List<ChannelStatusRequestDTO> channels) {
this.channels = channels;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,6 @@ ChannelsStatusResponse getChannelsStatus(
.collect(Collectors.toList());
request.setChannels(requestDTOs);
request.setRole(this.role);
request.setRequestId(this.flushService.getClientPrefix() + "_" + counter.getAndIncrement());

String payload = objectMapper.writeValueAsString(request);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,14 @@ public void testOpenChannelPostRequest() throws Exception {
requestBuilder.generateStreamingIngestPostRequest(
payload, OPEN_CHANNEL_ENDPOINT, "open channel");

Assert.assertEquals(
String.format("%s%s", urlStr, OPEN_CHANNEL_ENDPOINT), request.getRequestLine().getUri());
String expectedUrlPattern =
String.format("%s%s", urlStr, OPEN_CHANNEL_ENDPOINT) + "(\\?requestId=[a-f0-9\\-]{36})?";

Assert.assertTrue(
String.format(
"Expected URL to match pattern: %s but was: %s",
expectedUrlPattern, request.getRequestLine().getUri()),
request.getRequestLine().getUri().matches(expectedUrlPattern));
Assert.assertNotNull(request.getFirstHeader(HttpHeaders.USER_AGENT));
Assert.assertNotNull(request.getFirstHeader(HttpHeaders.AUTHORIZATION));
Assert.assertEquals("POST", request.getMethod());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,6 @@ public void testGetChannelsStatusWithRequest() throws Exception {
ChannelsStatusRequest.ChannelStatusRequestDTO dto =
new ChannelsStatusRequest.ChannelStatusRequestDTO(channel);
ChannelsStatusRequest request = new ChannelsStatusRequest();
request.setRequestId("null_0");
request.setChannels(Collections.singletonList(dto));
ChannelsStatusResponse result = client.getChannelsStatus(Collections.singletonList(channel));
Assert.assertEquals(response.getMessage(), result.getMessage());
Expand Down Expand Up @@ -547,9 +546,15 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception {
HttpPost request =
requestBuilder.generateStreamingIngestPostRequest(
payload, REGISTER_BLOB_ENDPOINT, "register blob");
String expectedUrlPattern =
String.format("%s%s", urlStr, REGISTER_BLOB_ENDPOINT) + "(\\?requestId=[a-f0-9\\-]{36})?";

Assert.assertTrue(
String.format(
"Expected URL to match pattern: %s but was: %s",
expectedUrlPattern, request.getRequestLine().getUri()),
request.getRequestLine().getUri().matches(expectedUrlPattern));

Assert.assertEquals(
String.format("%s%s", urlStr, REGISTER_BLOB_ENDPOINT), request.getRequestLine().getUri());
Assert.assertNotNull(request.getFirstHeader(HttpHeaders.USER_AGENT));
Assert.assertNotNull(request.getFirstHeader(HttpHeaders.AUTHORIZATION));
Assert.assertEquals("POST", request.getMethod());
Expand Down Expand Up @@ -1432,7 +1437,6 @@ public void testGetLatestCommittedOffsetTokens() throws Exception {
ChannelsStatusRequest.ChannelStatusRequestDTO dto =
new ChannelsStatusRequest.ChannelStatusRequestDTO(channel);
ChannelsStatusRequest request = new ChannelsStatusRequest();
request.setRequestId("null_0");
request.setChannels(Collections.singletonList(dto));
Map<String, String> result =
client.getLatestCommittedOffsetTokens(Collections.singletonList(channel));
Expand Down

0 comments on commit e233920

Please sign in to comment.