Skip to content

Commit

Permalink
[server] Rewrite gRPC read service to bridge gap with Netty-based HTT…
Browse files Browse the repository at this point in the history
…P server
  • Loading branch information
sushantmane committed Sep 9, 2024
1 parent a782eee commit 2a298ff
Show file tree
Hide file tree
Showing 47 changed files with 1,493 additions and 1,176 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -267,17 +267,24 @@ public VeniceGrpcStreamObserver(CompletableFuture<TransportClientResponse> respo

@Override
public void onNext(VeniceServerResponse value) {
if (value.getErrorCode() != VeniceReadResponseStatus.OK) {
handleResponseError(value);
int statusCode = value.getErrorCode();
// Successful response
if (statusCode == VeniceReadResponseStatus.OK.getCode()) {
complete(
new TransportClientResponse(
value.getSchemaId(),
CompressionStrategy.valueOf(value.getCompressionStrategy()),
value.getData().toByteArray()),
null);
return;
}

complete(
new TransportClientResponse(
value.getSchemaId(),
CompressionStrategy.valueOf(value.getCompressionStrategy()),
value.getData().toByteArray()),
null);
// Key not found is a valid response
if (statusCode == VeniceReadResponseStatus.KEY_NOT_FOUND.getCode()) {
complete(null, null);
return;
}
// Handle the cases where the status code doesn't match healthy response codes
handleResponseError(value);
}

@Override
Expand Down Expand Up @@ -308,30 +315,29 @@ void handleResponseError(VeniceServerResponse response) {
int statusCode = response.getErrorCode();
String errorMessage = response.getErrorMessage();
Exception exception;

switch (statusCode) {
case VeniceReadResponseStatus.BAD_REQUEST:
exception = new VeniceClientHttpException(errorMessage, statusCode);
break;
case VeniceReadResponseStatus.TOO_MANY_REQUESTS:
exception = new VeniceClientRateExceededException(errorMessage);
break;
case VeniceReadResponseStatus.KEY_NOT_FOUND:
exception = null;
break;
default:
exception = new VeniceClientException(
String
.format("An unexpected error occurred with status code: %d, message: %s", statusCode, errorMessage));
break;
}

if (exception != null) {
LOGGER.error("Got error in response due to", exception);
try {
switch (VeniceReadResponseStatus.fromCode(statusCode)) {
case BAD_REQUEST:
exception = new VeniceClientHttpException(errorMessage, statusCode);
break;
case TOO_MANY_REQUESTS:
exception = new VeniceClientRateExceededException(errorMessage);
break;
default:
exception = new VeniceClientException(
String.format(
"An unexpected error occurred with status code: %d, message: %s",
statusCode,
errorMessage));
break;
}
} catch (IllegalArgumentException e) {
// Handle the case where the status code doesn't match any known values
exception = new VeniceClientException(
String.format("Unknown status code: %d, message: %s", statusCode, errorMessage),
e);
}

// In the event of record not found, we treat that as a successful response and complete the future with a null
// value and the exception is set to null as well.
LOGGER.error("Received error response with status code: {}, message: {}", statusCode, errorMessage);
complete(null, exception);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package com.linkedin.venice.grpc;

import com.google.protobuf.ByteString;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.security.SSLConfig;
import com.linkedin.venice.security.SSLFactory;
import com.linkedin.venice.utils.SslUtils;
import io.grpc.Grpc;
import io.grpc.ServerCall;
import io.grpc.Status;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -92,4 +94,19 @@ private static KeyStore loadStore(String path, char[] password, String type)
}
return keyStore;
}

/**
* Converts a Netty ByteBuf to a ByteString, checking if it has a backing array to avoid manual copying.
*
* @param body The ByteBuf to be converted to ByteString.
* @return The resulting ByteString.
*/
public static ByteString toByteString(ByteBuf body) {
if (body.hasArray()) {
// Directly use the backing array to avoid copying
return ByteString.copyFrom(body.array(), body.arrayOffset() + body.readerIndex(), body.readableBytes());
}
// Fallback to nioBuffer() to handle the conversion efficiently
return ByteString.copyFrom(body.nioBuffer());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,14 @@ public enum ServerAdminAction {
public int getValue() {
return this.value;
}

// get the enum value from the integer value
public static ServerAdminAction fromValue(int value) {
for (ServerAdminAction action: ServerAdminAction.values()) {
if (action.getValue() == value) {
return action;
}
}
throw new IllegalArgumentException("Invalid value for ServerAdminAction: " + value);
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,40 @@
package com.linkedin.venice.response;

import io.netty.handler.codec.http.HttpResponseStatus;


/**
* Enumeration of response status codes for Venice read requests.
* <p>
* **Positive values** correspond to standard HTTP status codes and can be used directly in HTTP responses.
* **Negative values** represent custom Venice-specific error codes.
* <p>
* For example, a status code of `200` indicates a successful read, while a status code of `-100` might indicate a specific Venice-related error.
* Defines response status codes for Venice read requests. This wrapper around {@link HttpResponseStatus} allows
* for the inclusion of custom status codes that extend beyond the standard HTTP status codes.
*/
public class VeniceReadResponseStatus {
public static final int KEY_NOT_FOUND = -420;

public static final int OK = 200;
public static final int BAD_REQUEST = 400;
public static final int INTERNAL_ERROR = 500;
public static final int TOO_MANY_REQUESTS = 429;
public static final int SERVICE_UNAVAILABLE = 503;
public enum VeniceReadResponseStatus {
KEY_NOT_FOUND(HttpResponseStatus.NOT_FOUND), OK(HttpResponseStatus.OK), BAD_REQUEST(HttpResponseStatus.BAD_REQUEST),
FORBIDDEN(HttpResponseStatus.FORBIDDEN), METHOD_NOT_ALLOWED(HttpResponseStatus.METHOD_NOT_ALLOWED),
REQUEST_TIMEOUT(HttpResponseStatus.REQUEST_TIMEOUT), TOO_MANY_REQUESTS(HttpResponseStatus.TOO_MANY_REQUESTS),
INTERNAL_SERVER_ERROR(HttpResponseStatus.INTERNAL_SERVER_ERROR),
SERVICE_UNAVAILABLE(HttpResponseStatus.SERVICE_UNAVAILABLE),
MISROUTED_STORE_VERSION(new HttpResponseStatus(570, "Misrouted request"));

private final HttpResponseStatus httpResponseStatus;

VeniceReadResponseStatus(HttpResponseStatus httpResponseStatus) {
this.httpResponseStatus = httpResponseStatus;
}

public HttpResponseStatus getHttpResponseStatus() {
return httpResponseStatus;
}

public int getCode() {
return httpResponseStatus.code();
}

public static VeniceReadResponseStatus fromCode(int code) {
for (VeniceReadResponseStatus status: values()) {
if (status.getCode() == code) {
return status;
}
}
throw new IllegalArgumentException("Unknown status venice read response status code: " + code);
}
}
151 changes: 149 additions & 2 deletions internal/venice-common/src/main/proto/VeniceReadService.proto
Original file line number Diff line number Diff line change
@@ -1,11 +1,36 @@
syntax = 'proto3';
package com.linkedin.venice.protocols;
import "google/protobuf/empty.proto";

option java_multiple_files = true;

service VeniceReadService {
rpc get (VeniceClientRequest) returns (VeniceServerResponse) {}
rpc batchGet(VeniceClientRequest) returns (VeniceServerResponse) {}

rpc get (VeniceClientRequest) returns (VeniceServerResponse) {
option deprecated = true;
}

rpc batchGet(VeniceClientRequest) returns (VeniceServerResponse) {
option deprecated = true;
}

rpc singleGet(SingleGetRequest) returns (SingleGetResponse) {}

rpc multiGet(MultiGetRequest) returns (MultiGetResponse) {}

rpc compute(ComputeRequest) returns (MultiGetResponse) {}

rpc isServerHealthy(HealthCheckRequest) returns (HealthCheckResponse) {}

rpc getCompressionDictionary(GetCompressionDictionaryRequest) returns (GetCompressionDictionaryResponse) {}

rpc handleAdminRequest(AdminRequest) returns (AdminResponse) {}

rpc getMetadata(MetadataRequest) returns (MetadataResponse) {}

rpc getCurrentVersionInfo(CurrentVersionInfoRequest) returns (CurrentVersionInfoResponse) {}

rpc getIngestionContext(IngestionContextRequest) returns (IngestionContextResponse) {}
}

message VeniceClientRequest {
Expand All @@ -29,4 +54,126 @@ message VeniceServerResponse {

uint32 errorCode = 6;
string errorMessage = 7;
}

message SingleGetRequest {
string resourceName = 1;
uint32 partition = 2;
string key = 3;
bool isRetryRequest = 4;
}

message SingleGetResponse {
int32 statusCode = 1;
bytes value = 2;
sint32 schemaId = 3;
uint32 compressionStrategy = 4;
optional string errorMessage = 5;
string contentType = 6;
uint32 contentLength = 7;
uint32 rcu = 8;
}

message MultiGetRequest {
string resourceName = 1;
uint32 partition = 2;
bytes keyBytes = 3; // used for batch get
uint32 keyCount = 4;
bool isRetryRequest = 5;
}

message MultiGetResponse {
int32 statusCode = 1;
bytes value = 2;
sint32 schemaId = 3;
uint32 compressionStrategy = 4;
optional string errorMessage = 5;
string contentType = 6;
uint32 contentLength = 7;
uint32 rcu = 8;
}


message ComputeRequest {
string resourceName = 1;
bytes payload = 2;
sint32 schemaId = 3;
bool isRetryRequest = 4;
bool isStreamingRequest = 5;
uint32 apiVersion = 6;
sint32 computeValueSchemaId = 7;
}

message HealthCheckRequest {
}

message HealthCheckResponse {
int32 statusCode = 1;
string message = 2;
}

message GetCompressionDictionaryRequest {
string storeName = 1;
uint32 storeVersion = 2;
}

message GetCompressionDictionaryResponse {
int32 statusCode = 1;
bytes value = 2;
string contentType = 3;
uint32 contentLength = 4;
string errorMessage = 5;
}

message AdminRequest {
string resourceName = 1;
optional uint32 partition = 2;
string serverAdminAction = 3;
}

message AdminResponse {
int32 statusCode = 1;
bytes value = 2;
sint32 schemaId = 3;
string contentType = 4;
uint32 contentLength = 5;
string errorMessage = 6;
}

message CurrentVersionInfoRequest {
string storeName = 1;
}

message CurrentVersionInfoResponse {
int32 statusCode = 1;
sint32 currentVersion = 2;
string errorMessage = 3;
string contentType = 4;
}

message MetadataRequest {
string storeName = 1;
}

message MetadataResponse {
int32 statusCode = 1;
bytes value = 2;
sint32 schemaId = 3;
string contentType = 4;
uint32 contentLength = 5;
string errorMessage = 6;
}

message IngestionContextRequest {
string versionTopicName = 1;
string topicName = 2;
uint32 partition = 3;
}

message IngestionContextResponse {
int32 statusCode = 1;
bytes value = 2;
string contentType = 3;
uint32 contentLength = 4;
string errorMessage = 5;
}
Loading

0 comments on commit 2a298ff

Please sign in to comment.