Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make all AdminClient timeouts configurable #498

Merged
merged 7 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 27 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -323,22 +323,33 @@ ns4kafka:
The name for each managed cluster has to be unique. This is this name you have to set in the field **metadata.cluster**
of your namespace descriptors.

| Property | type | description |
|-----------------------------------------|---------|----------------------------------------------------------------------------------------------------------------------------------------|
| manage-users | boolean | Does the cluster manages users ? |
| manage-acls | boolean | Does the cluster manages access control entries ? |
| manage-topics | boolean | Does the cluster manages topics ? |
| manage-connectors | boolean | Does the cluster manages connects ? |
| drop-unsync-acls | boolean | Should Ns4Kafka drop unsynchronized ACLs |
| provider | boolean | The kind of cluster. Either SELF_MANAGED or CONFLUENT_CLOUD |
| config.bootstrap.servers | string | The location of the clusters servers |
| config.cluster.id | string | The cluster id. Required to use [Confluent Cloud tags](https://docs.confluent.io/cloud/current/stream-governance/stream-catalog.html). In this case, [Stream Catalog properties](#stream-catalog) must be set. |
| schema-registry.url | string | The location of the Schema Registry |
| schema-registry.basicAuthUsername | string | Basic authentication username to the Schema Registry |
| schema-registry.basicAuthPassword | string | Basic authentication password to the Schema Registry |
| connects.connect-name.url | string | The location of the kafka connect |
| connects.connect-name.basicAuthUsername | string | Basic authentication username to the Kafka Connect |
| connects.connect-name.basicAuthPassword | string | Basic authentication password to the Kafka Connect |
| Property | Type | Required | Description |
|--------------------------------------|---------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| manage-acls | boolean | No | Does the cluster manages access control entries (Default: false) |
| manage-connectors | boolean | No | Does the cluster manages connects (Default: false) |
| manage-topics | boolean | No | Does the cluster manages topics (Default: false) |
| manage-users | boolean | No | Does the cluster manages users (Default: false) |
| drop-unsync-acls | boolean | No | Should unsynchronized acls be dropped (Default: true) |
| timeout.acl.create | int | No | The timeout in milliseconds used by the AdminClient to create acls (Default: 30000ms) |
| timeout.acl.describe | int | No | The timeout in milliseconds used by the AdminClient to describe acls (Default: 30000ms) |
| timeout.acl.delete | int | No | The timeout in milliseconds used by the AdminClient to delete acls (Default: 30000ms) |
| timeout.topic.alter-configs | int | No | The timeout in milliseconds used by the AdminClient to alter topic configs (Default: 30000ms) |
| timeout.topic.create | int | No | The timeout in milliseconds used by the AdminClient to create topics (Default: 30000ms) |
| timeout.topic.describe-configs | int | No | The timeout in milliseconds used by the AdminClient to describe topic configs (Default: 30000ms) |
| timeout.topic.delete | int | No | The timeout in milliseconds used by the AdminClient to delete topics (Default: 30000ms) |
| timeout.topic.list | int | No | The timeout in milliseconds used by the AdminClient to list topics (Default: 30000ms) |
| timeout.user.alter-quotas | int | No | The timeout in milliseconds used by the AdminClient to alter client quotas (Default: 30000ms) |
| timeout.user.alter-scram-credentials | int | No | The timeout in milliseconds used by the AdminClient to alter scram credentials (Default: 30000ms) |
| timeout.user.describe-quotas | int | No | The timeout in milliseconds used by the AdminClient to describe client quotas (Default: 30000ms) |
| provider | boolean | Yes | The kind of cluster. Either SELF_MANAGED or CONFLUENT_CLOUD |
| config.bootstrap.servers | string | Yes | The location of the clusters servers |
| config.cluster.id | string | No | The cluster id. Required to use [Confluent Cloud tags](https://docs.confluent.io/cloud/current/stream-governance/stream-catalog.html). In this case, [Stream Catalog properties](#stream-catalog) must be set. |
| schema-registry.url | string | No | The location of the Schema Registry |
| schema-registry.basicAuthUsername | string | No | Basic authentication username to the Schema Registry |
| schema-registry.basicAuthPassword | string | No | Basic authentication password to the Schema Registry |
| connects.<name>.url | string | No | The location of the kafka connect |
| connects.<name>.basicAuthUsername | string | No | Basic authentication username to the Kafka Connect |
| connects.<name>.basicAuthPassword | string | No | Basic authentication password to the Kafka Connect |

The configuration will depend on the authentication method selected for your broker, schema registry and Kafka Connect.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
@EachProperty("ns4kafka.managed-clusters")
public class ManagedClusterProperties {
private String name;
private boolean manageTopics;
private boolean manageAcls;
private boolean dropUnsyncAcls = true;
private boolean manageUsers;
private boolean manageConnectors;
private boolean manageTopics;
private boolean manageUsers;
private boolean dropUnsyncAcls = true;
private TimeoutProperties timeout = new TimeoutProperties();
private KafkaProvider provider;
private Properties config;
private Map<String, ConnectProperties> connects;
Expand Down Expand Up @@ -66,9 +67,9 @@ public enum KafkaProvider {
@Setter
@Introspected
public static class ConnectProperties {
String url;
String basicAuthUsername;
String basicAuthPassword;
private String url;
private String basicAuthUsername;
private String basicAuthPassword;
}

/**
Expand All @@ -78,9 +79,60 @@ public static class ConnectProperties {
@Setter
@ConfigurationProperties("schema-registry")
public static class SchemaRegistryProperties {
String url;
String basicAuthUsername;
String basicAuthPassword;
private String url;
private String basicAuthUsername;
private String basicAuthPassword;
}

/**
* Timeout properties.
*/
@Getter
@Setter
@ConfigurationProperties("timeout")
public static class TimeoutProperties {
private static final int DEFAULT_TIMEOUT_MS = 30000;
private AclProperties acl = new AclProperties();
private TopicProperties topic = new TopicProperties();
private UserProperties user = new UserProperties();

/**
* ACL properties.
*/
@Getter
@Setter
@ConfigurationProperties("acl")
public static class AclProperties {
private int describe = DEFAULT_TIMEOUT_MS;
private int create = DEFAULT_TIMEOUT_MS;
private int delete = DEFAULT_TIMEOUT_MS;
}

/**
* Topic properties.
*/
@Getter
@Setter
@ConfigurationProperties("topic")
public static class TopicProperties {
private int alterConfigs = DEFAULT_TIMEOUT_MS;
private int create = DEFAULT_TIMEOUT_MS;
private int describeConfigs = DEFAULT_TIMEOUT_MS;
private int delete = DEFAULT_TIMEOUT_MS;
private int list = DEFAULT_TIMEOUT_MS;
}

/**
* User properties.
*/
@Getter
@Setter
@ConfigurationProperties("user")
public static class UserProperties {
private int alterQuotas = DEFAULT_TIMEOUT_MS;
private int alterScramCredentials = DEFAULT_TIMEOUT_MS;
private int describeQuotas = DEFAULT_TIMEOUT_MS;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,20 +174,25 @@ private List<AclBinding> collectNs4KafkaAcls() {
*/
private List<AclBinding> collectBrokerAcls(boolean managedUsersOnly)
throws ExecutionException, InterruptedException, TimeoutException {
List<ResourceType> validResourceTypes =
List.of(org.apache.kafka.common.resource.ResourceType.TOPIC,
org.apache.kafka.common.resource.ResourceType.GROUP,
org.apache.kafka.common.resource.ResourceType.TRANSACTIONAL_ID);
List<ResourceType> validResourceTypes = List.of(
org.apache.kafka.common.resource.ResourceType.TOPIC,
org.apache.kafka.common.resource.ResourceType.GROUP,
org.apache.kafka.common.resource.ResourceType.TRANSACTIONAL_ID
);

AccessControlEntryFilter accessControlEntryFilter = new AccessControlEntryFilter(
managedClusterProperties.getProvider()
.equals(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD) ? "UserV2:*" : null,
null, AclOperation.ANY, AclPermissionType.ANY);
null,
AclOperation.ANY,
AclPermissionType.ANY
);
AclBindingFilter aclBindingFilter = new AclBindingFilter(ResourcePatternFilter.ANY, accessControlEntryFilter);

List<AclBinding> userAcls = getAdminClient()
.describeAcls(aclBindingFilter)
.values().get(10, TimeUnit.SECONDS)
.values()
.get(managedClusterProperties.getTimeout().getAcl().getDescribe(), TimeUnit.MILLISECONDS)
.stream()
.filter(aclBinding -> validResourceTypes.contains(aclBinding.pattern().resourceType()))
.toList();
Expand Down Expand Up @@ -320,7 +325,8 @@ private AclBinding convertConnectorAccessControlEntryToAclBinding(AccessControlE
ResourcePattern resourcePattern = new ResourcePattern(
org.apache.kafka.common.resource.ResourceType.GROUP,
"connect-" + accessControlEntry.getSpec().getResource(),
patternType);
patternType
);

String kafkaUser = namespaceRepository.findByName(accessControlEntry.getSpec().getGrantedTo())
.orElseThrow()
Expand Down Expand Up @@ -359,20 +365,23 @@ private List<AclOperation> computeAclOperationForOwner(ResourceType resourceType
*/
private void deleteAcls(List<AclBinding> toDelete) {
getAdminClient()
.deleteAcls(toDelete.stream()
.deleteAcls(toDelete
.stream()
.map(AclBinding::toFilter)
.toList())
.values().forEach((key, value) -> {
.values()
.forEach((key, value) -> {
try {
value.get(10, TimeUnit.SECONDS);
value.get(managedClusterProperties.getTimeout().getAcl().getDelete(), TimeUnit.MILLISECONDS);
log.info("Success deleting ACL {} on {}", key, managedClusterProperties.getName());
} catch (InterruptedException e) {
log.error("Error", e);
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error(
String.format("Error while deleting ACL %s on %s", key,
managedClusterProperties.getName()), e);
String.format("Error while deleting ACL %s on %s", key, managedClusterProperties.getName()),
e
);
}
});
}
Expand Down Expand Up @@ -408,8 +417,10 @@ public void deleteAcl(AccessControlEntry accessControlEntry) {
*/
public void deleteKafkaStreams(Namespace namespace, KafkaStream kafkaStream) {
if (managedClusterProperties.isManageAcls()) {
List<AclBinding> results =
new ArrayList<>(buildAclBindingsFromKafkaStream(kafkaStream, namespace.getSpec().getKafkaUser()));
List<AclBinding> results = new ArrayList<>(buildAclBindingsFromKafkaStream(
kafkaStream,
namespace.getSpec().getKafkaUser())
);
deleteAcls(results);
}
}
Expand All @@ -420,18 +431,21 @@ public void deleteKafkaStreams(Namespace namespace, KafkaStream kafkaStream) {
* @param toCreate The list of ACLs to create
*/
private void createAcls(List<AclBinding> toCreate) {
getAdminClient().createAcls(toCreate)
getAdminClient()
.createAcls(toCreate)
.values()
.forEach((key, value) -> {
try {
value.get(10, TimeUnit.SECONDS);
log.info("Success creating ACL {} on {}", key, this.managedClusterProperties.getName());
value.get(managedClusterProperties.getTimeout().getAcl().getCreate(), TimeUnit.MILLISECONDS);
log.info("Success creating ACL {} on {}", key, managedClusterProperties.getName());
} catch (InterruptedException e) {
log.error("Error", e);
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error(String.format("Error while creating ACL %s on %s", key,
this.managedClusterProperties.getName()), e);
log.error(
String.format("Error while creating ACL %s on %s", key, managedClusterProperties.getName()),
e
);
}
});
}
Expand Down
Loading
Loading