Skip to content

Commit

Permalink
Support protobin file loading from Classpath
Browse files Browse the repository at this point in the history
  • Loading branch information
krmahadevan committed Nov 20, 2022
1 parent a4835eb commit 364b250
Show file tree
Hide file tree
Showing 18 changed files with 634 additions and 149 deletions.
18 changes: 18 additions & 0 deletions karate-grpc-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,24 @@
<artifactId>karate-core</artifactId>
<version>${karate.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.github.classgraph/classgraph -->
<dependency>
<groupId>io.github.classgraph</groupId>
<artifactId>classgraph</artifactId>
<version>4.8.132</version>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>7.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.23.1</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ private String getFullMethodName() {
/**
* Returns the appropriate method type based on whether the client or server expect streams.
*/
private MethodDescriptor.MethodType getMethodType() {
public MethodDescriptor.MethodType getMethodType() {
boolean clientStreaming = protoMethodDescriptor.toProto().getClientStreaming();
boolean serverStreaming = protoMethodDescriptor.toProto().getServerStreaming();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.github.thinkerou.karate.message;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;
Expand All @@ -13,7 +14,6 @@

/**
* Reader
*
* A utility class which knows how to read proto files written using message.Writer.
*
* @author thinkerou
Expand All @@ -26,6 +26,12 @@ public final class Reader {
private final Descriptors.Descriptor descriptor;
private final List<Map<String, Object>> payloadList;

Reader(JsonFormat.Parser jsonParser,
Descriptors.Descriptor descriptor,
Map<String, Object> payload) {
this(jsonParser, descriptor, Collections.singletonList(payload));
}

/**
* @param jsonParser json parser
* @param descriptor descriptor
Expand All @@ -45,11 +51,18 @@ public final class Reader {
* @param registry registry
* @return Reader
*/
public static Reader create(Descriptors.Descriptor descriptor, List<Map<String, Object>> payloadList,
JsonFormat.TypeRegistry registry) {
public static Reader create(Descriptors.Descriptor descriptor,
List<Map<String, Object>> payloadList,
JsonFormat.TypeRegistry registry) {
return new Reader(JsonFormat.parser().usingTypeRegistry(registry), descriptor, payloadList);
}

public static Reader create(Descriptors.Descriptor descriptor,
Map<String, Object> payload,
JsonFormat.TypeRegistry registry) {
return new Reader(JsonFormat.parser().usingTypeRegistry(registry), descriptor, payload);
}

/**
* @return ImmutableList
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* Writer
*
* A StreamObserver which writes the contents of the received messages to an Output.
* The messages are writting in a newline-separated json format.
* The messages are written in a newline-separated json format.
*
* @author thinkerou
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package com.github.thinkerou.karate.protobuf;

import com.google.protobuf.Descriptors.MethodDescriptor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.logging.Logger;

import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -50,7 +53,7 @@ public static ServiceResolver fromFileDescriptorSet(DescriptorProtos.FileDescrip
}

/**
* Lists all of the services found in the file descriptors.
* Lists all the services found in the file descriptors.
*
* @return Iterable
*/
Expand Down Expand Up @@ -84,38 +87,32 @@ private ServiceResolver(Iterable<Descriptors.FileDescriptor> fileDescriptors) {
* @param method method
* @return MethodDescriptor
*/
public Descriptors.MethodDescriptor resolveServiceMethod(ProtoName method) {
public Optional<MethodDescriptor> resolveServiceMethod(ProtoName method) {
return resolveServiceMethod(
method.getServiceName(),
method.getMethodName(),
method.getPackageName());
}

private Descriptors.MethodDescriptor resolveServiceMethod(
private Optional<MethodDescriptor> resolveServiceMethod(
String serviceName, String methodName, String packageName) {
Descriptors.ServiceDescriptor service = findService(serviceName, packageName);
Descriptors.MethodDescriptor method = service.findMethodByName(methodName);
if (method == null) {
throw new IllegalArgumentException(
"Can't find method " + methodName + " in service " + serviceName);
}

return method;
return findService(serviceName, packageName)
.map(serviceDescriptor -> serviceDescriptor.findMethodByName(methodName));
}

private Descriptors.ServiceDescriptor findService(String serviceName, String packageName) {
for (Descriptors.FileDescriptor fileDescriptor : fileDescriptors) {
if (!fileDescriptor.getPackage().equals(packageName)) {
// Package does not match this file, ignore.
continue;
}
private Optional<Descriptors.ServiceDescriptor> findService(String serviceName, String packageName) {
return getFileDescriptors().stream()
.filter(each -> each.getPackage().equals(packageName))
.map(each -> each.findServiceByName(serviceName))
.filter(Objects::nonNull)
.findFirst();
}

Descriptors.ServiceDescriptor serviceDescriptor = fileDescriptor.findServiceByName(serviceName);
if (serviceDescriptor != null) {
return serviceDescriptor;
}
private synchronized ImmutableList<Descriptors.FileDescriptor> getFileDescriptors() {
if (fileDescriptors.isEmpty()) {
listServices();
}
throw new IllegalArgumentException("Can't find service with name: " + serviceName);
return fileDescriptors;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,37 +1,39 @@
package com.github.thinkerou.karate.service;

import static com.google.protobuf.DescriptorProtos.FileDescriptorSet;
import static com.google.protobuf.util.JsonFormat.TypeRegistry;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Logger;

import com.github.thinkerou.karate.constants.DescriptorFile;
import com.github.thinkerou.karate.domain.ProtoName;
import com.github.thinkerou.karate.grpc.ChannelFactory;
import com.github.thinkerou.karate.grpc.ComponentObserver;
import com.github.thinkerou.karate.grpc.DynamicClient;
import com.github.thinkerou.karate.message.Reader;
import com.github.thinkerou.karate.message.Writer;
import com.github.thinkerou.karate.protobuf.ProtoFullName;
import com.github.thinkerou.karate.protobuf.ServiceResolver;
import com.github.thinkerou.karate.utils.DataReader;
import com.github.thinkerou.karate.utils.FileHelper;
import com.github.thinkerou.karate.utils.RedisHelper;
import com.google.common.collect.ImmutableList;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.DynamicMessage;
import com.github.thinkerou.karate.message.Writer;

import io.grpc.CallOptions;
import io.grpc.ManagedChannel;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.lang.reflect.Type;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.logging.Logger;

/**
* GrpcCall
Expand Down Expand Up @@ -89,64 +91,48 @@ public String invokeByRedis(String name, String payload, RedisHelper redisHelper
*/
private String execute(String name, String payload, RedisHelper redisHelper) {
ProtoName protoName = ProtoFullName.parse(name);
byte[] data;
if (redisHelper != null) {
data = redisHelper.getDescriptorSets();
} else {
String path = System.getProperty("user.home") + DescriptorFile.PROTO_PATH.getText();
Path descriptorPath = Paths.get(path + DescriptorFile.PROTO_FILE.getText());
FileHelper.validatePath(Optional.ofNullable(descriptorPath));
try {
data = Files.readAllBytes(descriptorPath);
} catch (IOException e) {
throw new IllegalArgumentException("Read descriptor path failed: " + descriptorPath.toString());
}
}

// Fetch the appropriate file descriptors for the service.
FileDescriptorSet fileDescriptorSet;
try {
fileDescriptorSet = FileDescriptorSet.parseFrom(data);
} catch (IOException e) {
throw new IllegalArgumentException("File descriptor set parse fail: " + e.getMessage());
}
Pair<ServiceResolver, Optional<MethodDescriptor>> found =
DataReader.read(redisHelper)
.stream()
.map(each -> new Pair<>(each, each.resolveServiceMethod(protoName)))
.filter(each -> each.right().isPresent())
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Service Resolver Not found"));

Descriptors.MethodDescriptor methodDescriptor = found.right()
.orElseThrow(() -> {
// When can't find service or method with name
// use service or/and method search once for help user
GrpcList list = new GrpcList();
String result1 = list.invoke(protoName.getServiceName(), "", false);
String result2 = list.invoke("", protoName.getMethodName(), false);
if (!result1.equals("[]") || !result2.equals("[{}]")) {
logger.warning(
"Call grpc failed, maybe you should see the follow grpc information.");
if (!result1.equals("[]")) {
logger.info(result1);
}
if (!result2.equals("[{}]")) {
logger.info(result2);
}
}
String text = "Can't find method " + protoName.getMethodName()
+ " in service " + protoName.getServiceName();
return new IllegalArgumentException(text);
});

// Set up the dynamic client and make the call.
ServiceResolver serviceResolver = ServiceResolver.fromFileDescriptorSet(fileDescriptorSet);
Descriptors.MethodDescriptor methodDescriptor;
try {
methodDescriptor = serviceResolver.resolveServiceMethod(protoName);
} catch (IllegalArgumentException e) {
// When can't find service or method with name
// use service or/and method search once for help user
GrpcList list = new GrpcList();
String result1 = list.invoke(protoName.getServiceName(), "", false);
String result2 = list.invoke("", protoName.getMethodName(), false);
if (!result1.equals("[]") || !result2.equals("[{}]")) {
logger.warning("Call grpc failed, maybe you should see the follow grpc information.");
if (!result1.equals("[]")) {
logger.info(result1);
}
if (!result2.equals("[{}]")) {
logger.info(result2);
}
}
throw new IllegalArgumentException(e.getMessage());
}

// Create a dynamic grpc client.
DynamicClient dynamicClient = DynamicClient.create(methodDescriptor, channel);

// This collects all know types into a registry for resolution of potential "Any" types.
TypeRegistry registry = TypeRegistry.newBuilder().add(serviceResolver.listMessageTypes()).build();

// Convert payload string to list.
List<Map<String, Object>> payloadList = new Gson().fromJson(payload, List.class);
TypeRegistry registry = TypeRegistry.newBuilder().add(found.left().listMessageTypes()).build();

// Need to support stream so it's a list.
final ImmutableList<DynamicMessage> requestMessages = Reader.create(
methodDescriptor.getInputType(), payloadList, registry
).read();
final ImmutableList<DynamicMessage> requestMessages =
extractRequestMessages(payload, methodDescriptor, registry);

// Creates one temp file to save call grpc result.
Path filePath = null;
Expand All @@ -156,17 +142,21 @@ private String execute(String name, String payload, RedisHelper redisHelper) {
logger.warning(e.getMessage());
}
FileHelper.validatePath(Optional.ofNullable(filePath));

StreamObserver<DynamicMessage> streamObserver;
List<Object> output = new ArrayList<>();
StreamObserver<DynamicMessage> streamObserver = ComponentObserver.of(Writer.create(output, registry));
streamObserver = ComponentObserver.of(Writer.create(output, registry));

// Calls grpc!
try {
dynamicClient.call(requestMessages, streamObserver, callOptions()).get();
Objects.requireNonNull(
dynamicClient.call(requestMessages, streamObserver, callOptions())).get();
} catch (Throwable t) {
throw new RuntimeException("Caught exception while waiting for rpc", t);
}

if (dynamicClient.getMethodType() == MethodType.UNARY ||
dynamicClient.getMethodType() == MethodType.CLIENT_STREAMING) {
return output.get(0).toString();
}
return output.toString();
}

Expand All @@ -176,4 +166,17 @@ private static CallOptions callOptions() {
return result;
}

private static ImmutableList<DynamicMessage> extractRequestMessages(String raw,
Descriptors.MethodDescriptor methodDescriptor, TypeRegistry registry) {
Type type = new TypeToken<List<Map<String, Object>>>() {}.getType();
try {
List<Map<String, Object>> payloads = new Gson().fromJson(raw, type);
return Reader.create(methodDescriptor.getInputType(), payloads, registry).read();
} catch (JsonSyntaxException ignored) {
type = new TypeToken<Map<String, Object>>() {}.getType();
Map<String, Object> payload = new Gson().fromJson(raw, type);
return Reader.create(methodDescriptor.getInputType(), payload, registry).read();
}
}

}
Loading

0 comments on commit 364b250

Please sign in to comment.