Skip to content

Commit

Permalink
Support non-owned topic schema.
Browse files Browse the repository at this point in the history
fixes: #427
fixes: #452

SpecMesh already allows schema owned by other domains to be referenced by schema owned by the domain.  When encountered, such non-owned schema are ignored, i.e. to attempt is made to register them.  However, dependent schemas will fail to register if the non-owned schema is not already registered.

What was not supported till now, was a channel directly referencing a non-owned schema. What is called a 'topic schema' in the code. In this instance, the schema needs to be registered against the appropriate `${topic.name}-key` or `${topic.name}-value` subject, otherwise things won't work.

As the schema isn't owned by the domain, it can't just register the schema. Instead, it first ensures the schema is already registered under a subject name that matches its fully qualified name. For an avro schema the FQ name is `${schema.namespace}.${schema.name}`

Only if the non-owned schema has already been registered under the correct name can will the schema also be associated with the topic's key or value by registering the non-owned schema against the appropriate `${topic.name}-key` or `${topic.name}-value` subject.
  • Loading branch information
Andy Coates committed Jan 16, 2025
1 parent a3fd92d commit 8c8a66a
Show file tree
Hide file tree
Showing 14 changed files with 472 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright 2023 SpecMesh Contributors (https://github.com/specmesh)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.specmesh.cli;

import static io.specmesh.cli.util.CommonSchema.OTHER_SCHEMA_SUBJECT;
import static io.specmesh.cli.util.CommonSchema.TOPIC_KEY_SCHEMA_SUBJECT;
import static io.specmesh.cli.util.CommonSchema.TOPIC_VALUE_SCHEMA_SUBJECT;
import static io.specmesh.cli.util.CommonSchema.registerCommonSchema;
import static io.specmesh.kafka.provision.Status.STATE.CREATED;
import static io.specmesh.kafka.provision.Status.STATE.FAILED;
import static io.specmesh.kafka.provision.Status.STATE.IGNORED;
import static java.util.stream.Collectors.toMap;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.specmesh.cli.util.CommonSchema;
import io.specmesh.kafka.DockerKafkaEnvironment;
import io.specmesh.kafka.KafkaApiSpec;
import io.specmesh.kafka.KafkaEnvironment;
import io.specmesh.kafka.provision.Provisioner;
import io.specmesh.kafka.provision.Status;
import io.specmesh.kafka.provision.schema.SchemaProvisioner;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.apache.kafka.clients.admin.Admin;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
* Functional test for using common schema, i.e. shares schema that are registered by another domain
*/
class ProvisionCommonSchemaFunctionalTest {

private static final String OWNER_USER = "simple.schema_demo";

private static final KafkaApiSpec SPEC =
KafkaApiSpec.loadFromFileSystem("src/test/resources/shared_schema_demo-api.yaml");

@RegisterExtension
private static final KafkaEnvironment KAFKA_ENV =
DockerKafkaEnvironment.builder()
.withSaslAuthentication(
"admin", "admin-secret", OWNER_USER, OWNER_USER + "-secret")
.withKafkaAcls()
.build();

@Test
void shouldFailIfSharedTopicSchemaNotRegistered() throws Exception {
try (Admin admin = KAFKA_ENV.adminClient();
SchemaRegistryClient srClient = KAFKA_ENV.srClient()) {

// Given:
CommonSchema.unregisterCommonSchema(srClient);

final Provisioner provisioner =
Provisioner.builder()
.apiSpec(SPEC)
.adminClient(admin)
.schemaRegistryClient(srClient)
.schemaPath("src/test/resources/")
.build();

// When:
final Status status = provisioner.provision();

// Then:
assertThat(status.failed(), is(true));

final Map<String, SchemaProvisioner.Schema> schemaBySubject =
status.schemas().stream()
.collect(toMap(SchemaProvisioner.Schema::subject, Function.identity()));

assertThat(
schemaBySubject.keySet(),
is(
Set.of(
OTHER_SCHEMA_SUBJECT,
TOPIC_KEY_SCHEMA_SUBJECT,
TOPIC_VALUE_SCHEMA_SUBJECT)));

assertThat(schemaBySubject.get(OTHER_SCHEMA_SUBJECT).state(), is(IGNORED));
assertThat(schemaBySubject.get(TOPIC_KEY_SCHEMA_SUBJECT).state(), is(FAILED));
assertThat(schemaBySubject.get(TOPIC_VALUE_SCHEMA_SUBJECT).state(), is(FAILED));

assertThat(
schemaBySubject.get(TOPIC_KEY_SCHEMA_SUBJECT).exception().getMessage(),
containsString(
"Topic schema that are not owned by the domain must already be"
+ " registered under subject matching fully qualified name. name:"
+ " other.domain.Common"));
}
}

@Test
void shouldSucceedIfSharedTopicSchemaAreRegisteredWithSubjectMatchingName() throws Exception {
try (Admin admin = KAFKA_ENV.adminClient();
SchemaRegistryClient srClient = KAFKA_ENV.srClient()) {

// Given:
registerCommonSchema(srClient);

final Provisioner provisioner =
Provisioner.builder()
.apiSpec(SPEC)
.adminClient(admin)
.schemaRegistryClient(srClient)
.schemaPath("src/test/resources/")
.build();

// When:
final Status status = provisioner.provision();

// Then:
status.check();

final Map<String, SchemaProvisioner.Schema> schemaBySubject =
status.schemas().stream()
.collect(toMap(SchemaProvisioner.Schema::subject, Function.identity()));

assertThat(
schemaBySubject.keySet(),
is(
Set.of(
OTHER_SCHEMA_SUBJECT,
TOPIC_KEY_SCHEMA_SUBJECT,
TOPIC_VALUE_SCHEMA_SUBJECT)));

assertThat(schemaBySubject.get(OTHER_SCHEMA_SUBJECT).state(), is(IGNORED));
assertThat(schemaBySubject.get(TOPIC_KEY_SCHEMA_SUBJECT).state(), is(CREATED));
assertThat(schemaBySubject.get(TOPIC_VALUE_SCHEMA_SUBJECT).state(), is(CREATED));

// When:
final Status statusRepublish = provisioner.provision();

// Then:
status.check();

final Map<String, SchemaProvisioner.Schema> republishSchemaBySubject =
statusRepublish.schemas().stream()
.collect(toMap(SchemaProvisioner.Schema::subject, Function.identity()));

assertThat(republishSchemaBySubject.keySet(), is(Set.of(OTHER_SCHEMA_SUBJECT)));

assertThat(republishSchemaBySubject.get(OTHER_SCHEMA_SUBJECT).state(), is(IGNORED));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,21 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.specmesh.cli.util.CommonSchema;
import io.specmesh.kafka.DockerKafkaEnvironment;
import io.specmesh.kafka.KafkaEnvironment;
import io.specmesh.kafka.provision.Status;
import io.specmesh.kafka.provision.TopicProvisioner.Topic;
import io.specmesh.kafka.provision.schema.SchemaProvisioner;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import picocli.CommandLine;

class ProvisionNestedFunctionalTest {
/** Functional test of specs that use schema that reference other schema. */
class ProvisionNestedSchemaFunctionalTest {

private static final String OWNER_USER = "simple.schema_demo";

Expand Down Expand Up @@ -94,7 +92,7 @@ void shouldProvisionTopicsAndAclResourcesWithNestedSchemasAndRepublishCorrectly(
.filter(s -> s.state() == Status.STATE.IGNORED)
.map(SchemaProvisioner.Schema::subject)
.collect(Collectors.toList()),
contains("other.domain.Common.subject"));
containsInAnyOrder("other.domain.Common", "other.domain.CommonOther"));

assertThat(status.acls(), hasSize(12));

Expand All @@ -116,14 +114,9 @@ void shouldProvisionTopicsAndAclResourcesWithNestedSchemasAndRepublishCorrectly(

private void givenCommonSchemaRegistered() {
try (SchemaRegistryClient srClient = KAFKA_ENV.srClient()) {
final ParsedSchema schema =
new AvroSchema(
Files.readString(
Path.of(
"./src/test/resources/schema/other.domain.Common.avsc")));
srClient.register("other.domain.Common.subject", schema);
CommonSchema.registerCommonSchema(srClient);
} catch (Exception e) {
throw new AssertionError("failed to register common schema", e);
throw new AssertionError("failed to close client", e);
}
}
}
135 changes: 135 additions & 0 deletions cli/src/test/java/io/specmesh/cli/util/CommonSchema.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright 2023 SpecMesh Contributors (https://github.com/specmesh)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.specmesh.cli.util;

import static java.util.stream.Collectors.toMap;

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

public final class CommonSchema {

public static final String OTHER_SCHEMA_SUBJECT = "other.domain.CommonOther";
public static final String TOPIC_KEY_SCHEMA_SUBJECT =
"simple.schema_demo._public.some.topic-key";
public static final String TOPIC_VALUE_SCHEMA_SUBJECT =
"simple.schema_demo._public.some.topic-value";

public static final List<Map.Entry<String, List<String>>> COMMON_SCHEMA =
List.of(
Map.entry(OTHER_SCHEMA_SUBJECT, List.of()),
Map.entry("other.domain.CommonKey", List.of()),
Map.entry("other.domain.Common", List.of(OTHER_SCHEMA_SUBJECT)));

private static final Path SCHEMA_ROOT = Path.of("./src/test/resources/schema/");

public static void registerCommonSchema(final SchemaRegistryClient srClient) {
// Common schema registration covered by
// https://github.com/specmesh/specmesh-build/issues/453.
// Until then, handle manually:
final Map<String, AvroSchema> cache = new HashMap<>();
COMMON_SCHEMA.forEach(
e -> buildAndRegisterSchema(e.getKey(), e.getValue(), cache, srClient));
}

public static void unregisterCommonSchema(final SchemaRegistryClient srClient) {
try {
final Map<String, ArrayList<String>> remaining =
COMMON_SCHEMA.stream()
.collect(toMap(Map.Entry::getKey, e -> new ArrayList<>(e.getValue())));

final Set<String> registeredSubjects = Set.copyOf(srClient.getAllSubjects());

while (!remaining.isEmpty()) {
final String subject =
remaining.entrySet().stream()
.filter(e -> e.getValue().isEmpty())
.findAny()
.map(Map.Entry::getKey)
.orElseThrow();

if (registeredSubjects.contains(subject)) {
srClient.deleteSubject(subject);
}

remaining.remove(subject);
remaining.values().forEach(deps -> deps.remove(subject));
}
} catch (Exception e) {
throw new AssertionError("Failed to delete subjects", e);
}
}

private static void buildAndRegisterSchema(
final String subject,
final List<String> dependencies,
final Map<String, AvroSchema> cache,
final SchemaRegistryClient srClient) {
cache.computeIfAbsent(
subject,
key -> {
final List<SchemaReference> references =
dependencies.stream()
.map(dep -> new SchemaReference(dep, dep, -1))
.collect(Collectors.toList());

final Map<String, String> resolvedReferences =
dependencies.stream()
.collect(
toMap(
Function.identity(),
sub -> cache.get(sub).canonicalString()));

final AvroSchema schema =
new AvroSchema(
readLocalSchema(subject, cache),
references,
resolvedReferences,
-1);
try {
final int id = srClient.register(subject, schema);
System.out.println("Registered " + subject + " with id " + id);
return schema;
} catch (Exception e) {
throw new AssertionError("failed to register common schema", e);
}
});
}

private static String readLocalSchema(
final String subject, final Map<String, AvroSchema> cache) {
final Path path = SCHEMA_ROOT.resolve(subject + ".avsc");
try {
return Files.readString(path);
} catch (IOException e) {
throw new AssertionError("Failed to read schema: " + path.toAbsolutePath(), e);
}
}

private CommonSchema() {}
}
2 changes: 1 addition & 1 deletion cli/src/test/resources/schema/other.domain.Common.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
"namespace": "other.domain",
"name": "Common",
"fields": [
{"name": "thing", "type": "string"}
{"name": "other", "type": "other.domain.CommonOther", "subject": "other.domain.CommonOther"}
]
}
8 changes: 8 additions & 0 deletions cli/src/test/resources/schema/other.domain.CommonKey.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"type": "record",
"namespace": "other.domain",
"name": "CommonKey",
"fields": [
{"name": "id", "type": "long"}
]
}
8 changes: 8 additions & 0 deletions cli/src/test/resources/schema/other.domain.CommonOther.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"type": "record",
"namespace": "other.domain",
"name": "CommonOther",
"fields": [
{"name": "thing", "type": "string"}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
"fields": [
{"name": "role", "type": "string"},
{"name": "user", "type": "simple.schema_demo.UserSignedUp", "subject": "simple.schema_demo._public.UserSignedUp"},
{"name": "common", "type": "other.domain.Common", "subject": "other.domain.Common.subject"}
{"name": "common", "type": "other.domain.Common", "subject": "other.domain.Common"}
]
}
Loading

0 comments on commit 8c8a66a

Please sign in to comment.