From 9f2e646e9cb97a2dcfc179b5c56cd141092e4d53 Mon Sep 17 00:00:00 2001 From: Ludovic BOUTROS Date: Wed, 24 Aug 2022 10:30:03 +0200 Subject: [PATCH 1/4] Feat: support system property substitution in topology files --- pom.xml | 7 ++- .../serdes/SystemPropertySubstitutor.java | 56 +++++++++++++++++++ .../kafka/topology/serdes/TopologySerdes.java | 14 +++-- .../kafka/topology/TopologySerdesTest.java | 35 ++++++++++++ .../serdes/SystemPropertySubstitutorTest.java | 39 +++++++++++++ .../descriptor-with-system-properties.yaml | 26 +++++++++ ...riptor-with-unknown-system-properties.yaml | 26 +++++++++ 7 files changed, 198 insertions(+), 5 deletions(-) create mode 100644 src/main/java/com/purbon/kafka/topology/serdes/SystemPropertySubstitutor.java create mode 100644 src/test/java/com/purbon/kafka/topology/serdes/SystemPropertySubstitutorTest.java create mode 100644 src/test/resources/descriptor-with-system-properties.yaml create mode 100644 src/test/resources/descriptor-with-unknown-system-properties.yaml diff --git a/pom.xml b/pom.xml index 7273aa7ee..44f923227 100644 --- a/pom.xml +++ b/pom.xml @@ -531,6 +531,7 @@ 2.17.1 3.5.7 1.4 + 1.9 3.6.0 4.13.1 1.15.3 @@ -735,7 +736,11 @@ 2.33.1 test - + + org.apache.commons + commons-text + ${commons-text.version} + diff --git a/src/main/java/com/purbon/kafka/topology/serdes/SystemPropertySubstitutor.java b/src/main/java/com/purbon/kafka/topology/serdes/SystemPropertySubstitutor.java new file mode 100644 index 000000000..2741e94c6 --- /dev/null +++ b/src/main/java/com/purbon/kafka/topology/serdes/SystemPropertySubstitutor.java @@ -0,0 +1,56 @@ +package com.purbon.kafka.topology.serdes; + +import com.purbon.kafka.topology.exceptions.TopologyParsingException; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.lookup.StrSubstitutor; +import org.apache.logging.log4j.status.StatusLogger; + +import java.util.Map; +import java.util.stream.Collectors; + +/** + * A simple substitutor which substitutes system properties in the topology file. + * If a property is not found, it throws a {@link com.purbon.kafka.topology.exceptions.TopologyParsingException}. + */ +public class SystemPropertySubstitutor { + private final Map env; + private final StrSubstitutor strSubstitutor; + + public SystemPropertySubstitutor() { + env = System.getProperties().entrySet().stream() + .collect(Collectors.toMap(e -> (String) e.getKey(), e -> (String) e.getValue())); + + strSubstitutor = new StrSubstitutor(env, "${", "}") { + @Override + protected String resolveVariable(final LogEvent event, final String variableName, final StringBuilder buf, + final int startPos, final int endPos) { + String result = super.resolveVariable(event, variableName, buf, startPos, endPos); + if (result == null) { + throw new TopologyParsingException("Cannot resolve variable: " + variableName); + } + return result; + } + + @Override + public String replace(final LogEvent event, final String source) { + if (source == null) { + return null; + } + final StringBuilder buf = new StringBuilder(source); + try { + if (!substitute(event, buf, 0, source.length())) { + return source; + } + } catch (Throwable t) { + StatusLogger.getLogger().error("Replacement failed on {}", source, t); + throw t; + } + return buf.toString(); + } + }; + } + + public String replace(String original) { + return strSubstitutor.replace(original); + } +} diff --git a/src/main/java/com/purbon/kafka/topology/serdes/TopologySerdes.java b/src/main/java/com/purbon/kafka/topology/serdes/TopologySerdes.java index 3c9e0b4cf..7c12899f9 100644 --- a/src/main/java/com/purbon/kafka/topology/serdes/TopologySerdes.java +++ b/src/main/java/com/purbon/kafka/topology/serdes/TopologySerdes.java @@ -11,11 +11,14 @@ import com.purbon.kafka.topology.model.Topic; import com.purbon.kafka.topology.model.Topology; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; public class TopologySerdes { - private ObjectMapper mapper; + private final ObjectMapper mapper; + private final SystemPropertySubstitutor systemPropertySubstitutor; public enum FileType { JSON, @@ -32,11 +35,13 @@ public TopologySerdes(Configuration config, PlanMap plans) { public TopologySerdes(Configuration config, FileType type, PlanMap plans) { mapper = ObjectMapperFactory.build(type, config, plans); + systemPropertySubstitutor = new SystemPropertySubstitutor(); } public Topology deserialise(File file) { - try { - return mapper.readValue(file, Topology.class); + try (FileInputStream inputStream = new FileInputStream(file)) { + String content = new String(inputStream.readAllBytes(), StandardCharsets.UTF_8); + return deserialise(content); } catch (IOException e) { throw new TopologyParsingException( "Failed to deserialize topology from " + file.getPath(), e); @@ -45,7 +50,8 @@ public Topology deserialise(File file) { public Topology deserialise(String content) { try { - return mapper.readValue(content, Topology.class); + String substitutedContent = systemPropertySubstitutor.replace(content); + return mapper.readValue(substitutedContent, Topology.class); } catch (IOException e) { throw new TopologyParsingException("Failed to deserialize topology from " + content, e); } diff --git a/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java b/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java index b86c39c35..3c153eac5 100644 --- a/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java +++ b/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java @@ -7,6 +7,7 @@ import static com.purbon.kafka.topology.roles.rbac.RBACPredefinedRoles.DEVELOPER_READ; import static com.purbon.kafka.topology.roles.rbac.RBACPredefinedRoles.RESOURCE_OWNER; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -35,12 +36,20 @@ import java.util.*; import java.util.stream.Collectors; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; public class TopologySerdesTest { private TopologySerdes parser; + @BeforeClass + public static void beforeClass() { + System.setProperty("env", "staging"); + System.setProperty("source", "partner"); + System.setProperty("project", "my_project"); + } + @Before public void setup() { Properties props = new Properties(); @@ -70,6 +79,32 @@ public void testMetadata() { .containsKey("system"); } + @Test + public void testSystemPropertySubstitution() { + // Given + // System properties already set in the setup fonction + + // When + Topology topology = + parser.deserialise(TestUtils.getResourceFile("/descriptor-with-system-properties.yaml")); + Project project = topology.getProjects().get(0); + + // Then + assertThat(project.getName()).isEqualTo("my_project"); + assertThat(project.namePrefix()).isEqualTo("staging.partner.my_project."); + } + + @Test + public void testInvalidSystemPropertySubstitution() { + // Given + // System properties already set in the setup fonction + + // When + // Then + assertThatThrownBy(() -> parser.deserialise(TestUtils.getResourceFile("/descriptor-with-unknown-system-properties.yaml"))) + .isExactlyInstanceOf(TopologyParsingException.class); + } + @Test public void testStreamsParsingOnlyReadTopicsShouldNotParseAsNull() { Topology topology = diff --git a/src/test/java/com/purbon/kafka/topology/serdes/SystemPropertySubstitutorTest.java b/src/test/java/com/purbon/kafka/topology/serdes/SystemPropertySubstitutorTest.java new file mode 100644 index 000000000..dda495b21 --- /dev/null +++ b/src/test/java/com/purbon/kafka/topology/serdes/SystemPropertySubstitutorTest.java @@ -0,0 +1,39 @@ +package com.purbon.kafka.topology.serdes; + +import com.purbon.kafka.topology.exceptions.TopologyParsingException; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public class SystemPropertySubstitutorTest { + @Test + public void standardReplacement() { + // Given + System.getProperties().setProperty("env", "staging"); + System.getProperties().setProperty("project", "my_project"); + SystemPropertySubstitutor systemPropertySubstitutor = new SystemPropertySubstitutor(); + + // When + String result = systemPropertySubstitutor.replace( + "context: ${env}\n" + + "project: ${project}"); + + // Then + assertThat(result).isEqualTo( + "context: staging\n" + + "project: my_project" + ); + } + + @Test + public void notFoundKeyReplacement() { + // Given + SystemPropertySubstitutor systemPropertySubstitutor = new SystemPropertySubstitutor(); + + // When + // Then + assertThatThrownBy(() -> systemPropertySubstitutor.replace("${not_found_key}")) + .isExactlyInstanceOf(TopologyParsingException.class); + } +} \ No newline at end of file diff --git a/src/test/resources/descriptor-with-system-properties.yaml b/src/test/resources/descriptor-with-system-properties.yaml new file mode 100644 index 000000000..c501a84e5 --- /dev/null +++ b/src/test/resources/descriptor-with-system-properties.yaml @@ -0,0 +1,26 @@ +--- +context: ${env} +source: ${source} +projects: + - name: ${project} + consumers: + - principal: "User:App0" + metadata: + system: "OwnerSystem0" + producers: + - principal: "User:App1" + metadata: + contactInfo: "app1@company.com" + topics: + - name: "topicA" + consumers: + - principal: "User:App4" + metadata: + system: "OwnerSystem4" + config: + replication.factor: "1" + num.partitions: "1" + - name: "topicB" + dataType: "avro" + schemas: + value.schema.file: "schemas/bar-value.avsc" diff --git a/src/test/resources/descriptor-with-unknown-system-properties.yaml b/src/test/resources/descriptor-with-unknown-system-properties.yaml new file mode 100644 index 000000000..5683c09d5 --- /dev/null +++ b/src/test/resources/descriptor-with-unknown-system-properties.yaml @@ -0,0 +1,26 @@ +--- +context: ${unknown-env} +source: ${source} +projects: + - name: ${project} + consumers: + - principal: "User:App0" + metadata: + system: "OwnerSystem0" + producers: + - principal: "User:App1" + metadata: + contactInfo: "app1@company.com" + topics: + - name: "topicA" + consumers: + - principal: "User:App4" + metadata: + system: "OwnerSystem4" + config: + replication.factor: "1" + num.partitions: "1" + - name: "topicB" + dataType: "avro" + schemas: + value.schema.file: "schemas/bar-value.avsc" From 9351056712829375b8455a8e0781818b2400dcae Mon Sep 17 00:00:00 2001 From: Ludovic BOUTROS Date: Wed, 24 Aug 2022 10:31:32 +0200 Subject: [PATCH 2/4] Code formatting --- .../kafka/topology/api/ccloud/CCloudApi.java | 2 +- .../serdes/SystemPropertySubstitutor.java | 71 ++++++----- .../kafka/topology/TopologySerdesTest.java | 9 +- .../topology/api/ccloud/CCloudApiTest.java | 117 +++++++++--------- .../serdes/SystemPropertySubstitutorTest.java | 19 ++- 5 files changed, 110 insertions(+), 108 deletions(-) diff --git a/src/main/java/com/purbon/kafka/topology/api/ccloud/CCloudApi.java b/src/main/java/com/purbon/kafka/topology/api/ccloud/CCloudApi.java index 0cad01271..8461f6d81 100644 --- a/src/main/java/com/purbon/kafka/topology/api/ccloud/CCloudApi.java +++ b/src/main/java/com/purbon/kafka/topology/api/ccloud/CCloudApi.java @@ -159,7 +159,7 @@ private ListServiceAccountResponse getListServiceAccounts(String url, int page_s throws IOException { String requestUrl = url; if (!url.contains("page_token")) { - requestUrl = String.format("%s?page_size=%d", url, page_size); + requestUrl = String.format("%s?page_size=%d", url, page_size); } Response r = ccloudApiHttpClient.doGet(requestUrl); return (ListServiceAccountResponse) diff --git a/src/main/java/com/purbon/kafka/topology/serdes/SystemPropertySubstitutor.java b/src/main/java/com/purbon/kafka/topology/serdes/SystemPropertySubstitutor.java index 2741e94c6..33c07722d 100644 --- a/src/main/java/com/purbon/kafka/topology/serdes/SystemPropertySubstitutor.java +++ b/src/main/java/com/purbon/kafka/topology/serdes/SystemPropertySubstitutor.java @@ -1,53 +1,58 @@ package com.purbon.kafka.topology.serdes; import com.purbon.kafka.topology.exceptions.TopologyParsingException; +import java.util.Map; +import java.util.stream.Collectors; import org.apache.logging.log4j.core.LogEvent; import org.apache.logging.log4j.core.lookup.StrSubstitutor; import org.apache.logging.log4j.status.StatusLogger; -import java.util.Map; -import java.util.stream.Collectors; - /** - * A simple substitutor which substitutes system properties in the topology file. - * If a property is not found, it throws a {@link com.purbon.kafka.topology.exceptions.TopologyParsingException}. + * A simple substitutor which substitutes system properties in the topology file. If a property is + * not found, it throws a {@link com.purbon.kafka.topology.exceptions.TopologyParsingException}. */ public class SystemPropertySubstitutor { private final Map env; private final StrSubstitutor strSubstitutor; public SystemPropertySubstitutor() { - env = System.getProperties().entrySet().stream() - .collect(Collectors.toMap(e -> (String) e.getKey(), e -> (String) e.getValue())); + env = + System.getProperties().entrySet().stream() + .collect(Collectors.toMap(e -> (String) e.getKey(), e -> (String) e.getValue())); - strSubstitutor = new StrSubstitutor(env, "${", "}") { - @Override - protected String resolveVariable(final LogEvent event, final String variableName, final StringBuilder buf, - final int startPos, final int endPos) { - String result = super.resolveVariable(event, variableName, buf, startPos, endPos); - if (result == null) { - throw new TopologyParsingException("Cannot resolve variable: " + variableName); - } - return result; - } + strSubstitutor = + new StrSubstitutor(env, "${", "}") { + @Override + protected String resolveVariable( + final LogEvent event, + final String variableName, + final StringBuilder buf, + final int startPos, + final int endPos) { + String result = super.resolveVariable(event, variableName, buf, startPos, endPos); + if (result == null) { + throw new TopologyParsingException("Cannot resolve variable: " + variableName); + } + return result; + } - @Override - public String replace(final LogEvent event, final String source) { - if (source == null) { - return null; - } - final StringBuilder buf = new StringBuilder(source); - try { - if (!substitute(event, buf, 0, source.length())) { - return source; + @Override + public String replace(final LogEvent event, final String source) { + if (source == null) { + return null; + } + final StringBuilder buf = new StringBuilder(source); + try { + if (!substitute(event, buf, 0, source.length())) { + return source; + } + } catch (Throwable t) { + StatusLogger.getLogger().error("Replacement failed on {}", source, t); + throw t; + } + return buf.toString(); } - } catch (Throwable t) { - StatusLogger.getLogger().error("Replacement failed on {}", source, t); - throw t; - } - return buf.toString(); - } - }; + }; } public String replace(String original) { diff --git a/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java b/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java index 3c153eac5..18d84f0f1 100644 --- a/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java +++ b/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java @@ -86,7 +86,7 @@ public void testSystemPropertySubstitution() { // When Topology topology = - parser.deserialise(TestUtils.getResourceFile("/descriptor-with-system-properties.yaml")); + parser.deserialise(TestUtils.getResourceFile("/descriptor-with-system-properties.yaml")); Project project = topology.getProjects().get(0); // Then @@ -101,8 +101,11 @@ public void testInvalidSystemPropertySubstitution() { // When // Then - assertThatThrownBy(() -> parser.deserialise(TestUtils.getResourceFile("/descriptor-with-unknown-system-properties.yaml"))) - .isExactlyInstanceOf(TopologyParsingException.class); + assertThatThrownBy( + () -> + parser.deserialise( + TestUtils.getResourceFile("/descriptor-with-unknown-system-properties.yaml"))) + .isExactlyInstanceOf(TopologyParsingException.class); } @Test diff --git a/src/test/java/com/purbon/kafka/topology/api/ccloud/CCloudApiTest.java b/src/test/java/com/purbon/kafka/topology/api/ccloud/CCloudApiTest.java index ec4d0c3eb..fa10f2a3c 100644 --- a/src/test/java/com/purbon/kafka/topology/api/ccloud/CCloudApiTest.java +++ b/src/test/java/com/purbon/kafka/topology/api/ccloud/CCloudApiTest.java @@ -14,13 +14,11 @@ import com.purbon.kafka.topology.model.cluster.ServiceAccount; import com.purbon.kafka.topology.roles.TopologyAclBinding; import java.io.IOException; -import java.net.http.HttpResponse; import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.Set; - import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -130,70 +128,71 @@ public void testDeleteServiceAccount() throws IOException { @Test public void listServiceAccountsShouldAcceptPage() throws IOException { - String body01 = "{\n" + - " \"api_version\": \"iam/v2\",\n" + - " \"kind\": \"ServiceAccountList\",\n" + - " \"metadata\": {\n" + - " \"first\": \"https://api.confluent.cloud/iam/v2/service-accounts\",\n" + - " \"last\": \"https://api.confluent.cloud/iam/v2/service-accounts?page_token=bcAOehAY8F16YD84Z1wT\",\n" + - " \"prev\": \"https://api.confluent.cloud/iam/v2/service-accounts?page_token=YIXRY97wWYmwzrax4dld\",\n" + - " \"next\": \"https://api.confluent.cloud/iam/v2/service-accounts?page_token=UvmDWOB1iwfAIBPj6EYb\",\n" + - " \"total_size\": 123\n" + - " },\n" + - " \"data\": [\n" + - " {\n" + - " \"api_version\": \"iam/v2\",\n" + - " \"kind\": \"ServiceAccount\",\n" + - " \"id\": \"dlz-f3a90de\",\n" + - " \"metadata\": {\n" + - " \"self\": \"https://api.confluent.cloud/iam/v2/service-accounts/sa-12345\",\n" + - " \"resource_name\": \"crn://confluent.cloud/service-account=sa-12345\",\n" + - " \"created_at\": \"2006-01-02T15:04:05-07:00\",\n" + - " \"updated_at\": \"2006-01-02T15:04:05-07:00\",\n" + - " \"deleted_at\": \"2006-01-02T15:04:05-07:00\"\n" + - " },\n" + - " \"display_name\": \"DeLorean_auto_repair\",\n" + - " \"description\": \"Doc's repair bot for the DeLorean\"\n" + - " }\n" + - " ]\n" + - "}"; + String body01 = + "{\n" + + " \"api_version\": \"iam/v2\",\n" + + " \"kind\": \"ServiceAccountList\",\n" + + " \"metadata\": {\n" + + " \"first\": \"https://api.confluent.cloud/iam/v2/service-accounts\",\n" + + " \"last\": \"https://api.confluent.cloud/iam/v2/service-accounts?page_token=bcAOehAY8F16YD84Z1wT\",\n" + + " \"prev\": \"https://api.confluent.cloud/iam/v2/service-accounts?page_token=YIXRY97wWYmwzrax4dld\",\n" + + " \"next\": \"https://api.confluent.cloud/iam/v2/service-accounts?page_token=UvmDWOB1iwfAIBPj6EYb\",\n" + + " \"total_size\": 123\n" + + " },\n" + + " \"data\": [\n" + + " {\n" + + " \"api_version\": \"iam/v2\",\n" + + " \"kind\": \"ServiceAccount\",\n" + + " \"id\": \"dlz-f3a90de\",\n" + + " \"metadata\": {\n" + + " \"self\": \"https://api.confluent.cloud/iam/v2/service-accounts/sa-12345\",\n" + + " \"resource_name\": \"crn://confluent.cloud/service-account=sa-12345\",\n" + + " \"created_at\": \"2006-01-02T15:04:05-07:00\",\n" + + " \"updated_at\": \"2006-01-02T15:04:05-07:00\",\n" + + " \"deleted_at\": \"2006-01-02T15:04:05-07:00\"\n" + + " },\n" + + " \"display_name\": \"DeLorean_auto_repair\",\n" + + " \"description\": \"Doc's repair bot for the DeLorean\"\n" + + " }\n" + + " ]\n" + + "}"; Response response01 = new Response(null, 200, body01); when(httpClient.doGet(String.format("%s?page_size=%d", V2_IAM_SERVICE_ACCOUNTS_URL, 1))) - .thenReturn(response01); - - String body02 = "{\n" + - " \"api_version\": \"iam/v2\",\n" + - " \"kind\": \"ServiceAccountList\",\n" + - " \"metadata\": {\n" + - " \"first\": \"https://api.confluent.cloud/iam/v2/service-accounts\",\n" + - " \"last\": \"https://api.confluent.cloud/iam/v2/service-accounts?page_token=bcAOehAY8F16YD84Z1wT\",\n" + - " \"prev\": \"https://api.confluent.cloud/iam/v2/service-accounts?page_token=YIXRY97wWYmwzrax4dld\",\n" + - " \"total_size\": 123\n" + - " },\n" + - " \"data\": [\n" + - " {\n" + - " \"api_version\": \"iam/v2\",\n" + - " \"kind\": \"ServiceAccount\",\n" + - " \"id\": \"abc-f3a90de\",\n" + - " \"metadata\": {\n" + - " \"self\": \"https://api.confluent.cloud/iam/v2/service-accounts/sa-12345\",\n" + - " \"resource_name\": \"crn://confluent.cloud/service-account=sa-12345\",\n" + - " \"created_at\": \"2006-01-02T15:04:05-07:00\",\n" + - " \"updated_at\": \"2006-01-02T15:04:05-07:00\",\n" + - " \"deleted_at\": \"2006-01-02T15:04:05-07:00\"\n" + - " },\n" + - " \"display_name\": \"MacFly\",\n" + - " \"description\": \"Doc's repair bot for the MacFly\"\n" + - " }\n" + - " ]\n" + - "}"; - Response response02 = new Response(null, 200, body02); + .thenReturn(response01); + String body02 = + "{\n" + + " \"api_version\": \"iam/v2\",\n" + + " \"kind\": \"ServiceAccountList\",\n" + + " \"metadata\": {\n" + + " \"first\": \"https://api.confluent.cloud/iam/v2/service-accounts\",\n" + + " \"last\": \"https://api.confluent.cloud/iam/v2/service-accounts?page_token=bcAOehAY8F16YD84Z1wT\",\n" + + " \"prev\": \"https://api.confluent.cloud/iam/v2/service-accounts?page_token=YIXRY97wWYmwzrax4dld\",\n" + + " \"total_size\": 123\n" + + " },\n" + + " \"data\": [\n" + + " {\n" + + " \"api_version\": \"iam/v2\",\n" + + " \"kind\": \"ServiceAccount\",\n" + + " \"id\": \"abc-f3a90de\",\n" + + " \"metadata\": {\n" + + " \"self\": \"https://api.confluent.cloud/iam/v2/service-accounts/sa-12345\",\n" + + " \"resource_name\": \"crn://confluent.cloud/service-account=sa-12345\",\n" + + " \"created_at\": \"2006-01-02T15:04:05-07:00\",\n" + + " \"updated_at\": \"2006-01-02T15:04:05-07:00\",\n" + + " \"deleted_at\": \"2006-01-02T15:04:05-07:00\"\n" + + " },\n" + + " \"display_name\": \"MacFly\",\n" + + " \"description\": \"Doc's repair bot for the MacFly\"\n" + + " }\n" + + " ]\n" + + "}"; + Response response02 = new Response(null, 200, body02); when(httpClient.doGet("/iam/v2/service-accounts?page_token=UvmDWOB1iwfAIBPj6EYb")) - .thenReturn(response02); + .thenReturn(response02); Set accounts = apiClient.listServiceAccounts(); assertThat(accounts).hasSize(2); diff --git a/src/test/java/com/purbon/kafka/topology/serdes/SystemPropertySubstitutorTest.java b/src/test/java/com/purbon/kafka/topology/serdes/SystemPropertySubstitutorTest.java index dda495b21..1f061ee21 100644 --- a/src/test/java/com/purbon/kafka/topology/serdes/SystemPropertySubstitutorTest.java +++ b/src/test/java/com/purbon/kafka/topology/serdes/SystemPropertySubstitutorTest.java @@ -1,11 +1,11 @@ package com.purbon.kafka.topology.serdes; -import com.purbon.kafka.topology.exceptions.TopologyParsingException; -import org.junit.Test; - import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import com.purbon.kafka.topology.exceptions.TopologyParsingException; +import org.junit.Test; + public class SystemPropertySubstitutorTest { @Test public void standardReplacement() { @@ -15,15 +15,10 @@ public void standardReplacement() { SystemPropertySubstitutor systemPropertySubstitutor = new SystemPropertySubstitutor(); // When - String result = systemPropertySubstitutor.replace( - "context: ${env}\n" - + "project: ${project}"); + String result = systemPropertySubstitutor.replace("context: ${env}\n" + "project: ${project}"); // Then - assertThat(result).isEqualTo( - "context: staging\n" - + "project: my_project" - ); + assertThat(result).isEqualTo("context: staging\n" + "project: my_project"); } @Test @@ -34,6 +29,6 @@ public void notFoundKeyReplacement() { // When // Then assertThatThrownBy(() -> systemPropertySubstitutor.replace("${not_found_key}")) - .isExactlyInstanceOf(TopologyParsingException.class); + .isExactlyInstanceOf(TopologyParsingException.class); } -} \ No newline at end of file +} From a4b09103d267fb2d15312acbbfe347088eb2afc6 Mon Sep 17 00:00:00 2001 From: Ludovic BOUTROS Date: Wed, 24 Aug 2022 11:26:23 +0200 Subject: [PATCH 3/4] Fix: detailed and optimized schema auth binding function mixed up --- .../kafka/topology/roles/rbac/RBACBindingsBuilder.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/purbon/kafka/topology/roles/rbac/RBACBindingsBuilder.java b/src/main/java/com/purbon/kafka/topology/roles/rbac/RBACBindingsBuilder.java index a1fa4abce..1276cff98 100644 --- a/src/main/java/com/purbon/kafka/topology/roles/rbac/RBACBindingsBuilder.java +++ b/src/main/java/com/purbon/kafka/topology/roles/rbac/RBACBindingsBuilder.java @@ -452,13 +452,13 @@ public List setSchemaAuthorization( Boolean shouldOptimizeAcls, String namePrefix) { if (shouldOptimizeAcls) { - return setDetailedSchemaAuthorization(principal, role, namePrefix); + return setOptimizedSchemaAuthorization(principal, role, namePrefix); } else { - return setOptimizedSchemaAuthorization(principal, subjects, role, prefixed); + return setDetailedSchemaAuthorization(principal, subjects, role, prefixed); } } - private List setDetailedSchemaAuthorization( + private List setOptimizedSchemaAuthorization( String principal, String role, String namePrefix) { return List.of( apiClient @@ -467,7 +467,7 @@ private List setDetailedSchemaAuthorization( .apply("SUBJECT", namePrefix, PatternType.PREFIXED.name())); } - private List setOptimizedSchemaAuthorization( + private List setDetailedSchemaAuthorization( String principal, List subjects, String role, boolean prefixed) { String patternType = prefixed ? PatternType.PREFIXED.name() : PatternType.LITERAL.name(); From ba0e58bd5b6ab5d12da2853b126e64b920fcb883 Mon Sep 17 00:00:00 2001 From: Ludovic BOUTROS Date: Wed, 13 Sep 2023 20:26:48 +0200 Subject: [PATCH 4/4] Fix: Use Apache Commons Text for string substitution --- pom.xml | 2 +- .../serdes/SystemPropertySubstitutor.java | 56 ++++--------------- 2 files changed, 11 insertions(+), 47 deletions(-) diff --git a/pom.xml b/pom.xml index 44f923227..2e461c08b 100644 --- a/pom.xml +++ b/pom.xml @@ -531,7 +531,7 @@ 2.17.1 3.5.7 1.4 - 1.9 + 1.10.0 3.6.0 4.13.1 1.15.3 diff --git a/src/main/java/com/purbon/kafka/topology/serdes/SystemPropertySubstitutor.java b/src/main/java/com/purbon/kafka/topology/serdes/SystemPropertySubstitutor.java index 33c07722d..94e5dbb80 100644 --- a/src/main/java/com/purbon/kafka/topology/serdes/SystemPropertySubstitutor.java +++ b/src/main/java/com/purbon/kafka/topology/serdes/SystemPropertySubstitutor.java @@ -1,61 +1,25 @@ package com.purbon.kafka.topology.serdes; import com.purbon.kafka.topology.exceptions.TopologyParsingException; -import java.util.Map; -import java.util.stream.Collectors; -import org.apache.logging.log4j.core.LogEvent; -import org.apache.logging.log4j.core.lookup.StrSubstitutor; -import org.apache.logging.log4j.status.StatusLogger; + +import org.apache.commons.text.StringSubstitutor; +import org.apache.commons.text.lookup.StringLookupFactory; /** * A simple substitutor which substitutes system properties in the topology file. If a property is * not found, it throws a {@link com.purbon.kafka.topology.exceptions.TopologyParsingException}. */ public class SystemPropertySubstitutor { - private final Map env; - private final StrSubstitutor strSubstitutor; - public SystemPropertySubstitutor() { - env = - System.getProperties().entrySet().stream() - .collect(Collectors.toMap(e -> (String) e.getKey(), e -> (String) e.getValue())); - - strSubstitutor = - new StrSubstitutor(env, "${", "}") { - @Override - protected String resolveVariable( - final LogEvent event, - final String variableName, - final StringBuilder buf, - final int startPos, - final int endPos) { - String result = super.resolveVariable(event, variableName, buf, startPos, endPos); - if (result == null) { - throw new TopologyParsingException("Cannot resolve variable: " + variableName); - } - return result; - } - - @Override - public String replace(final LogEvent event, final String source) { - if (source == null) { - return null; - } - final StringBuilder buf = new StringBuilder(source); - try { - if (!substitute(event, buf, 0, source.length())) { - return source; - } - } catch (Throwable t) { - StatusLogger.getLogger().error("Replacement failed on {}", source, t); - throw t; - } - return buf.toString(); - } - }; } public String replace(String original) { - return strSubstitutor.replace(original); + try { + return new StringSubstitutor(StringLookupFactory.INSTANCE.systemPropertyStringLookup()) + .setEnableUndefinedVariableException(true) + .replace(original); + } catch (IllegalArgumentException ex) { + throw new TopologyParsingException("A variable was not resolved: ", ex); + } } }