Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BE: Fix HTTP 500 on protobuf Any type #696

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.google.protobuf.StructProto;
import com.google.protobuf.TimestampProto;
import com.google.protobuf.TypeProto;
import com.google.protobuf.TypeRegistry;
import com.google.protobuf.WrappersProto;
import com.google.protobuf.util.JsonFormat;
import com.google.type.ColorProto;
Expand Down Expand Up @@ -147,12 +148,18 @@ public boolean canSerialize(String topic, Serde.Target type) {
@Override
public Serde.Serializer serializer(String topic, Serde.Target type) {
var descriptor = descriptorFor(topic, type).orElseThrow();
TypeRegistry typeRegistry = TypeRegistry.newBuilder()
.add(descriptorPaths.keySet())
.build();

return new Serde.Serializer() {
@SneakyThrows
@Override
public byte[] serialize(String input) {
DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
JsonFormat.parser().merge(input, builder);
JsonFormat.parser()
.usingTypeRegistry(typeRegistry)
.merge(input, builder);
return builder.build().toByteArray();
}
};
Expand Down
14 changes: 14 additions & 0 deletions api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.kafbat.ui.container.KafkaConnectContainer;
import io.kafbat.ui.container.KsqlDbContainer;
import io.kafbat.ui.container.SchemaRegistryContainer;
import java.io.FileNotFoundException;
import java.nio.file.Path;
import java.util.List;
import java.util.Properties;
Expand All @@ -22,6 +23,7 @@
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.util.TestSocketUtils;
import org.springframework.util.ResourceUtils;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;
Expand Down Expand Up @@ -75,6 +77,18 @@ public static class Initializer
public void initialize(@NotNull ConfigurableApplicationContext context) {
System.setProperty("kafka.clusters.0.name", LOCAL);
System.setProperty("kafka.clusters.0.bootstrapServers", kafka.getBootstrapServers());

// Add ProtobufFileSerde configuration
System.setProperty("kafka.clusters.0.serde.0.name", "ProtobufFile");
System.setProperty("kafka.clusters.0.serde.0.topicValuesPattern", "masking-test-.*");
try {
System.setProperty("kafka.clusters.0.serde.0.properties.protobufFilesDir",
ResourceUtils.getFile("classpath:protobuf-serde").getAbsolutePath());
} catch (FileNotFoundException e) {
throw new RuntimeException(e);
}
System.setProperty("kafka.clusters.0.serde.0.properties.protobufMessageName", "test.MessageWithAny");

// List unavailable hosts to verify failover
System.setProperty("kafka.clusters.0.schemaRegistry",
String.format("http://localhost:%1$s,http://localhost:%1$s,%2$s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,15 @@ void setUp() throws Exception {
void loadsAllProtoFiledFromTargetDirectory() throws Exception {
var protoDir = ResourceUtils.getFile("classpath:protobuf-serde/").getPath();
List<ProtoFile> files = new ProtobufFileSerde.ProtoSchemaLoader(protoDir).load();
assertThat(files).hasSize(4);
assertThat(files).hasSize(5);
assertThat(files)
.map(f -> f.getLocation().getPath())
.containsExactlyInAnyOrder(
"language/language.proto",
"sensor.proto",
"address-book.proto",
"lang-description.proto"
"lang-description.proto",
"messagewithany.proto"
);
}

Expand Down
30 changes: 30 additions & 0 deletions api/src/test/java/io/kafbat/ui/service/MessagesServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.kafbat.ui.model.TopicMessageDTO;
import io.kafbat.ui.model.TopicMessageEventDTO;
import io.kafbat.ui.producer.KafkaTestProducer;
import io.kafbat.ui.serdes.builtin.ProtobufFileSerde;
import io.kafbat.ui.serdes.builtin.StringSerde;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -214,4 +215,33 @@ void execSmartFilterTestReturnsErrorOnFilterCompilationError() {
assertThat(result.getError()).containsIgnoringCase("Compilation error");
}

@Test
void sendMessageWithProtobufAnyType() {
String jsonContent = """
{
"name": "testName",
"payload": {
"@type": "type.googleapis.com/test.PayloadMessage",
"id": "123"
}
}
""";

CreateTopicMessageDTO testMessage = new CreateTopicMessageDTO()
.key(null)
.partition(0)
.keySerde(StringSerde.name())
.content(jsonContent)
.valueSerde(ProtobufFileSerde.name());

String testTopic = MASKED_TOPICS_PREFIX + UUID.randomUUID();
createTopicWithCleanup(new NewTopic(testTopic, 5, (short) 1));

StepVerifier.create(messagesService.sendMessage(cluster, testTopic, testMessage))
.expectNextMatches(metadata -> metadata.topic().equals(testTopic)
&& metadata.partition() == 0
&& metadata.offset() >= 0)
.verifyComplete();
}

}
13 changes: 13 additions & 0 deletions api/src/test/resources/protobuf-serde/messagewithany.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
syntax = "proto3";
package test;

import "google/protobuf/any.proto";

message MessageWithAny {
string name = 1;
google.protobuf.Any payload = 2;
}

message PayloadMessage {
string id = 1;
}
Loading