From 341e535942c378c72a0b1d7aa4eec4895ef15b17 Mon Sep 17 00:00:00 2001 From: Ken Huang Date: Wed, 22 Jan 2025 23:12:06 +0800 Subject: [PATCH] KAFKA-18519: Remove Json.scala, cleanup AclEntry.scala (#18614) Reviewers: Mickael Maison , Ismael Juma , Chia-Ping Tsai --- core/src/main/scala/kafka/utils/Json.scala | 92 -------- .../security/authorizer/AclEntryTest.scala | 48 ---- .../scala/unit/kafka/utils/JsonTest.scala | 134 ----------- .../unit/kafka/utils/json/JsonValueTest.scala | 212 ------------------ gradle/spotbugs-exclude.xml | 7 - .../kafka/jmh/acl/AuthorizerBenchmark.java | 39 ++-- .../kafka/security/authorizer/AclEntry.java | 122 +--------- 7 files changed, 17 insertions(+), 637 deletions(-) delete mode 100644 core/src/main/scala/kafka/utils/Json.scala delete mode 100644 core/src/test/scala/unit/kafka/security/authorizer/AclEntryTest.scala delete mode 100644 core/src/test/scala/unit/kafka/utils/JsonTest.scala delete mode 100644 core/src/test/scala/unit/kafka/utils/json/JsonValueTest.scala diff --git a/core/src/main/scala/kafka/utils/Json.scala b/core/src/main/scala/kafka/utils/Json.scala deleted file mode 100644 index 049941cd01d98..0000000000000 --- a/core/src/main/scala/kafka/utils/Json.scala +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.utils - -import com.fasterxml.jackson.core.{JsonParseException, JsonProcessingException} -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.databind.node.MissingNode -import kafka.utils.json.JsonValue - -import scala.reflect.ClassTag - -/** - * Provides methods for parsing JSON with Jackson and encoding to JSON with a simple and naive custom implementation. - */ -object Json { - - private val mapper = new ObjectMapper() - - /** - * Parse a JSON string into a JsonValue if possible. `None` is returned if `input` is not valid JSON. - */ - def parseFull(input: String): Option[JsonValue] = tryParseFull(input).toOption - - /** - * Parse a JSON string into either a generic type T, or a JsonProcessingException in the case of - * exception. - */ - def parseStringAs[T](input: String)(implicit tag: ClassTag[T]): Either[JsonProcessingException, T] = { - try Right(mapper.readValue(input, tag.runtimeClass).asInstanceOf[T]) - catch { case e: JsonProcessingException => Left(e) } - } - - /** - * Parse a JSON byte array into a JsonValue if possible. `None` is returned if `input` is not valid JSON. - */ - def parseBytes(input: Array[Byte]): Option[JsonValue] = - try Option(mapper.readTree(input)).map(JsonValue(_)) - catch { case _: JsonProcessingException => None } - - def tryParseBytes(input: Array[Byte]): Either[JsonProcessingException, JsonValue] = - try Right(mapper.readTree(input)).map(JsonValue(_)) - catch { case e: JsonProcessingException => Left(e) } - - /** - * Parse a JSON byte array into either a generic type T, or a JsonProcessingException in the case of exception. - */ - def parseBytesAs[T](input: Array[Byte])(implicit tag: ClassTag[T]): Either[JsonProcessingException, T] = { - try Right(mapper.readValue(input, tag.runtimeClass).asInstanceOf[T]) - catch { case e: JsonProcessingException => Left(e) } - } - - /** - * Parse a JSON string into a JsonValue if possible. It returns an `Either` where `Left` will be an exception and - * `Right` is the `JsonValue`. - * @param input a JSON string to parse - * @return An `Either` which in case of `Left` means an exception and `Right` is the actual return value. - */ - def tryParseFull(input: String): Either[JsonProcessingException, JsonValue] = - if (input == null || input.isEmpty) - Left(new JsonParseException(MissingNode.getInstance().traverse(), "The input string shouldn't be empty")) - else - try Right(mapper.readTree(input)).map(JsonValue(_)) - catch { case e: JsonProcessingException => Left(e) } - - /** - * Encode an object into a JSON string. This method accepts any type supported by Jackson's ObjectMapper in - * the default configuration. That is, Java collections are supported, but Scala collections are not (to avoid - * a jackson-scala dependency). - */ - def encodeAsString(obj: Any): String = mapper.writeValueAsString(obj) - - /** - * Encode an object into a JSON value in bytes. This method accepts any type supported by Jackson's ObjectMapper in - * the default configuration. That is, Java collections are supported, but Scala collections are not (to avoid - * a jackson-scala dependency). - */ - def encodeAsBytes(obj: Any): Array[Byte] = mapper.writeValueAsBytes(obj) -} diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AclEntryTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AclEntryTest.scala deleted file mode 100644 index f6867b6233b89..0000000000000 --- a/core/src/test/scala/unit/kafka/security/authorizer/AclEntryTest.scala +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.security.authorizer - -import java.nio.charset.StandardCharsets.UTF_8 -import kafka.utils.Json -import org.apache.kafka.common.acl.AccessControlEntry -import org.apache.kafka.common.acl.AclOperation.READ -import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY} -import org.apache.kafka.common.security.auth.KafkaPrincipal -import org.apache.kafka.security.authorizer.AclEntry -import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.Test - -import java.util - -class AclEntryTest { - - val AclJson = """{"version": 1, "acls": [{"host": "host1","permissionType": "Deny","operation": "READ", "principal": "User:alice" }, - { "host": "*" , "permissionType": "Allow", "operation": "Read", "principal": "User:bob" }, - { "host": "host1", "permissionType": "Deny", "operation": "Read" , "principal": "User:bob"}]}""" - - @Test - def testAclJsonConversion(): Unit = { - val acl1 = new AclEntry(new AccessControlEntry(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice").toString, "host1", READ, DENY)) - val acl2 = new AclEntry(new AccessControlEntry(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob").toString, "*", READ, ALLOW)) - val acl3 = new AclEntry(new AccessControlEntry(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob").toString, "host1", READ, DENY)) - - val acls = new util.HashSet[AclEntry](util.Arrays.asList(acl1, acl2, acl3)) - - assertEquals(acls, AclEntry.fromBytes(Json.encodeAsBytes(AclEntry.toJsonCompatibleMap(acls)))) - assertEquals(acls, AclEntry.fromBytes(AclJson.getBytes(UTF_8))) - } -} diff --git a/core/src/test/scala/unit/kafka/utils/JsonTest.scala b/core/src/test/scala/unit/kafka/utils/JsonTest.scala deleted file mode 100644 index aca7d45600d12..0000000000000 --- a/core/src/test/scala/unit/kafka/utils/JsonTest.scala +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.utils - -import java.nio.charset.StandardCharsets - -import com.fasterxml.jackson.annotation.JsonProperty -import com.fasterxml.jackson.core.{JsonParseException, JsonProcessingException} -import com.fasterxml.jackson.databind.JsonNode -import com.fasterxml.jackson.databind.node._ -import kafka.utils.JsonTest.TestObject -import kafka.utils.json.JsonValue -import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.Test - -import scala.jdk.CollectionConverters._ -import scala.collection.Map - -object JsonTest { - case class TestObject(@JsonProperty("foo") foo: String, @JsonProperty("bar") bar: Int) -} - -class JsonTest { - - @Test - def testJsonParse(): Unit = { - val jnf = JsonNodeFactory.instance - - assertEquals(Some(JsonValue(new ObjectNode(jnf))), Json.parseFull("{}")) - assertEquals(Right(JsonValue(new ObjectNode(jnf))), Json.tryParseFull("{}")) - assertEquals(classOf[Left[JsonProcessingException, JsonValue]], Json.tryParseFull(null).getClass) - assertThrows(classOf[IllegalArgumentException], () => Json.tryParseBytes(null)) - - assertEquals(None, Json.parseFull("")) - assertEquals(classOf[Left[JsonProcessingException, JsonValue]], Json.tryParseFull("").getClass) - - assertEquals(None, Json.parseFull("""{"foo":"bar"s}""")) - val tryRes = Json.tryParseFull("""{"foo":"bar"s}""") - assertTrue(tryRes.isInstanceOf[Left[_, JsonValue]]) - - val objectNode = new ObjectNode( - jnf, - Map[String, JsonNode]("foo" -> new TextNode("bar"), "is_enabled" -> BooleanNode.TRUE).asJava - ) - assertEquals(Some(JsonValue(objectNode)), Json.parseFull("""{"foo":"bar", "is_enabled":true}""")) - assertEquals(Right(JsonValue(objectNode)), Json.tryParseFull("""{"foo":"bar", "is_enabled":true}""")) - - val arrayNode = new ArrayNode(jnf) - Vector(1, 2, 3).map(new IntNode(_)).foreach(arrayNode.add) - assertEquals(Some(JsonValue(arrayNode)), Json.parseFull("[1, 2, 3]")) - - // Test with encoder that properly escapes backslash and quotes - val map = Map("foo1" -> """bar1\,bar2""", "foo2" -> """\bar""").asJava - val encoded = Json.encodeAsString(map) - val decoded = Json.parseFull(encoded) - assertEquals(decoded, Json.parseFull("""{"foo1":"bar1\\,bar2", "foo2":"\\bar"}""")) - } - - @Test - def testEncodeAsString(): Unit = { - assertEquals("null", Json.encodeAsString(null)) - assertEquals("1", Json.encodeAsString(1)) - assertEquals("1", Json.encodeAsString(1L)) - assertEquals("1", Json.encodeAsString(1.toByte)) - assertEquals("1", Json.encodeAsString(1.toShort)) - assertEquals("1.0", Json.encodeAsString(1.0)) - assertEquals(""""str"""", Json.encodeAsString("str")) - assertEquals("true", Json.encodeAsString(true)) - assertEquals("false", Json.encodeAsString(false)) - assertEquals("[]", Json.encodeAsString(Seq().asJava)) - assertEquals("[null]", Json.encodeAsString(Seq(null).asJava)) - assertEquals("[1,2,3]", Json.encodeAsString(Seq(1,2,3).asJava)) - assertEquals("""[1,"2",[3],null]""", Json.encodeAsString(Seq(1,"2",Seq(3).asJava,null).asJava)) - assertEquals("{}", Json.encodeAsString(Map().asJava)) - assertEquals("""{"a":1,"b":2,"c":null}""", Json.encodeAsString(Map("a" -> 1, "b" -> 2, "c" -> null).asJava)) - assertEquals("""{"a":[1,2],"c":[3,4]}""", Json.encodeAsString(Map("a" -> Seq(1,2).asJava, "c" -> Seq(3,4).asJava).asJava)) - assertEquals("""{"a":[1,2],"b":[3,4],"c":null}""", Json.encodeAsString(Map("a" -> Seq(1,2).asJava, "b" -> Seq(3,4).asJava, "c" -> null).asJava)) - assertEquals(""""str1\\,str2"""", Json.encodeAsString("""str1\,str2""")) - assertEquals(""""\"quoted\""""", Json.encodeAsString(""""quoted"""")) - } - - @Test - def testEncodeAsBytes(): Unit = { - assertEquals("null", new String(Json.encodeAsBytes(null), StandardCharsets.UTF_8)) - assertEquals("1", new String(Json.encodeAsBytes(1), StandardCharsets.UTF_8)) - assertEquals("1", new String(Json.encodeAsBytes(1L), StandardCharsets.UTF_8)) - assertEquals("1", new String(Json.encodeAsBytes(1.toByte), StandardCharsets.UTF_8)) - assertEquals("1", new String(Json.encodeAsBytes(1.toShort), StandardCharsets.UTF_8)) - assertEquals("1.0", new String(Json.encodeAsBytes(1.0), StandardCharsets.UTF_8)) - assertEquals(""""str"""", new String(Json.encodeAsBytes("str"), StandardCharsets.UTF_8)) - assertEquals("true", new String(Json.encodeAsBytes(true), StandardCharsets.UTF_8)) - assertEquals("false", new String(Json.encodeAsBytes(false), StandardCharsets.UTF_8)) - assertEquals("[]", new String(Json.encodeAsBytes(Seq().asJava), StandardCharsets.UTF_8)) - assertEquals("[null]", new String(Json.encodeAsBytes(Seq(null).asJava), StandardCharsets.UTF_8)) - assertEquals("[1,2,3]", new String(Json.encodeAsBytes(Seq(1,2,3).asJava), StandardCharsets.UTF_8)) - assertEquals("""[1,"2",[3],null]""", new String(Json.encodeAsBytes(Seq(1,"2",Seq(3).asJava,null).asJava), StandardCharsets.UTF_8)) - assertEquals("{}", new String(Json.encodeAsBytes(Map().asJava), StandardCharsets.UTF_8)) - assertEquals("""{"a":1,"b":2,"c":null}""", new String(Json.encodeAsBytes(Map("a" -> 1, "b" -> 2, "c" -> null).asJava), StandardCharsets.UTF_8)) - assertEquals("""{"a":[1,2],"c":[3,4]}""", new String(Json.encodeAsBytes(Map("a" -> Seq(1,2).asJava, "c" -> Seq(3,4).asJava).asJava), StandardCharsets.UTF_8)) - assertEquals("""{"a":[1,2],"b":[3,4],"c":null}""", new String(Json.encodeAsBytes(Map("a" -> Seq(1,2).asJava, "b" -> Seq(3,4).asJava, "c" -> null).asJava), StandardCharsets.UTF_8)) - assertEquals(""""str1\\,str2"""", new String(Json.encodeAsBytes("""str1\,str2"""), StandardCharsets.UTF_8)) - assertEquals(""""\"quoted\""""", new String(Json.encodeAsBytes(""""quoted""""), StandardCharsets.UTF_8)) - } - - @Test - def testParseTo(): Unit = { - val foo = "baz" - val bar = 1 - - val result = Json.parseStringAs[TestObject](s"""{"foo": "$foo", "bar": $bar}""") - - assertEquals(Right(TestObject(foo, bar)), result) - } - - @Test - def testParseToWithInvalidJson(): Unit = { - val result = Json.parseStringAs[TestObject]("{invalid json}") - assertEquals(Left(classOf[JsonParseException]), result.left.map(_.getClass)) - } -} diff --git a/core/src/test/scala/unit/kafka/utils/json/JsonValueTest.scala b/core/src/test/scala/unit/kafka/utils/json/JsonValueTest.scala deleted file mode 100644 index 8194b298b4e5a..0000000000000 --- a/core/src/test/scala/unit/kafka/utils/json/JsonValueTest.scala +++ /dev/null @@ -1,212 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.utils.json - -import scala.collection.Seq - -import com.fasterxml.jackson.databind.{ObjectMapper, JsonMappingException} -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.Assertions._ - -import kafka.utils.Json - -class JsonValueTest { - - private val json = """ - |{ - | "boolean": false, - | "int": 1234, - | "long": 3000000000, - | "double": 16.244355, - | "string": "string", - | "number_as_string": "123", - | "array": [4.0, 11.1, 44.5], - | "object": { - | "a": true, - | "b": false - | }, - | "null": null - |} - """.stripMargin - - private def parse(s: String): JsonValue = - Json.parseFull(s).getOrElse(sys.error("Failed to parse json: " + s)) - - private def assertTo[T: DecodeJson](expected: T, jsonValue: JsonObject => JsonValue): Unit = { - val parsed = jsonValue(parse(json).asJsonObject) - assertEquals(Right(expected), parsed.toEither[T]) - assertEquals(expected, parsed.to[T]) - } - - private def assertToFails[T: DecodeJson](jsonValue: JsonObject => JsonValue): Unit = { - val parsed = jsonValue(parse(json).asJsonObject) - assertTrue(parsed.toEither[T].isLeft) - assertThrow[JsonMappingException](parsed.to[T]) - } - - def assertThrow[E <: Throwable : Manifest](body: => Unit): Unit = { - import scala.util.control.Exception._ - val klass = manifest[E].runtimeClass - catchingPromiscuously(klass).opt(body).foreach { _ => - fail("Expected `" + klass + "` to be thrown, but no exception was thrown") - } - } - - @Test - def testAsJsonObject(): Unit = { - val parsed = parse(json).asJsonObject - val obj = parsed("object") - assertEquals(obj, obj.asJsonObject) - assertThrow[JsonMappingException](parsed("array").asJsonObject) - } - - @Test - def testAsJsonObjectOption(): Unit = { - val parsed = parse(json).asJsonObject - assertTrue(parsed("object").asJsonObjectOption.isDefined) - assertEquals(None, parsed("array").asJsonObjectOption) - } - - @Test - def testAsJsonArray(): Unit = { - val parsed = parse(json).asJsonObject - val array = parsed("array") - assertEquals(array, array.asJsonArray) - assertThrow[JsonMappingException](parsed("object").asJsonArray) - } - - @Test - def testAsJsonArrayOption(): Unit = { - val parsed = parse(json).asJsonObject - assertTrue(parsed("array").asJsonArrayOption.isDefined) - assertEquals(None, parsed("object").asJsonArrayOption) - } - - @Test - def testJsonObjectGet(): Unit = { - val parsed = parse(json).asJsonObject - assertEquals(Some(parse("""{"a":true,"b":false}""")), parsed.get("object")) - assertEquals(None, parsed.get("aaaaa")) - } - - @Test - def testJsonObjectApply(): Unit = { - val parsed = parse(json).asJsonObject - assertEquals(parse("""{"a":true,"b":false}"""), parsed("object")) - assertThrow[JsonMappingException](parsed("aaaaaaaa")) - } - - @Test - def testJsonObjectIterator(): Unit = { - assertEquals( - Vector("a" -> parse("true"), "b" -> parse("false")), - parse(json).asJsonObject("object").asJsonObject.iterator.toVector - ) - } - - @Test - def testJsonArrayIterator(): Unit = { - assertEquals(Vector("4.0", "11.1", "44.5").map(parse), parse(json).asJsonObject("array").asJsonArray.iterator.toVector) - } - - @Test - def testJsonValueEquals(): Unit = { - - assertEquals(parse(json), parse(json)) - - assertEquals(parse("""{"blue": true, "red": false}"""), parse("""{"red": false, "blue": true}""")) - assertNotEquals(parse("""{"blue": true, "red": true}"""), parse("""{"red": false, "blue": true}""")) - - assertEquals(parse("""[1, 2, 3]"""), parse("""[1, 2, 3]""")) - assertNotEquals(parse("""[1, 2, 3]"""), parse("""[2, 1, 3]""")) - - assertEquals(parse("1344"), parse("1344")) - assertNotEquals(parse("1344"), parse("144")) - - } - - @Test - def testJsonValueHashCode(): Unit = { - assertEquals(new ObjectMapper().readTree(json).hashCode, parse(json).hashCode) - } - - @Test - def testJsonValueToString(): Unit = { - val js = """{"boolean":false,"int":1234,"array":[4.0,11.1,44.5],"object":{"a":true,"b":false}}""" - assertEquals(js, parse(js).toString) - } - - @Test - def testDecodeBoolean(): Unit = { - assertTo[Boolean](false, _("boolean")) - assertToFails[Boolean](_("int")) - } - - @Test - def testDecodeString(): Unit = { - assertTo[String]("string", _("string")) - assertTo[String]("123", _("number_as_string")) - assertToFails[String](_("int")) - assertToFails[String](_("array")) - } - - @Test - def testDecodeInt(): Unit = { - assertTo[Int](1234, _("int")) - assertToFails[Int](_("long")) - } - - @Test - def testDecodeLong(): Unit = { - assertTo[Long](3000000000L, _("long")) - assertTo[Long](1234, _("int")) - assertToFails[Long](_("string")) - } - - @Test - def testDecodeDouble(): Unit = { - assertTo[Double](16.244355, _("double")) - assertTo[Double](1234.0, _("int")) - assertTo[Double](3000000000L, _("long")) - assertToFails[Double](_("string")) - } - - @Test - def testDecodeSeq(): Unit = { - assertTo[Seq[Double]](Seq(4.0, 11.1, 44.5), _("array")) - assertToFails[Seq[Double]](_("string")) - assertToFails[Seq[Double]](_("object")) - assertToFails[Seq[String]](_("array")) - } - - @Test - def testDecodeMap(): Unit = { - assertTo[Map[String, Boolean]](Map("a" -> true, "b" -> false), _("object")) - assertToFails[Map[String, Int]](_("object")) - assertToFails[Map[String, String]](_("object")) - assertToFails[Map[String, Double]](_("array")) - } - - @Test - def testDecodeOption(): Unit = { - assertTo[Option[Int]](None, _("null")) - assertTo[Option[Int]](Some(1234), _("int")) - assertToFails[Option[String]](_("int")) - } - -} diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index ddd0c398b5bac..3745dd8e4f848 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -151,13 +151,6 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read - - - - - - - diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java index 051d4d7724fec..d2fb9018df219 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java @@ -35,7 +35,6 @@ import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.metadata.authorizer.StandardAcl; import org.apache.kafka.metadata.authorizer.StandardAuthorizer; -import org.apache.kafka.security.authorizer.AclEntry; import org.apache.kafka.server.authorizer.Action; import org.openjdk.jmh.annotations.Benchmark; @@ -113,14 +112,14 @@ public void setup() throws Exception { } private void prepareAclCache() { - Map> aclEntries = new HashMap<>(); + Map> aclEntries = new HashMap<>(); for (int resourceId = 0; resourceId < resourceCount; resourceId++) { ResourcePattern resource = new ResourcePattern( (resourceId % 10 == 0) ? ResourceType.GROUP : ResourceType.TOPIC, resourceNamePrefix + resourceId, (resourceId % 5 == 0) ? PatternType.PREFIXED : PatternType.LITERAL); - Set entries = aclEntries.computeIfAbsent(resource, k -> new HashSet<>()); + Set entries = aclEntries.computeIfAbsent(resource, k -> new HashSet<>()); for (int aclId = 0; aclId < aclCount; aclId++) { // The principal in the request context we are using @@ -129,36 +128,31 @@ private void prepareAclCache() { AccessControlEntry allowAce = new AccessControlEntry( principalName, "*", AclOperation.READ, AclPermissionType.ALLOW); - entries.add(new AclEntry(allowAce)); + entries.add(new AccessControlEntry(allowAce.principal(), allowAce.host(), allowAce.operation(), allowAce.permissionType())); if (shouldDeny()) { - // dominantly deny the resource - AccessControlEntry denyAce = new AccessControlEntry( - principalName, "*", AclOperation.READ, AclPermissionType.DENY); - entries.add(new AclEntry(denyAce)); + entries.add(new AccessControlEntry(principalName, "*", AclOperation.READ, AclPermissionType.DENY)); } } } ResourcePattern resourcePrefix = new ResourcePattern(ResourceType.TOPIC, resourceNamePrefix, PatternType.PREFIXED); - Set entriesPrefix = aclEntries.computeIfAbsent(resourcePrefix, k -> new HashSet<>()); + Set entriesPrefix = aclEntries.computeIfAbsent(resourcePrefix, k -> new HashSet<>()); for (int hostId = 0; hostId < hostPreCount; hostId++) { AccessControlEntry allowAce = new AccessControlEntry(principal.toString(), "127.0.0." + hostId, AclOperation.READ, AclPermissionType.ALLOW); - entriesPrefix.add(new AclEntry(allowAce)); + entriesPrefix.add(new AccessControlEntry(allowAce.principal(), allowAce.host(), allowAce.operation(), allowAce.permissionType())); if (shouldDeny()) { - // dominantly deny the resource - AccessControlEntry denyAce = new AccessControlEntry(principal.toString(), "127.0.0." + hostId, - AclOperation.READ, AclPermissionType.DENY); - entriesPrefix.add(new AclEntry(denyAce)); + entriesPrefix.add(new AccessControlEntry(principal.toString(), "127.0.0." + hostId, + AclOperation.READ, AclPermissionType.DENY)); } } ResourcePattern resourceWildcard = new ResourcePattern(ResourceType.TOPIC, ResourcePattern.WILDCARD_RESOURCE, PatternType.LITERAL); - Set entriesWildcard = aclEntries.computeIfAbsent(resourceWildcard, k -> new HashSet<>()); + Set entriesWildcard = aclEntries.computeIfAbsent(resourceWildcard, k -> new HashSet<>()); // get dynamic entries number for wildcard acl for (int hostId = 0; hostId < resourceCount / 10; hostId++) { String hostName = "127.0.0" + hostId; @@ -170,23 +164,22 @@ private void prepareAclCache() { AccessControlEntry allowAce = new AccessControlEntry(principal.toString(), hostName, AclOperation.READ, AclPermissionType.ALLOW); - entriesWildcard.add(new AclEntry(allowAce)); + entriesWildcard.add(new AccessControlEntry(allowAce.principal(), allowAce.host(), allowAce.operation(), allowAce.permissionType())); if (shouldDeny()) { - AccessControlEntry denyAce = new AccessControlEntry(principal.toString(), hostName, - AclOperation.READ, AclPermissionType.DENY); - entriesWildcard.add(new AclEntry(denyAce)); + entriesWildcard.add(new AccessControlEntry(principal.toString(), hostName, + AclOperation.READ, AclPermissionType.DENY)); } } setupAcls(aclEntries); } - private void setupAcls(Map> aclEntries) { - for (Map.Entry> entryMap : aclEntries.entrySet()) { + private void setupAcls(Map> aclEntries) { + for (Map.Entry> entryMap : aclEntries.entrySet()) { ResourcePattern resourcePattern = entryMap.getKey(); - for (AclEntry aclEntry : entryMap.getValue()) { - StandardAcl standardAcl = StandardAcl.fromAclBinding(new AclBinding(resourcePattern, aclEntry)); + for (AccessControlEntry accessControlEntry : entryMap.getValue()) { + StandardAcl standardAcl = StandardAcl.fromAclBinding(new AclBinding(resourcePattern, accessControlEntry)); authorizer.addAcl(Uuid.randomUuid(), standardAcl); } authorizer.completeInitialLoad(); diff --git a/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java b/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java index 012be7f55a95d..4b1b12aefdcff 100644 --- a/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java +++ b/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java @@ -16,28 +16,15 @@ */ package org.apache.kafka.security.authorizer; -import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.acl.AclOperation; -import org.apache.kafka.common.acl.AclPermissionType; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.resource.ResourceType; import org.apache.kafka.common.security.auth.KafkaPrincipal; -import org.apache.kafka.common.utils.SecurityUtils; -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.server.util.Json; -import org.apache.kafka.server.util.json.DecodeJson; -import org.apache.kafka.server.util.json.JsonObject; -import org.apache.kafka.server.util.json.JsonValue; -import java.io.IOException; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -54,99 +41,16 @@ import static org.apache.kafka.common.acl.AclOperation.READ; import static org.apache.kafka.common.acl.AclOperation.WRITE; -public class AclEntry extends AccessControlEntry { - private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger(); - private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString(); +public class AclEntry { public static final KafkaPrincipal WILDCARD_PRINCIPAL = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*"); public static final String WILDCARD_PRINCIPAL_STRING = WILDCARD_PRINCIPAL.toString(); public static final String WILDCARD_HOST = "*"; public static final String WILDCARD_RESOURCE = ResourcePattern.WILDCARD_RESOURCE; - public static final String RESOURCE_SEPARATOR = ":"; - public static final Set RESOURCE_TYPES = Arrays.stream(ResourceType.values()) - .filter(t -> !(t == ResourceType.UNKNOWN || t == ResourceType.ANY)) - .collect(Collectors.toSet()); public static final Set ACL_OPERATIONS = Arrays.stream(AclOperation.values()) .filter(t -> !(t == AclOperation.UNKNOWN || t == AclOperation.ANY)) .collect(Collectors.toSet()); - private static final String PRINCIPAL_KEY = "principal"; - private static final String PERMISSION_TYPE_KEY = "permissionType"; - private static final String OPERATION_KEY = "operation"; - private static final String HOSTS_KEY = "host"; - public static final String VERSION_KEY = "version"; - public static final int CURRENT_VERSION = 1; - private static final String ACLS_KEY = "acls"; - - public final AccessControlEntry ace; - public final KafkaPrincipal kafkaPrincipal; - - public AclEntry(AccessControlEntry ace) { - super(ace.principal(), ace.host(), ace.operation(), ace.permissionType()); - this.ace = ace; - - kafkaPrincipal = ace.principal() == null - ? null - : SecurityUtils.parseKafkaPrincipal(ace.principal()); - } - - /** - * Parse JSON representation of ACLs - * @param bytes of acls json string - * - *

- { - "version": 1, - "acls": [ - { - "host":"host1", - "permissionType": "Deny", - "operation": "Read", - "principal": "User:alice" - } - ] - } - *

- * - * @return set of AclEntry objects from the JSON string - */ - public static Set fromBytes(byte[] bytes) throws IOException { - if (bytes == null || bytes.length == 0) - return Collections.emptySet(); - - Optional jsonValue = Json.parseBytes(bytes); - if (jsonValue.isEmpty()) - return Collections.emptySet(); - - JsonObject js = jsonValue.get().asJsonObject(); - - //the acl json version. - Utils.require(js.apply(VERSION_KEY).to(INT) == CURRENT_VERSION); - - Set res = new HashSet<>(); - - Iterator aclsIter = js.apply(ACLS_KEY).asJsonArray().iterator(); - while (aclsIter.hasNext()) { - JsonObject itemJs = aclsIter.next().asJsonObject(); - KafkaPrincipal principal = SecurityUtils.parseKafkaPrincipal(itemJs.apply(PRINCIPAL_KEY).to(STRING)); - AclPermissionType permissionType = SecurityUtils.permissionType(itemJs.apply(PERMISSION_TYPE_KEY).to(STRING)); - String host = itemJs.apply(HOSTS_KEY).to(STRING); - AclOperation operation = SecurityUtils.operation(itemJs.apply(OPERATION_KEY).to(STRING)); - - res.add(new AclEntry(new AccessControlEntry(principal.toString(), - host, operation, permissionType))); - } - - return res; - } - - public static Map toJsonCompatibleMap(Set acls) { - Map res = new HashMap<>(); - res.put(AclEntry.VERSION_KEY, AclEntry.CURRENT_VERSION); - res.put(AclEntry.ACLS_KEY, acls.stream().map(AclEntry::toMap).collect(Collectors.toList())); - return res; - } - public static Set supportedOperations(ResourceType resourceType) { switch (resourceType) { case TOPIC: @@ -182,28 +86,4 @@ public static Errors authorizationError(ResourceType resourceType) { throw new IllegalArgumentException("Authorization error type not known"); } } - - public Map toMap() { - Map res = new HashMap<>(); - res.put(AclEntry.PRINCIPAL_KEY, principal()); - res.put(AclEntry.PERMISSION_TYPE_KEY, SecurityUtils.permissionTypeName(permissionType())); - res.put(AclEntry.OPERATION_KEY, SecurityUtils.operationName(operation())); - res.put(AclEntry.HOSTS_KEY, host()); - return res; - } - - @Override - public int hashCode() { - return ace.hashCode(); - } - - @Override - public boolean equals(Object o) { - return super.equals(o); // to keep spotbugs happy - } - - @Override - public String toString() { - return String.format("%s has %s permission for operations: %s from hosts: %s", principal(), permissionType().name(), operation(), host()); - } }