From e4f3f901b81f9c087a106b180fbbd839cb07d914 Mon Sep 17 00:00:00 2001 From: Michael Edgar Date: Sat, 26 Oct 2024 08:49:50 -0400 Subject: [PATCH] Metrics via API, per-cluster Prometheus Config, OCP monitoring support Signed-off-by: Michael Edgar --- api/pom.xml | 8 +- .../streamshub/console/api/ClientFactory.java | 7 +- .../console/api/model/KafkaCluster.java | 7 + .../streamshub/console/api/model/Metrics.java | 51 +++ .../api/service/KafkaClusterService.java | 43 +++ .../console/api/service/MetricsService.java | 187 +++++++++++ .../console/api/support/KafkaContext.java | 10 + .../console/api/support/PrometheusAPI.java | 49 +++ .../factories/ConsoleConfigFactory.java | 85 +++-- api/src/main/resources/application.properties | 1 + .../queries/kafkaCluster_ranges.promql | 2 +- .../queries/kafkaCluster_values.promql | 41 --- .../console/config/ConsoleConfig.java | 33 +- .../console/config/KafkaClusterConfig.java | 18 +- .../console/config/KafkaConfig.java | 5 +- .../streamshub/console/config/Named.java | 16 + .../console/config/PrometheusConfig.java | 128 ++++++++ .../console/config/SchemaRegistryConfig.java | 7 +- operator/pom.xml | 8 - .../streamshub/console/ConsoleReconciler.java | 36 +- .../api/v1alpha1/spec/ConsoleSpec.java | 24 +- .../api/v1alpha1/spec/KafkaCluster.java | 18 +- .../console/api/v1alpha1/spec/Prometheus.java | 130 ++++++++ .../api/v1alpha1/spec/SchemaRegistry.java | 4 +- .../dependents/BaseClusterRoleBinding.java | 2 +- .../dependents/BaseLabelDiscriminator.java | 19 +- .../dependents/ConsoleClusterRoleBinding.java | 10 +- .../console/dependents/ConsoleDeployment.java | 16 +- .../dependents/ConsoleLabelDiscriminator.java | 10 + .../ConsoleMonitoringClusterRoleBinding.java | 77 +++++ .../console/dependents/ConsoleResource.java | 16 + .../console/dependents/ConsoleSecret.java | 120 ++++++- .../dependents/PrometheusPrecondition.java | 28 ++ .../console/dependents/PrometheusService.java | 9 +- operator/src/main/kubernetes/kubernetes.yml | 23 ++ ...console-monitoring.clusterrolebinding.yaml | 12 + .../dependents/console.deployment.yaml | 12 + .../console/ConsoleReconcilerTest.java | 8 +- pom.xml | 6 + ui/api/kafka/actions.ts | 310 +----------------- ui/api/kafka/cluster.promql.ts | 72 ---- ui/api/kafka/kpi.promql.ts | 93 ------ ui/api/kafka/schema.ts | 54 ++- ui/api/kafka/topic.promql.ts | 31 -- .../(authorized)/kafka/[kafkaId]/layout.tsx | 2 +- .../kafka/[kafkaId]/nodes/NodesTable.tsx | 2 +- .../kafka/[kafkaId]/nodes/page.tsx | 81 +++-- .../overview/ConnectedClusterCard.tsx | 43 +-- .../overview/ConnectedClusterChartsCard.tsx | 41 +-- .../overview/ConnectedTopicChartsCard.tsx | 32 +- .../kafka/[kafkaId]/overview/page.tsx | 26 +- ui/app/[locale]/(authorized)/layout.tsx | 2 +- ui/app/api/auth/[...nextauth]/auth-options.ts | 48 +++ ui/app/api/auth/[...nextauth]/route.ts | 51 +-- .../ClusterOverview/ClusterChartsCard.tsx | 8 +- .../ClusterOverview/TopicChartsCard.tsx | 5 +- .../components/ChartCpuUsage.tsx | 39 ++- .../components/ChartDiskUsage.tsx | 70 ++-- .../components/ChartIncomingOutgoing.tsx | 85 +++-- .../components/ChartMemoryUsage.tsx | 30 +- .../ClusterOverview/components/chartConsts.ts | 2 +- ui/environment.d.ts | 1 - ui/package.json | 3 - ui/utils/session.ts | 2 +- ui/utils/useFormatBytes.ts | 26 ++ 65 files changed, 1500 insertions(+), 945 deletions(-) create mode 100644 api/src/main/java/com/github/streamshub/console/api/model/Metrics.java create mode 100644 api/src/main/java/com/github/streamshub/console/api/service/MetricsService.java create mode 100644 
api/src/main/java/com/github/streamshub/console/api/support/PrometheusAPI.java create mode 100644 common/src/main/java/com/github/streamshub/console/config/Named.java create mode 100644 common/src/main/java/com/github/streamshub/console/config/PrometheusConfig.java create mode 100644 operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/Prometheus.java create mode 100644 operator/src/main/java/com/github/streamshub/console/dependents/ConsoleMonitoringClusterRoleBinding.java create mode 100644 operator/src/main/java/com/github/streamshub/console/dependents/PrometheusPrecondition.java create mode 100644 operator/src/main/resources/com/github/streamshub/console/dependents/console-monitoring.clusterrolebinding.yaml delete mode 100644 ui/api/kafka/cluster.promql.ts delete mode 100644 ui/api/kafka/kpi.promql.ts delete mode 100644 ui/api/kafka/topic.promql.ts create mode 100644 ui/app/api/auth/[...nextauth]/auth-options.ts diff --git a/api/pom.xml b/api/pom.xml index 2e7b7114c..1ae64db8f 100644 --- a/api/pom.xml +++ b/api/pom.xml @@ -399,10 +399,10 @@ build src/main/docker/Dockerfile true - ${docker.registry} - ${docker.group} - ${docker.tag} - ${docker.push} + ${container-image.registry} + ${container-image.group} + ${container-image.tag} + ${container-image.push} diff --git a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java index 87505bfaf..0f0b25bbd 100644 --- a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java +++ b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java @@ -54,6 +54,7 @@ import org.jboss.logging.Logger; import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.streamshub.console.api.service.MetricsService; import com.github.streamshub.console.api.support.Holder; import com.github.streamshub.console.api.support.KafkaContext; import com.github.streamshub.console.api.support.TrustAllCertificateManager; @@ -154,6 +155,9 @@ public class ClientFactory { @Named("kafkaAdminFilter") UnaryOperator kafkaAdminFilter = UnaryOperator.identity(); + @Inject + MetricsService metricsService; + @Produces @ApplicationScoped Map produceKafkaContexts(Function, Admin> adminBuilder) { @@ -168,7 +172,7 @@ Map produceKafkaContexts(Function, Adm consoleConfig.getKafka().getClusters() .stream() .filter(c -> cachedKafkaResource(c).isEmpty()) - .filter(Predicate.not(KafkaClusterConfig::hasNamespace)) + //.filter(Predicate.not(KafkaClusterConfig::hasNamespace)) .forEach(clusterConfig -> putKafkaContext(contexts, clusterConfig, Optional.empty(), @@ -302,6 +306,7 @@ void putKafkaContext(Map contexts, KafkaContext ctx = new KafkaContext(clusterConfig, kafkaResource.orElse(null), clientConfigs, admin); ctx.schemaRegistryClient(registryConfig, mapper); + ctx.prometheus(metricsService.createClient(consoleConfig, clusterConfig)); KafkaContext previous = contexts.put(clusterId, ctx); diff --git a/api/src/main/java/com/github/streamshub/console/api/model/KafkaCluster.java b/api/src/main/java/com/github/streamshub/console/api/model/KafkaCluster.java index 159b1b9d4..1b95087fb 100644 --- a/api/src/main/java/com/github/streamshub/console/api/model/KafkaCluster.java +++ b/api/src/main/java/com/github/streamshub/console/api/model/KafkaCluster.java @@ -185,6 +185,9 @@ static class Attributes { @JsonProperty boolean cruiseControlEnabled; + @JsonProperty + Metrics metrics = new Metrics(); + Attributes(List nodes, Node controller, List authorizedOperations) { 
this.nodes = nodes; this.controller = controller; @@ -328,4 +331,8 @@ public void nodePools(List nodePools) { public void cruiseControlEnabled(boolean cruiseControlEnabled) { attributes.cruiseControlEnabled = cruiseControlEnabled; } + + public Metrics metrics() { + return attributes.metrics; + } } diff --git a/api/src/main/java/com/github/streamshub/console/api/model/Metrics.java b/api/src/main/java/com/github/streamshub/console/api/model/Metrics.java new file mode 100644 index 000000000..3a3fce68a --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/model/Metrics.java @@ -0,0 +1,51 @@ +package com.github.streamshub.console.api.model; + +import java.time.Instant; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.eclipse.microprofile.openapi.annotations.media.Schema; + +import com.fasterxml.jackson.annotation.JsonAnyGetter; +import com.fasterxml.jackson.annotation.JsonFormat; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; + +public record Metrics( + @JsonProperty + Map> values, + + @JsonProperty + Map> ranges) { + + public Metrics() { + this(new LinkedHashMap<>(), new LinkedHashMap<>()); + } + + @Schema(additionalProperties = String.class) + public static record ValueMetric( + @JsonProperty + String value, + + @JsonAnyGetter + @Schema(hidden = true) + Map attributes) { + } + + @Schema(additionalProperties = String.class) + public static record RangeMetric( + @JsonProperty + @Schema(implementation = String[][].class) + List range, + + @JsonAnyGetter + @Schema(hidden = true) + Map attributes) { + } + + @JsonFormat(shape = JsonFormat.Shape.ARRAY) + @JsonPropertyOrder({"when", "value"}) + public static record RangeEntry(Instant when, String value) { + } +} diff --git a/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java b/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java index 316a14d90..fb53fb0cd 100644 --- a/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java +++ b/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java @@ -1,5 +1,8 @@ package com.github.streamshub.console.api.service; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; @@ -7,6 +10,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.Function; import java.util.function.Predicate; @@ -76,6 +80,9 @@ public class KafkaClusterService { @Inject ConsoleConfig consoleConfig; + @Inject + MetricsService metricsService; + @Inject /** * All Kafka contexts known to the application @@ -148,6 +155,7 @@ public CompletionStage describeCluster(List fields) { enumNames(get(result::authorizedOperations)))) .thenApplyAsync(this::addKafkaContextData, threadContext.currentContextExecutor()) .thenApply(this::addKafkaResourceData) + .thenCompose(cluster -> addMetrics(cluster, fields)) .thenApply(this::setManaged); } @@ -313,6 +321,41 @@ KafkaCluster setManaged(KafkaCluster cluster) { return cluster; } + + CompletionStage addMetrics(KafkaCluster cluster, List fields) { + if (!fields.contains(KafkaCluster.Fields.METRICS)) { + return CompletableFuture.completedStage(cluster); + } + + if (kafkaContext.prometheus() 
== null) { + logger.warnf("Kafka cluster metrics were requested, but Prometheus URL is not configured"); + return CompletableFuture.completedStage(cluster); + } + + String namespace = cluster.namespace(); + String name = cluster.name(); + String rangeQuery; + String valueQuery; + + try (var rangesStream = getClass().getResourceAsStream("/metrics/queries/kafkaCluster_ranges.promql"); + var valuesStream = getClass().getResourceAsStream("/metrics/queries/kafkaCluster_values.promql")) { + rangeQuery = new String(rangesStream.readAllBytes(), StandardCharsets.UTF_8) + .formatted(namespace, name); + valueQuery = new String(valuesStream.readAllBytes(), StandardCharsets.UTF_8) + .formatted(namespace, name); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + var rangeResults = metricsService.queryRanges(rangeQuery).toCompletableFuture(); + var valueResults = metricsService.queryValues(valueQuery).toCompletableFuture(); + + return CompletableFuture.allOf( + rangeResults.thenAccept(cluster.metrics().ranges()::putAll), + valueResults.thenAccept(cluster.metrics().values()::putAll)) + .thenApply(nothing -> cluster); + } + private Optional findCluster(KafkaCluster cluster) { return findCluster(Cache.namespaceKeyFunc(cluster.namespace(), cluster.name())); } diff --git a/api/src/main/java/com/github/streamshub/console/api/service/MetricsService.java b/api/src/main/java/com/github/streamshub/console/api/service/MetricsService.java new file mode 100644 index 000000000..2753c4a43 --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/service/MetricsService.java @@ -0,0 +1,187 @@ +package com.github.streamshub.console.api.service; + +import java.io.IOException; +import java.net.URI; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Base64; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.function.BiFunction; +import java.util.function.Predicate; +import java.util.function.Supplier; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.json.JsonArray; +import jakarta.json.JsonObject; +import jakarta.ws.rs.WebApplicationException; +import jakarta.ws.rs.client.ClientRequestContext; +import jakarta.ws.rs.client.ClientRequestFilter; +import jakarta.ws.rs.core.HttpHeaders; + +import org.eclipse.microprofile.rest.client.RestClientBuilder; +import org.jboss.logging.Logger; + +import com.github.streamshub.console.api.model.Metrics; +import com.github.streamshub.console.api.model.Metrics.RangeEntry; +import com.github.streamshub.console.api.support.KafkaContext; +import com.github.streamshub.console.api.support.PrometheusAPI; +import com.github.streamshub.console.config.ConsoleConfig; +import com.github.streamshub.console.config.KafkaClusterConfig; +import com.github.streamshub.console.config.PrometheusConfig; +import com.github.streamshub.console.config.PrometheusConfig.Type; + +import io.fabric8.kubernetes.client.KubernetesClient; +import io.quarkus.tls.TlsConfiguration; +import io.quarkus.tls.TlsConfigurationRegistry; + +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.mapping; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; + +@ApplicationScoped +public class MetricsService { + + public static final String METRIC_NAME = "__console_metric_name__"; + + @Inject + 
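The range and value queries above are deliberately issued in parallel and only joined once both complete. A self-contained sketch of that fan-out-and-merge pattern, with supplyAsync stand-ins for the real metricsService calls (the metric names and values here are illustrative, not actual console metrics):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class MetricsFanOut {
    public static void main(String[] args) {
        // Stand-ins for metricsService.queryRanges(...) and queryValues(...)
        CompletableFuture<Map<String, List<String>>> ranges =
                CompletableFuture.supplyAsync(() -> Map.of("memory_usage_bytes", List.of("9.8e8", "1.1e9")));
        CompletableFuture<Map<String, List<String>>> values =
                CompletableFuture.supplyAsync(() -> Map.of("broker_state", List.of("3")));

        Map<String, List<String>> rangeTarget = new LinkedHashMap<>();
        Map<String, List<String>> valueTarget = new LinkedHashMap<>();

        // allOf completes when both queries do; each stage copies its result into
        // its target map, mirroring cluster.metrics().ranges()::putAll and values()::putAll
        CompletableFuture.allOf(
                ranges.thenAccept(rangeTarget::putAll),
                values.thenAccept(valueTarget::putAll))
            .join();

        System.out.println(rangeTarget + " / " + valueTarget);
    }
}
```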
Logger logger; + + @Inject + TlsConfigurationRegistry certificates; + + @Inject + KubernetesClient k8s; + + @Inject + KafkaContext kafkaContext; + + ClientRequestFilter createAuthenticationFilter(PrometheusConfig config) { + return new ClientRequestFilter() { + @Override + public void filter(ClientRequestContext requestContext) throws IOException { + var authConfig = config.getAuthentication(); + String authHeader = null; + + if (authConfig instanceof PrometheusConfig.Basic basic) { + authHeader = "Basic " + Base64.getEncoder().encodeToString("%s:%s".formatted( + basic.getUsername(), + basic.getPassword()) + .getBytes()); + } else if (authConfig instanceof PrometheusConfig.Bearer bearer) { + authHeader = "Bearer " + bearer.getToken(); + } else if (config.getType() == Type.OPENSHIFT_MONITORING) { + // ServiceAccount needs cluster role `cluster-monitoring-view` + authHeader = "Bearer " + k8s.getConfiguration().getAutoOAuthToken(); + } + + if (authHeader != null) { + requestContext.getHeaders().add(HttpHeaders.AUTHORIZATION, authHeader); + } + } + }; + } + + public PrometheusAPI createClient(ConsoleConfig consoleConfig, KafkaClusterConfig clusterConfig) { + PrometheusConfig prometheusConfig; + + if (clusterConfig.getMetricsSource() != null) { + prometheusConfig = consoleConfig.getMetricsSources() + .stream() + .filter(source -> source.getName().equals(clusterConfig.getMetricsSource())) + .findFirst() + .orElseThrow(); + + var trustStore = certificates.getDefault().map(TlsConfiguration::getTrustStore).orElse(null); + + return RestClientBuilder.newBuilder() + .baseUri(URI.create(prometheusConfig.getUrl())) + .trustStore(trustStore) + .register(createAuthenticationFilter(prometheusConfig)) + .build(PrometheusAPI.class); + } + + return null; + } + + CompletionStage>> queryValues(String query) { + PrometheusAPI prometheusAPI = kafkaContext.prometheus(); + + return fetchMetrics( + () -> prometheusAPI.query(query, Instant.now()), + (metric, attributes) -> { + // ignore timestamp in first position + String value = metric.getJsonArray("value").getString(1); + return new Metrics.ValueMetric(value, attributes); + }); + } + + CompletionStage>> queryRanges(String query) { + PrometheusAPI prometheusAPI = kafkaContext.prometheus(); + + return fetchMetrics( + () -> { + Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); + Instant start = now.minus(30, ChronoUnit.MINUTES); + Instant end = now; + return prometheusAPI.queryRange(query, start, end, "25"); + }, + (metric, attributes) -> { + List values = metric.getJsonArray("values") + .stream() + .map(JsonArray.class::cast) + .map(e -> new Metrics.RangeEntry( + Instant.ofEpochMilli((long) (e.getJsonNumber(0).doubleValue() * 1000d)), + e.getString(1) + )) + .toList(); + + return new Metrics.RangeMetric(values, attributes); + }); + } + + CompletionStage>> fetchMetrics( + Supplier operation, + BiFunction, M> builder) { + + return CompletableFuture.supplyAsync(() -> { + try { + return extractMetrics(operation.get(), builder); + } catch (WebApplicationException wae) { + logger.warnf("Failed to retrieve Kafka cluster metrics, status %d: %s", + wae.getResponse().getStatus(), + wae.getResponse().getEntity()); + return Collections.emptyMap(); + } catch (Exception e) { + logger.warnf(e, "Failed to retrieve Kafka cluster metrics"); + return Collections.emptyMap(); + } + }); + } + + Map> extractMetrics(JsonObject response, + BiFunction, M> builder) { + + return response.getJsonObject("data").getJsonArray("result") + .stream() + .map(JsonObject.class::cast) + 
.map(metric -> { + JsonObject meta = metric.getJsonObject("metric"); + String metricName = meta.getString(METRIC_NAME); + + Map attributes = meta.keySet() + .stream() + .filter(Predicate.not(METRIC_NAME::equals)) + .map(key -> Map.entry(key, meta.getString(key))) + .collect(toMap(Map.Entry::getKey, Map.Entry::getValue)); + + return Map.entry(metricName, builder.apply(metric, attributes)); + }) + .collect(groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toList()))); + } +} diff --git a/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java b/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java index b577b5e99..df3caf59d 100644 --- a/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java +++ b/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java @@ -43,6 +43,7 @@ public class KafkaContext implements Closeable { final Admin admin; boolean applicationScoped; SchemaRegistryContext schemaRegistryContext; + PrometheusAPI prometheus; public KafkaContext(KafkaClusterConfig clusterConfig, Kafka resource, Map, Map> configs, Admin admin) { this.clusterConfig = clusterConfig; @@ -56,6 +57,7 @@ public KafkaContext(KafkaContext other, Admin admin) { this(other.clusterConfig, other.resource, other.configs, admin); this.applicationScoped = false; this.schemaRegistryContext = other.schemaRegistryContext; + this.prometheus = other.prometheus; } public static String clusterId(KafkaClusterConfig clusterConfig, Optional kafkaResource) { @@ -137,6 +139,14 @@ public SchemaRegistryContext schemaRegistryContext() { return schemaRegistryContext; } + public void prometheus(PrometheusAPI prometheus) { + this.prometheus = prometheus; + } + + public PrometheusAPI prometheus() { + return prometheus; + } + public String saslMechanism(Class clientType) { return configs(clientType).get(SaslConfigs.SASL_MECHANISM) instanceof String auth ? 
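For orientation, extractMetrics() consumes the standard Prometheus HTTP API envelope: data.result is a list of series, each carrying a label set under "metric" and a value (or values) array. The PromQL files in this patch inject the synthetic __console_metric_name__ label via label_replace, and that label becomes the grouping key while the remaining labels become per-metric attributes. A minimal parsing sketch with jakarta.json; the label and sample values are made up:

```java
import java.io.StringReader;

import jakarta.json.Json;
import jakarta.json.JsonObject;

public class ExtractSketch {
    public static void main(String[] args) {
        // Shape of a Prometheus instant-query response, trimmed to what extractMetrics() reads
        String body = """
            {
              "status": "success",
              "data": {
                "resultType": "vector",
                "result": [
                  {
                    "metric": {
                      "__console_metric_name__": "broker_state",
                      "nodeId": "0"
                    },
                    "value": [1729944590.0, "3"]
                  }
                ]
              }
            }
            """;

        JsonObject response = Json.createReader(new StringReader(body)).readObject();

        response.getJsonObject("data").getJsonArray("result")
            .stream()
            .map(JsonObject.class::cast)
            .forEach(metric -> {
                JsonObject meta = metric.getJsonObject("metric");
                // The synthetic label becomes the map key; the remaining labels
                // (here, nodeId) become the per-metric attributes
                String name = meta.getString("__console_metric_name__");
                String value = metric.getJsonArray("value").getString(1);
                System.out.println(name + " -> " + value);
            });
    }
}
```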
auth : ""; } diff --git a/api/src/main/java/com/github/streamshub/console/api/support/PrometheusAPI.java b/api/src/main/java/com/github/streamshub/console/api/support/PrometheusAPI.java new file mode 100644 index 000000000..7adf7e0d2 --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/support/PrometheusAPI.java @@ -0,0 +1,49 @@ +package com.github.streamshub.console.api.support; + +import java.time.Instant; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.json.JsonObject; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.QueryParam; +import jakarta.ws.rs.core.MediaType; + +import org.eclipse.microprofile.rest.client.inject.RegisterRestClient; + +@ApplicationScoped +@RegisterRestClient(configKey = "prometheus") +@Path("/api/v1") +public interface PrometheusAPI { + + /** + * Evaluates an instant query at a single point in time + * + * @see Instant queries + */ + @Path("query") + @POST + @Consumes(MediaType.APPLICATION_FORM_URLENCODED) + @Produces(MediaType.APPLICATION_JSON) + JsonObject query( + @QueryParam("query") String query, + @QueryParam("time") Instant time); + + /** + * Evaluates an expression query over a range of time + * + * @see Range queries + */ + @Path("query_range") + @POST + @Consumes(MediaType.APPLICATION_FORM_URLENCODED) + @Produces(MediaType.APPLICATION_JSON) + JsonObject queryRange( + @QueryParam("query") String query, + @QueryParam("start") Instant start, + @QueryParam("end") Instant end, + @QueryParam("step") String step); + +} diff --git a/api/src/main/java/com/github/streamshub/console/api/support/factories/ConsoleConfigFactory.java b/api/src/main/java/com/github/streamshub/console/api/support/factories/ConsoleConfigFactory.java index 9bd4aa5e9..96a81fef6 100644 --- a/api/src/main/java/com/github/streamshub/console/api/support/factories/ConsoleConfigFactory.java +++ b/api/src/main/java/com/github/streamshub/console/api/support/factories/ConsoleConfigFactory.java @@ -3,8 +3,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.UncheckedIOException; +import java.net.URL; import java.nio.file.Path; -import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -17,7 +17,10 @@ import org.eclipse.microprofile.config.inject.ConfigProperty; import org.jboss.logging.Logger; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.github.streamshub.console.api.support.ValidationProxy; import com.github.streamshub.console.config.ConsoleConfig; @@ -54,30 +57,7 @@ public ConsoleConfig produceConsoleConfig() { } }) .filter(Objects::nonNull) - .map(url -> { - log.infof("Loading console configuration from %s", url); - ObjectMapper yamlMapper = mapper.copyWith(new YAMLFactory()); - - try (InputStream stream = url.openStream()) { - return yamlMapper.readValue(stream, ConsoleConfig.class); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }) - .map(consoleConfig -> { - consoleConfig.getSchemaRegistries().forEach(registry -> { - registry.setUrl(resolveValue(registry.getUrl())); - }); - - consoleConfig.getKafka().getClusters().forEach(cluster -> { - resolveValues(cluster.getProperties()); - resolveValues(cluster.getAdminProperties()); - 
resolveValues(cluster.getProducerProperties()); - resolveValues(cluster.getConsumerProperties()); - }); - - return consoleConfig; - }) + .map(this::loadConfiguration) .map(validationService::validate) .orElseGet(() -> { log.warn("Console configuration has not been specified using `console.config-path` property"); @@ -85,9 +65,58 @@ public ConsoleConfig produceConsoleConfig() { }); } - private void resolveValues(Map properties) { - properties.entrySet().forEach(entry -> - entry.setValue(resolveValue(entry.getValue()))); + private ConsoleConfig loadConfiguration(URL url) { + log.infof("Loading console configuration from %s", url); + ObjectMapper yamlMapper = mapper.copyWith(new YAMLFactory()); + JsonNode tree; + + try (InputStream stream = url.openStream()) { + tree = yamlMapper.readTree(stream); + } catch (IOException e) { + throw new UncheckedIOException("Failed to read configuration YAML", e); + } + + processNode(tree); + + try { + return mapper.treeToValue(tree, ConsoleConfig.class); + } catch (IOException e) { + throw new UncheckedIOException("Failed to load configuration model", e); + } + } + + private void processNode(JsonNode node) { + if (node.isArray()) { + int i = 0; + for (JsonNode entry : node) { + processNode((ArrayNode) node, i++, entry); + } + } else if (node.isObject()) { + for (var cursor = node.fields(); cursor.hasNext();) { + var field = cursor.next(); + processNode((ObjectNode) node, field.getKey(), field.getValue()); + } + } + } + + private void processNode(ObjectNode parent, String key, JsonNode node) { + if (node.isValueNode()) { + if (node.isTextual()) { + parent.put(key, resolveValue(node.asText())); + } + } else { + processNode(node); + } + } + + private void processNode(ArrayNode parent, int position, JsonNode node) { + if (node.isValueNode()) { + if (node.isTextual()) { + parent.set(position, resolveValue(node.asText())); + } + } else { + processNode(node); + } } /** diff --git a/api/src/main/resources/application.properties b/api/src/main/resources/application.properties index dfc2213b5..c2856341b 100644 --- a/api/src/main/resources/application.properties +++ b/api/src/main/resources/application.properties @@ -69,6 +69,7 @@ console.kafka.admin.default.api.timeout.ms=10000 ######## #%dev.quarkus.http.auth.proactive=false #%dev.quarkus.http.auth.permission."oidc".policy=permit +%dev.quarkus.tls.trust-all=true %dev.quarkus.kubernetes-client.trust-certs=true %dev.quarkus.log.category."io.vertx.core.impl.BlockedThreadChecker".level=OFF %dev.quarkus.log.category."com.github.streamshub.console".level=DEBUG diff --git a/api/src/main/resources/metrics/queries/kafkaCluster_ranges.promql b/api/src/main/resources/metrics/queries/kafkaCluster_ranges.promql index 0e83940fe..51a3e0c44 100644 --- a/api/src/main/resources/metrics/queries/kafkaCluster_ranges.promql +++ b/api/src/main/resources/metrics/queries/kafkaCluster_ranges.promql @@ -1,7 +1,7 @@ sum by (nodeId, __console_metric_name__) ( label_replace( label_replace( - rate(container_cpu_usage_seconds_total{namespace="%1$s",pod=~"%2$s-.+-\\d+"}[1m]), + rate(container_cpu_usage_seconds_total{namespace="%1$s",pod=~"%2$s-.+-\\d+"}[5m]), "nodeId", "$1", "pod", diff --git a/api/src/main/resources/metrics/queries/kafkaCluster_values.promql b/api/src/main/resources/metrics/queries/kafkaCluster_values.promql index 0cfe40fcc..8c17d05f7 100644 --- a/api/src/main/resources/metrics/queries/kafkaCluster_values.promql +++ b/api/src/main/resources/metrics/queries/kafkaCluster_values.promql @@ -16,47 +16,6 @@ sum by 
(__console_metric_name__, nodeId) ( or -sum by (__console_metric_name__) ( - label_replace( - kafka_controller_kafkacontroller_globaltopiccount{namespace="%1$s",pod=~"%2$s-.+-%3$d",strimzi_io_kind="Kafka"} > 0, - "__console_metric_name__", - "total_topics", - "", - "" - ) -) - -or - -sum by (__console_metric_name__) ( - label_replace( - kafka_controller_kafkacontroller_globalpartitioncount{namespace="%1$s",pod=~"%2$s-.+-%3$d",strimzi_io_kind="Kafka"} > 0, - "__console_metric_name__", - "total_partitions", - "", - "" - ) -) - -or - -label_replace( - ( - count( - sum by (topic) ( - kafka_cluster_partition_underreplicated{namespace="%1$s",pod=~"%2$s-.+-\\d+",strimzi_io_kind="Kafka"} > 0 - ) - ) - OR on() vector(0) - ), - "__console_metric_name__", - "underreplicated_topics", - "", - "" -) - -or - sum by (__console_metric_name__, nodeId) ( label_replace( label_replace( diff --git a/common/src/main/java/com/github/streamshub/console/config/ConsoleConfig.java b/common/src/main/java/com/github/streamshub/console/config/ConsoleConfig.java index 19d7ec94b..75717bc3c 100644 --- a/common/src/main/java/com/github/streamshub/console/config/ConsoleConfig.java +++ b/common/src/main/java/com/github/streamshub/console/config/ConsoleConfig.java @@ -7,9 +7,22 @@ import jakarta.validation.constraints.AssertTrue; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonInclude.Include; import io.xlate.validation.constraints.Expression; +@Expression( + message = "Kafka cluster references an unknown metrics source", + value = """ + metricsSources = self.metricsSources.stream() + .map(metrics -> metrics.getName()) + .toList(); + self.kafka.clusters.stream() + .map(cluster -> cluster.getMetricsSource()) + .filter(source -> source != null) + .allMatch(source -> metricsSources.contains(source)) + """) @Expression( message = "Kafka cluster references an unknown schema registry", value = """ @@ -21,20 +34,30 @@ .filter(registry -> registry != null) .allMatch(registry -> registryNames.contains(registry)) """) +@JsonInclude(Include.NON_NULL) public class ConsoleConfig { KubernetesConfig kubernetes = new KubernetesConfig(); + @Valid + List metricsSources = new ArrayList<>(); + @Valid List schemaRegistries = new ArrayList<>(); @Valid KafkaConfig kafka = new KafkaConfig(); + @JsonIgnore + @AssertTrue(message = "Metrics source names must be unique") + public boolean hasUniqueMetricsSourceNames() { + return Named.uniqueNames(metricsSources); + } + @JsonIgnore @AssertTrue(message = "Schema registry names must be unique") public boolean hasUniqueRegistryNames() { - return schemaRegistries.stream().map(SchemaRegistryConfig::getName).distinct().count() == schemaRegistries.size(); + return Named.uniqueNames(schemaRegistries); } public KubernetesConfig getKubernetes() { @@ -45,6 +68,14 @@ public void setKubernetes(KubernetesConfig kubernetes) { this.kubernetes = kubernetes; } + public List getMetricsSources() { + return metricsSources; + } + + public void setMetricsSources(List metricsSources) { + this.metricsSources = metricsSources; + } + public List getSchemaRegistries() { return schemaRegistries; } diff --git a/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java b/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java index a676d80c6..7197764ee 100644 --- a/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java +++ 
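Putting the new model together, a console-config.yaml that defines metrics sources and points a cluster at one of them might look as follows. The sketch parses it with the same Jackson YAML mapper the factory uses; names and URLs are illustrative, and note that the @Expression rules only run when the config passes through the application's ValidationProxy, not during plain parsing:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

import com.github.streamshub.console.config.ConsoleConfig;

public class ConfigSketch {
    public static void main(String[] args) throws Exception {
        // A cluster opts into metrics by naming one of the configured sources;
        // the @Expression rule above rejects references to unknown names
        String yaml = """
            metricsSources:
              - name: cluster-monitoring
                type: openshift-monitoring
                # url presumably filled in by the operator for this type
                # (note the Route lookup imports appearing in ConsoleSecret)
              - name: team-prometheus
                type: standalone
                url: http://prometheus.example:9090
            kafka:
              clusters:
                - name: my-kafka
                  namespace: my-namespace
                  metricsSource: cluster-monitoring
            """;

        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
        ConsoleConfig config = mapper.readValue(yaml, ConsoleConfig.class);
        System.out.println(config.getMetricsSources().get(0).getName());
    }
}
```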
b/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java @@ -6,14 +6,21 @@ import jakarta.validation.constraints.NotBlank; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonInclude.Include; -public class KafkaClusterConfig { +@JsonInclude(Include.NON_NULL) +public class KafkaClusterConfig implements Named { private String id; @NotBlank(message = "Kafka cluster `name` is required") private String name; private String namespace; private String listener; + /** + * Name of a configured metrics source used by this Kafka cluster + */ + private String metricsSource; /** * Name of a configured schema registry that will be used to ser/des configurations * with this Kafka cluster. @@ -42,6 +49,7 @@ public void setId(String id) { this.id = id; } + @Override public String getName() { return name; } @@ -66,6 +74,14 @@ public void setListener(String listener) { this.listener = listener; } + public String getMetricsSource() { + return metricsSource; + } + + public void setMetricsSource(String metricsSource) { + this.metricsSource = metricsSource; + } + public String getSchemaRegistry() { return schemaRegistry; } diff --git a/common/src/main/java/com/github/streamshub/console/config/KafkaConfig.java b/common/src/main/java/com/github/streamshub/console/config/KafkaConfig.java index 429817137..7c16ca926 100644 --- a/common/src/main/java/com/github/streamshub/console/config/KafkaConfig.java +++ b/common/src/main/java/com/github/streamshub/console/config/KafkaConfig.java @@ -8,7 +8,10 @@ import jakarta.validation.constraints.AssertTrue; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonInclude.Include; +@JsonInclude(Include.NON_NULL) public class KafkaConfig { @Valid @@ -17,7 +20,7 @@ public class KafkaConfig { @JsonIgnore @AssertTrue(message = "Kafka cluster names must be unique") public boolean hasUniqueClusterNames() { - return clusters.stream().map(KafkaClusterConfig::getName).distinct().count() == clusters.size(); + return Named.uniqueNames(clusters); } @JsonIgnore diff --git a/common/src/main/java/com/github/streamshub/console/config/Named.java b/common/src/main/java/com/github/streamshub/console/config/Named.java new file mode 100644 index 000000000..45e1c7961 --- /dev/null +++ b/common/src/main/java/com/github/streamshub/console/config/Named.java @@ -0,0 +1,16 @@ +package com.github.streamshub.console.config; + +import java.util.Collection; + +public interface Named { + + static boolean uniqueNames(Collection configs) { + if (configs == null) { + return true; + } + return configs.stream().map(Named::getName).distinct().count() == configs.size(); + } + + String getName(); + +} diff --git a/common/src/main/java/com/github/streamshub/console/config/PrometheusConfig.java b/common/src/main/java/com/github/streamshub/console/config/PrometheusConfig.java new file mode 100644 index 000000000..2f330cb0b --- /dev/null +++ b/common/src/main/java/com/github/streamshub/console/config/PrometheusConfig.java @@ -0,0 +1,128 @@ +package com.github.streamshub.console.config; + +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotBlank; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import 
com.fasterxml.jackson.annotation.JsonTypeInfo.Id; +import com.fasterxml.jackson.annotation.JsonValue; +import com.fasterxml.jackson.annotation.JsonInclude.Include; + +@JsonInclude(Include.NON_NULL) +public class PrometheusConfig implements Named { + + @NotBlank + private String name; + private Type type; + private String url; + @Valid + private Authentication authentication; + + @Override + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Type getType() { + return type; + } + + public void setType(Type type) { + this.type = type; + } + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } + + public Authentication getAuthentication() { + return authentication; + } + + public void setAuthentication(Authentication authentication) { + this.authentication = authentication; + } + + public enum Type { + OPENSHIFT_MONITORING("openshift-monitoring"), + STANDALONE("standalone"); + + private final String value; + + private Type(String value) { + this.value = value; + } + + @JsonValue + public String value() { + return value; + } + + @JsonCreator + public static Type fromValue(String value) { + if (value == null) { + return STANDALONE; + } + + for (var type : values()) { + if (type.value.equals(value.trim())) { + return type; + } + } + + throw new IllegalArgumentException("Invalid Prometheus type: " + value); + } + } + + @JsonTypeInfo(use = Id.DEDUCTION) + @JsonSubTypes({ @JsonSubTypes.Type(Basic.class), @JsonSubTypes.Type(Bearer.class) }) + abstract static class Authentication { + } + + public static class Basic extends Authentication { + @NotBlank + private String username; + @NotBlank + private String password; + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + } + + public static class Bearer extends Authentication { + @NotBlank + private String token; + + public String getToken() { + return token; + } + + public void setToken(String token) { + this.token = token; + } + } +} diff --git a/common/src/main/java/com/github/streamshub/console/config/SchemaRegistryConfig.java b/common/src/main/java/com/github/streamshub/console/config/SchemaRegistryConfig.java index 0d4dc18ec..999459091 100644 --- a/common/src/main/java/com/github/streamshub/console/config/SchemaRegistryConfig.java +++ b/common/src/main/java/com/github/streamshub/console/config/SchemaRegistryConfig.java @@ -2,7 +2,11 @@ import jakarta.validation.constraints.NotBlank; -public class SchemaRegistryConfig { +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonInclude.Include; + +@JsonInclude(Include.NON_NULL) +public class SchemaRegistryConfig implements Named { @NotBlank(message = "Schema registry `name` is required") String name; @@ -10,6 +14,7 @@ public class SchemaRegistryConfig { @NotBlank(message = "Schema registry `url` is required") String url; + @Override public String getName() { return name; } diff --git a/operator/pom.xml b/operator/pom.xml index e73255169..835a68f06 100644 --- a/operator/pom.xml +++ b/operator/pom.xml @@ -12,14 +12,6 @@ console-operator jar - - - localhost - streamshub - ${project.version} - false - - com.github.streamshub diff --git a/operator/src/main/java/com/github/streamshub/console/ConsoleReconciler.java 
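The Authentication hierarchy above uses Jackson's deduction-based typing: there is no explicit discriminator property, so the set of fields present (username/password versus token) selects Basic or Bearer. A short sketch of that behavior; it is declared in the config package because the Authentication base type is package-private, and the credential values are placeholders:

```java
package com.github.streamshub.console.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

public class AuthDeductionSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());

        // username + password can only be Basic...
        PrometheusConfig basic = mapper.readValue("""
            name: team-prometheus
            url: http://prometheus.example:9090
            authentication:
              username: admin
              password: changeme
            """, PrometheusConfig.class);

        // ...while token alone deduces Bearer
        PrometheusConfig bearer = mapper.readValue("""
            name: other-prometheus
            url: http://prometheus.example:9091
            authentication:
              token: changeme
            """, PrometheusConfig.class);

        System.out.println(basic.getAuthentication().getClass().getSimpleName());  // Basic
        System.out.println(bearer.getAuthentication().getClass().getSimpleName()); // Bearer
    }
}
```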
b/operator/src/main/java/com/github/streamshub/console/ConsoleReconciler.java index daca14e5c..57331ef9e 100644 --- a/operator/src/main/java/com/github/streamshub/console/ConsoleReconciler.java +++ b/operator/src/main/java/com/github/streamshub/console/ConsoleReconciler.java @@ -12,6 +12,7 @@ import com.github.streamshub.console.dependents.ConsoleClusterRoleBinding; import com.github.streamshub.console.dependents.ConsoleDeployment; import com.github.streamshub.console.dependents.ConsoleIngress; +import com.github.streamshub.console.dependents.ConsoleMonitoringClusterRoleBinding; import com.github.streamshub.console.dependents.ConsoleResource; import com.github.streamshub.console.dependents.ConsoleSecret; import com.github.streamshub.console.dependents.ConsoleService; @@ -22,6 +23,7 @@ import com.github.streamshub.console.dependents.PrometheusClusterRoleBinding; import com.github.streamshub.console.dependents.PrometheusConfigMap; import com.github.streamshub.console.dependents.PrometheusDeployment; +import com.github.streamshub.console.dependents.PrometheusPrecondition; import com.github.streamshub.console.dependents.PrometheusService; import com.github.streamshub.console.dependents.PrometheusServiceAccount; @@ -53,23 +55,28 @@ dependents = { @Dependent( name = PrometheusClusterRole.NAME, - type = PrometheusClusterRole.class), + type = PrometheusClusterRole.class, + reconcilePrecondition = PrometheusPrecondition.class), @Dependent( name = PrometheusServiceAccount.NAME, - type = PrometheusServiceAccount.class), + type = PrometheusServiceAccount.class, + reconcilePrecondition = PrometheusPrecondition.class), @Dependent( name = PrometheusClusterRoleBinding.NAME, type = PrometheusClusterRoleBinding.class, + reconcilePrecondition = PrometheusPrecondition.class, dependsOn = { PrometheusClusterRole.NAME, PrometheusServiceAccount.NAME }), @Dependent( name = PrometheusConfigMap.NAME, - type = PrometheusConfigMap.class), + type = PrometheusConfigMap.class, + reconcilePrecondition = PrometheusPrecondition.class), @Dependent( name = PrometheusDeployment.NAME, type = PrometheusDeployment.class, + reconcilePrecondition = PrometheusPrecondition.class, dependsOn = { PrometheusClusterRoleBinding.NAME, PrometheusConfigMap.NAME @@ -78,6 +85,7 @@ @Dependent( name = PrometheusService.NAME, type = PrometheusService.class, + reconcilePrecondition = PrometheusPrecondition.class, dependsOn = { PrometheusDeployment.NAME }), @@ -94,6 +102,13 @@ ConsoleClusterRole.NAME, ConsoleServiceAccount.NAME }), + @Dependent( + name = ConsoleMonitoringClusterRoleBinding.NAME, + type = ConsoleMonitoringClusterRoleBinding.class, + reconcilePrecondition = ConsoleMonitoringClusterRoleBinding.Precondition.class, + dependsOn = { + ConsoleServiceAccount.NAME + }), @Dependent( name = ConsoleSecret.NAME, type = ConsoleSecret.class), @@ -113,8 +128,7 @@ dependsOn = { ConsoleClusterRoleBinding.NAME, ConsoleSecret.NAME, - ConsoleIngress.NAME, - PrometheusService.NAME + ConsoleIngress.NAME }, readyPostcondition = DeploymentReadyCondition.class), }) @@ -136,21 +150,21 @@ }), description = """ The Streamshub Console provides a web-based user interface tool for monitoring Apache Kafka® instances within a Kubernetes based cluster. - - It features a user-friendly way to view Kafka topics and consumer groups, facilitating the searching and filtering of streamed messages. The console also offers insights into Kafka broker disk usage, helping administrators monitor and optimize resource utilization. 
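PrometheusPrecondition itself is only registered above, not shown in this hunk. A speculative sketch of the shape such a JOSDK condition takes, mirroring the ConsoleMonitoringClusterRoleBinding.Precondition that appears later; whether the real class also treats an empty metricsSources list as embedded-by-default is not visible here:

```java
import java.util.Collections;
import java.util.Optional;

import com.github.streamshub.console.api.v1alpha1.Console;
import com.github.streamshub.console.api.v1alpha1.spec.Prometheus.Type;

import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
import io.javaoperatorsdk.operator.processing.dependent.workflow.Condition;

// Speculative sketch only: the real PrometheusPrecondition is not shown in this hunk
public class PrometheusPreconditionSketch implements Condition<Object, Console> {
    @Override
    public boolean isMet(DependentResource<Object, Console> dependentResource,
            Console primary, Context<Console> context) {
        // Deploy the operator-managed Prometheus only when some metrics
        // source explicitly asks for the embedded type
        return Optional.ofNullable(primary.getSpec().getMetricsSources())
                .orElseGet(Collections::emptyList)
                .stream()
                .anyMatch(source -> source.getType() == Type.EMBEDDED);
    }
}
```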
By simplifying complex Kafka operations, the Streamshub Console enhances the efficiency and effectiveness of data streaming management within Kubernetes environments. - + + It features a user-friendly way to view Kafka topics and consumer groups, facilitating the searching and filtering of streamed messages. The console also offers insights into Kafka broker disk usage, helping administrators monitor and optimize resource utilization. By simplifying complex Kafka operations, the StreamsHub Console enhances the efficiency and effectiveness of data streaming management within Kubernetes environments. + + ### Documentation Documentation for the current _main_ branch as well as all releases can be found on our [GitHub](https://github.com/streamshub/console). - + ### Contributing You can contribute to Console by: * Raising any issues you find while using Console * Fixing issues by opening Pull Requests * Improving user documentation * Talking about Console - + The [Contributor Guide](https://github.com/streamshub/console/blob/main/CONTRIBUTING.md) describes how to contribute to Console. - + ### License Console is licensed under the [Apache License, Version 2.0](https://github.com/streamshub/console?tab=Apache-2.0-1-ov-file#readme). For more details, visit the GitHub repository.""", diff --git a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/ConsoleSpec.java b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/ConsoleSpec.java index a09f1fd16..c4e8825e7 100644 --- a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/ConsoleSpec.java +++ b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/ConsoleSpec.java @@ -11,19 +11,29 @@ @Buildable(builderPackage = "io.fabric8.kubernetes.api.builder") @JsonInclude(JsonInclude.Include.NON_NULL) +// Enable validation rules for unique names when array maxItems and string maxLength can be specified +// to influence Kubernetes's estimated rule cost.
+// https://github.com/fabric8io/kubernetes-client/pull/6447 +// +// @ValidationRule(value = """ +// !has(self.metricsSources) || +// self.metricsSources.all(s1, self.metricsSources.exists_one(s2, s2.name == s1.name)) +// """, +// message = "Metrics source names must be unique") public class ConsoleSpec { @Required String hostname; - Images images = new Images(); + Images images; + + List metricsSources; List schemaRegistries; List kafkaClusters = new ArrayList<>(); - // TODO: copy EnvVar into console's API to avoid unexpected changes - List env = new ArrayList<>(); + List env; public String getHostname() { return hostname; @@ -41,6 +51,14 @@ public void setImages(Images images) { this.images = images; } + public List getMetricsSources() { + return metricsSources; + } + + public void setMetricsSources(List metricsSources) { + this.metricsSources = metricsSources; + } + public List getSchemaRegistries() { return schemaRegistries; } diff --git a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/KafkaCluster.java b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/KafkaCluster.java index 1f8f13745..bc3737aa9 100644 --- a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/KafkaCluster.java +++ b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/KafkaCluster.java @@ -2,6 +2,7 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import com.github.streamshub.console.config.Named; import io.fabric8.generator.annotation.Required; import io.fabric8.generator.annotation.ValidationRule; @@ -14,7 +15,7 @@ // due to it being a "reserved" word. value = "!has(self.listener) || has(self.__namespace__)", message = "Property `listener` may not be used when `namespace` is omitted") -public class KafkaCluster { +public class KafkaCluster implements Named { @JsonPropertyDescription(""" Identifier to be used for this Kafka cluster in the console. When \ @@ -51,6 +52,12 @@ public class KafkaCluster { private Credentials credentials; + @JsonPropertyDescription(""" + Name of a configured Prometheus metrics source to use for this Kafka \ + cluster to display resource utilization charts in the console. + """) + private String metricsSource; + @JsonPropertyDescription(""" Name of a configured Apicurio Registry instance to use for serializing \ and de-serializing records written to or read from this Kafka cluster. 
@@ -73,6 +80,7 @@ public void setId(String id) { this.id = id; } + @Override public String getName() { return name; } @@ -105,6 +113,14 @@ public void setCredentials(Credentials credentials) { this.credentials = credentials; } + public String getMetricsSource() { + return metricsSource; + } + + public void setMetricsSource(String metricsSource) { + this.metricsSource = metricsSource; + } + public String getSchemaRegistry() { return schemaRegistry; } diff --git a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/Prometheus.java b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/Prometheus.java new file mode 100644 index 000000000..4d7b021f7 --- /dev/null +++ b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/Prometheus.java @@ -0,0 +1,130 @@ +package com.github.streamshub.console.api.v1alpha1.spec; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.annotation.JsonTypeInfo.Id; +import com.fasterxml.jackson.annotation.JsonValue; +import com.github.streamshub.console.config.Named; + +import io.fabric8.generator.annotation.Required; +import io.sundr.builder.annotations.Buildable; + +@Buildable(builderPackage = "io.fabric8.kubernetes.api.builder") +@JsonInclude(JsonInclude.Include.NON_NULL) +public class Prometheus implements Named { + + @Required + private String name; + @Required + private Type type; + private String url; + private Authentication authentication; + + @Override + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Type getType() { + return type; + } + + public void setType(Type type) { + this.type = type; + } + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } + + public Authentication getAuthentication() { + return authentication; + } + + public void setAuthentication(Authentication authentication) { + this.authentication = authentication; + } + + public enum Type { + EMBEDDED("embedded"), + OPENSHIFT_MONITORING("openshift-monitoring"), + STANDALONE("standalone"); + + private final String value; + + private Type(String value) { + this.value = value; + } + + @JsonValue + public String value() { + return value; + } + + @JsonCreator + public static Type fromValue(String value) { + if (value == null) { + return STANDALONE; + } + + for (var type : values()) { + if (type.value.equals(value.trim())) { + return type; + } + } + + throw new IllegalArgumentException("Invalid Prometheus type: " + value); + } + } + + @JsonTypeInfo(use = Id.DEDUCTION) + @JsonSubTypes({ @JsonSubTypes.Type(Basic.class), @JsonSubTypes.Type(Bearer.class) }) + abstract static class Authentication { + } + + public static class Basic extends Authentication { + @Required + private String username; + @Required + private String password; + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + } + + public static class Bearer extends Authentication { + @Required + private String token; + + public String getToken() { + return token; + } + + public void setToken(String token) { + this.token = token; + } + } +} diff --git 
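On the operator side, the same choices surface under the Console CR's spec. A sketch that assembles a spec in code using the plain setters shown above (the generated fluent builders would work too); the names and namespace are illustrative, and the assumption that kafkaClusters exposes the usual accessor is not confirmed by this hunk:

```java
import java.util.List;

import com.github.streamshub.console.api.v1alpha1.spec.ConsoleSpec;
import com.github.streamshub.console.api.v1alpha1.spec.KafkaCluster;
import com.github.streamshub.console.api.v1alpha1.spec.Prometheus;

public class SpecSketch {
    public static void main(String[] args) {
        // An OpenShift Monitoring source needs no URL; the console authenticates
        // with its service account token (see MetricsService.createAuthenticationFilter)
        Prometheus ocp = new Prometheus();
        ocp.setName("cluster-monitoring");
        ocp.setType(Prometheus.Type.OPENSHIFT_MONITORING);

        // A cluster opts in by referencing a source by name
        KafkaCluster kafka = new KafkaCluster();
        kafka.setName("my-kafka");
        kafka.setNamespace("my-namespace");
        kafka.setMetricsSource("cluster-monitoring");

        ConsoleSpec spec = new ConsoleSpec();
        spec.setMetricsSources(List.of(ocp));
        spec.getKafkaClusters().add(kafka); // assumed accessor, field defaults to an ArrayList

        // An OPENSHIFT_MONITORING source also satisfies the
        // ConsoleMonitoringClusterRoleBinding precondition shown below
        System.out.println(spec.getMetricsSources().get(0).getName());
    }
}
```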
a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/SchemaRegistry.java b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/SchemaRegistry.java index ee79587a6..126c19665 100644 --- a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/SchemaRegistry.java +++ b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/SchemaRegistry.java @@ -2,13 +2,14 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import com.github.streamshub.console.config.Named; import io.fabric8.generator.annotation.Required; import io.sundr.builder.annotations.Buildable; @Buildable(builderPackage = "io.fabric8.kubernetes.api.builder") @JsonInclude(JsonInclude.Include.NON_NULL) -public class SchemaRegistry { +public class SchemaRegistry implements Named { @Required @JsonPropertyDescription(""" @@ -22,6 +23,7 @@ public class SchemaRegistry { @JsonPropertyDescription("URL of the Apicurio Registry server API.") private String url; + @Override public String getName() { return name; } diff --git a/operator/src/main/java/com/github/streamshub/console/dependents/BaseClusterRoleBinding.java b/operator/src/main/java/com/github/streamshub/console/dependents/BaseClusterRoleBinding.java index 72b7f5d10..8be70c383 100644 --- a/operator/src/main/java/com/github/streamshub/console/dependents/BaseClusterRoleBinding.java +++ b/operator/src/main/java/com/github/streamshub/console/dependents/BaseClusterRoleBinding.java @@ -43,7 +43,7 @@ protected ClusterRoleBinding desired(Console primary, Context context) .edit() .editMetadata() .withName(instanceName(primary)) - .withLabels(commonLabels(appName)) + .withLabels(commonLabels(appName, resourceName)) .endMetadata() .editRoleRef() .withName(roleName(primary)) diff --git a/operator/src/main/java/com/github/streamshub/console/dependents/BaseLabelDiscriminator.java b/operator/src/main/java/com/github/streamshub/console/dependents/BaseLabelDiscriminator.java index 06db214ae..d2ddd8ef8 100644 --- a/operator/src/main/java/com/github/streamshub/console/dependents/BaseLabelDiscriminator.java +++ b/operator/src/main/java/com/github/streamshub/console/dependents/BaseLabelDiscriminator.java @@ -1,5 +1,6 @@ package com.github.streamshub.console.dependents; +import java.util.Map; import java.util.Optional; import io.fabric8.kubernetes.api.model.HasMetadata; @@ -8,19 +9,27 @@ abstract class BaseLabelDiscriminator implements ResourceDiscriminator { - private final String label; - private final String matchValue; + private final Map matchLabels; protected BaseLabelDiscriminator(String label, String matchValue) { - this.label = label; - this.matchValue = matchValue; + this.matchLabels = Map.of(label, matchValue); + } + + protected BaseLabelDiscriminator(Map matchLabels) { + this.matchLabels = Map.copyOf(matchLabels); } public Optional distinguish(Class resourceType, HasMetadata primary, Context context) { return context.getSecondaryResourcesAsStream(resourceType) - .filter(d -> matchValue.equals(d.getMetadata().getLabels().get(label))) + .filter(this::matches) .findFirst(); } + + private boolean matches(HasMetadata resource) { + return matchLabels.entrySet() + .stream() + .allMatch(label -> label.getValue().equals(resource.getMetadata().getLabels().get(label.getKey()))); + } } diff --git a/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleClusterRoleBinding.java 
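BaseLabelDiscriminator above now matches on a label map rather than a single key/value pair, so subclasses such as the component-scoped discriminators that follow can narrow by several labels at once. The predicate in isolation (pure JDK; the label values are illustrative):

```java
import java.util.Map;

public class LabelMatchSketch {
    static boolean matches(Map<String, String> resourceLabels, Map<String, String> matchLabels) {
        // Every required label must be present with the expected value;
        // extra labels on the resource are ignored
        return matchLabels.entrySet()
                .stream()
                .allMatch(l -> l.getValue().equals(resourceLabels.get(l.getKey())));
    }

    public static void main(String[] args) {
        Map<String, String> resource = Map.of(
                "app.kubernetes.io/name", "console",
                "app.kubernetes.io/component", "console-monitoring-clusterrolebinding");

        System.out.println(matches(resource, Map.of("app.kubernetes.io/name", "console")));      // true
        System.out.println(matches(resource, Map.of("app.kubernetes.io/component", "missing"))); // false
    }
}
```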
b/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleClusterRoleBinding.java index 2a125e137..e656fbfc7 100644 --- a/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleClusterRoleBinding.java +++ b/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleClusterRoleBinding.java @@ -1,5 +1,7 @@ package com.github.streamshub.console.dependents; +import java.util.Map; + import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -12,11 +14,17 @@ @KubernetesDependent( namespaces = Constants.WATCH_ALL_NAMESPACES, labelSelector = ConsoleResource.MANAGEMENT_SELECTOR, - resourceDiscriminator = ConsoleLabelDiscriminator.class) + resourceDiscriminator = ConsoleClusterRoleBinding.Discriminator.class) public class ConsoleClusterRoleBinding extends BaseClusterRoleBinding { public static final String NAME = "console-clusterrolebinding"; + public static class Discriminator extends ConsoleLabelDiscriminator { + public Discriminator() { + super(Map.of(COMPONENT_LABEL, NAME)); + } + } + @Inject ConsoleClusterRole clusterRole; diff --git a/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleDeployment.java b/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleDeployment.java index 8b053ce7f..28f9cb6da 100644 --- a/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleDeployment.java +++ b/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleDeployment.java @@ -1,5 +1,6 @@ package com.github.streamshub.console.dependents; +import java.util.Collections; import java.util.Map; import java.util.Optional; @@ -9,6 +10,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty; import com.github.streamshub.console.api.v1alpha1.Console; +import com.github.streamshub.console.api.v1alpha1.spec.Images; import io.fabric8.kubernetes.api.model.apps.Deployment; import io.javaoperatorsdk.operator.api.reconciler.Context; @@ -23,9 +25,6 @@ public class ConsoleDeployment extends CRUDKubernetesDependentResource context) { String name = instanceName(primary); String configSecretName = secret.instanceName(primary); - var imagesSpec = primary.getSpec().getImages(); - String imageAPI = Optional.ofNullable(imagesSpec.getApi()).orElse(defaultAPIImage); - String imageUI = Optional.ofNullable(imagesSpec.getUi()).orElse(defaultUIImage); + var imagesSpec = Optional.ofNullable(primary.getSpec().getImages()); + String imageAPI = imagesSpec.map(Images::getApi).orElse(defaultAPIImage); + String imageUI = imagesSpec.map(Images::getUi).orElse(defaultUIImage); return desired.edit() .editMetadata() @@ -85,13 +84,10 @@ protected Deployment desired(Console primary, Context context) { .endVolume() .editMatchingContainer(c -> "console-api".equals(c.getName())) .withImage(imageAPI) - .addAllToEnv(primary.getSpec().getEnv()) + .addAllToEnv(coalesce(primary.getSpec().getEnv(), Collections::emptyList)) .endContainer() .editMatchingContainer(c -> "console-ui".equals(c.getName())) .withImage(imageUI) - .editMatchingEnv(env -> "CONSOLE_METRICS_PROMETHEUS_URL".equals(env.getName())) - .withValue(getAttribute(context, PrometheusService.NAME + ".url", String.class)) - .endEnv() .editMatchingEnv(env -> "NEXTAUTH_URL".equals(env.getName())) .withValue(getAttribute(context, ConsoleIngress.NAME + ".url", String.class)) .endEnv() diff --git a/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleLabelDiscriminator.java 
b/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleLabelDiscriminator.java index 7f7e85218..9bff7f1f1 100644 --- a/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleLabelDiscriminator.java +++ b/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleLabelDiscriminator.java @@ -1,5 +1,9 @@ package com.github.streamshub.console.dependents; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + import io.fabric8.kubernetes.api.model.rbac.ClusterRole; public class ConsoleLabelDiscriminator extends BaseLabelDiscriminator { @@ -8,4 +12,10 @@ public ConsoleLabelDiscriminator() { super(ConsoleResource.NAME_LABEL, "console"); } + public ConsoleLabelDiscriminator(Map labels) { + super(Stream.concat( + Stream.of(Map.entry(ConsoleResource.NAME_LABEL, "console")), + labels.entrySet().stream()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); + } } diff --git a/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleMonitoringClusterRoleBinding.java b/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleMonitoringClusterRoleBinding.java new file mode 100644 index 000000000..9616135b9 --- /dev/null +++ b/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleMonitoringClusterRoleBinding.java @@ -0,0 +1,77 @@ +package com.github.streamshub.console.dependents; + +import java.util.Collections; +import java.util.Map; +import java.util.Optional; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import com.github.streamshub.console.api.v1alpha1.Console; +import com.github.streamshub.console.api.v1alpha1.spec.Prometheus.Type; + +import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBinding; +import io.javaoperatorsdk.operator.api.reconciler.Constants; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; +import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependent; +import io.javaoperatorsdk.operator.processing.dependent.workflow.Condition; + +@ApplicationScoped +@KubernetesDependent( + namespaces = Constants.WATCH_ALL_NAMESPACES, + labelSelector = ConsoleResource.MANAGEMENT_SELECTOR, + resourceDiscriminator = ConsoleMonitoringClusterRoleBinding.Discriminator.class) +public class ConsoleMonitoringClusterRoleBinding extends BaseClusterRoleBinding { + + public static final String NAME = "console-monitoring-clusterrolebinding"; + + public static class Discriminator extends ConsoleLabelDiscriminator { + public Discriminator() { + super(Map.of(COMPONENT_LABEL, NAME)); + } + } + + @Inject + ConsoleClusterRole clusterRole; + + @Inject + ConsoleServiceAccount serviceAccount; + + public ConsoleMonitoringClusterRoleBinding() { + super("console", "console-monitoring.clusterrolebinding.yaml", NAME); + } + + @Override + protected String roleName(Console primary) { + // Hard-coded, pre-existing cluster role available in OCP + return "cluster-monitoring-view"; + } + + @Override + protected String subjectName(Console primary) { + return serviceAccount.instanceName(primary); + } + + /** + * The cluster role binding to `cluster-monitoring-view` will only be created + * if one of the metrics sources is OpenShift Monitoring. 
+ */ + public static class Precondition implements Condition { + @Override + public boolean isMet(DependentResource dependentResource, + Console primary, + Context context) { + + var metricsSources = Optional.ofNullable(primary.getSpec().getMetricsSources()) + .orElseGet(Collections::emptyList); + + if (metricsSources.isEmpty()) { + return false; + } + + return metricsSources.stream() + .anyMatch(prometheus -> prometheus.getType() == Type.OPENSHIFT_MONITORING); + } + } +} diff --git a/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleResource.java b/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleResource.java index 276aa1e73..ba771b13c 100644 --- a/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleResource.java +++ b/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleResource.java @@ -10,9 +10,11 @@ import java.util.Comparator; import java.util.HexFormat; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; +import java.util.function.Supplier; import com.github.streamshub.console.api.v1alpha1.Console; @@ -23,6 +25,7 @@ public interface ConsoleResource { static final String MANAGED_BY_LABEL = "app.kubernetes.io/managed-by"; static final String NAME_LABEL = "app.kubernetes.io/name"; + static final String COMPONENT_LABEL = "app.kubernetes.io/component"; static final String INSTANCE_LABEL = "app.kubernetes.io/instance"; static final String MANAGER = "streamshub-console-operator"; @@ -59,9 +62,16 @@ default void setAttribute(Context context, String key, T value) { } default Map commonLabels(String appName) { + return commonLabels(appName, null); + } + + default Map commonLabels(String appName, String componentName) { Map labels = new LinkedHashMap<>(); labels.putAll(MANAGEMENT_LABEL); labels.put(NAME_LABEL, appName); + if (componentName != null) { + labels.put(COMPONENT_LABEL, componentName); + } return labels; } @@ -101,4 +111,10 @@ default String encodeString(String value) { default String decodeString(String encodedValue) { return new String(Base64.getDecoder().decode(encodedValue), StandardCharsets.UTF_8); } + + default List coalesce(List value, Supplier> defaultValue) { + return value != null ? 
value : defaultValue.get(); + } + + } diff --git a/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleSecret.java b/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleSecret.java index fbe2ba3b1..1c0cec1d1 100644 --- a/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleSecret.java +++ b/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleSecret.java @@ -9,14 +9,12 @@ import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Random; import java.util.function.Function; import java.util.function.Predicate; -import java.util.function.Supplier; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -27,20 +25,27 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.github.streamshub.console.ReconciliationException; import com.github.streamshub.console.api.v1alpha1.Console; import com.github.streamshub.console.api.v1alpha1.spec.ConfigVars; import com.github.streamshub.console.api.v1alpha1.spec.Credentials; import com.github.streamshub.console.api.v1alpha1.spec.KafkaCluster; +import com.github.streamshub.console.api.v1alpha1.spec.Prometheus; +import com.github.streamshub.console.api.v1alpha1.spec.Prometheus.Type; import com.github.streamshub.console.api.v1alpha1.spec.SchemaRegistry; import com.github.streamshub.console.config.ConsoleConfig; import com.github.streamshub.console.config.KafkaClusterConfig; +import com.github.streamshub.console.config.PrometheusConfig; import com.github.streamshub.console.config.SchemaRegistryConfig; import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.Secret; import io.fabric8.kubernetes.api.model.SecretBuilder; +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.openshift.api.model.Route; +import io.fabric8.openshift.api.model.RouteIngress; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.processing.dependent.kubernetes.CRUDKubernetesDependentResource; import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependent; @@ -63,6 +68,9 @@ public class ConsoleSecret extends CRUDKubernetesDependentResource context) { var consoleConfig = buildConfig(primary, context); try { - data.put("console-config.yaml", encodeString(objectMapper.writeValueAsString(consoleConfig))); + var yaml = objectMapper.copyWith(new YAMLFactory()); + data.put("console-config.yaml", encodeString(yaml.writeValueAsString(consoleConfig))); } catch (JsonProcessingException e) { throw new UncheckedIOException(e); } @@ -115,19 +124,11 @@ private static String base64String(int length) { return new String(buffer.toByteArray()).substring(0, length); } - private static List coalesce(List value, Supplier> defaultValue) { - return value != null ? 
value : defaultValue.get(); - } - private ConsoleConfig buildConfig(Console primary, Context context) { ConsoleConfig config = new ConsoleConfig(); - for (SchemaRegistry registry : coalesce(primary.getSpec().getSchemaRegistries(), Collections::emptyList)) { - var registryConfig = new SchemaRegistryConfig(); - registryConfig.setName(registry.getName()); - registryConfig.setUrl(registry.getUrl()); - config.getSchemaRegistries().add(registryConfig); - } + addMetricsSources(primary, config, context); + addSchemaRegistries(primary, config); for (var kafkaRef : primary.getSpec().getKafkaClusters()) { addConfig(primary, context, config, kafkaRef); @@ -136,6 +137,74 @@ private ConsoleConfig buildConfig(Console primary, Context context) { return config; } + private void addMetricsSources(Console primary, ConsoleConfig config, Context context) { + var metricsSources = coalesce(primary.getSpec().getMetricsSources(), Collections::emptyList); + + for (Prometheus prometheus : metricsSources) { + var prometheusConfig = new PrometheusConfig(); + prometheusConfig.setName(prometheus.getName()); + + if (prometheus.getType() == Type.OPENSHIFT_MONITORING) { + prometheusConfig.setType(PrometheusConfig.Type.OPENSHIFT_MONITORING); + prometheusConfig.setUrl(getOpenShiftMonitoringUrl(context)); + } else { + // the console accesses the embedded Prometheus the same way as a standalone instance + prometheusConfig.setType(PrometheusConfig.Type.STANDALONE); + + if (prometheus.getType() == Type.EMBEDDED) { + prometheusConfig.setUrl(prometheusService.getUrl(primary, context)); + } else { + prometheusConfig.setUrl(prometheus.getUrl()); + } + } + + var prometheusAuthn = prometheus.getAuthentication(); + + if (prometheusAuthn instanceof Prometheus.Basic basic) { + var basicConfig = new PrometheusConfig.Basic(); + basicConfig.setUsername(basic.getUsername()); + basicConfig.setPassword(basic.getPassword()); + prometheusConfig.setAuthentication(basicConfig); + } else if (prometheusAuthn instanceof Prometheus.Bearer bearer) { + var bearerConfig = new PrometheusConfig.Bearer(); + bearerConfig.setToken(bearer.getToken()); + prometheusConfig.setAuthentication(bearerConfig); + } + + config.getMetricsSources().add(prometheusConfig); + } + + if (metricsSources.isEmpty()) { + var prometheusConfig = new PrometheusConfig(); + prometheusConfig.setName("embedded-prometheus"); + prometheusConfig.setUrl(prometheusService.getUrl(primary, context)); + config.getMetricsSources().add(prometheusConfig); + } + } + + private String getOpenShiftMonitoringUrl(Context context) { + Route thanosQuerier = getResource(context, Route.class, "openshift-monitoring", "thanos-querier"); + + String host = thanosQuerier.getStatus() + .getIngress() + .stream() + .map(RouteIngress::getHost) + .findFirst() + .orElseThrow(() -> new ReconciliationException( + "Ingress host not found on openshift-monitoring/thanos-querier route")); + + return "https://" + host; + } + + private void addSchemaRegistries(Console primary, ConsoleConfig config) { + for (SchemaRegistry registry : coalesce(primary.getSpec().getSchemaRegistries(), Collections::emptyList)) { + var registryConfig = new SchemaRegistryConfig(); + registryConfig.setName(registry.getName()); + registryConfig.setUrl(registry.getUrl()); + config.getSchemaRegistries().add(registryConfig); + } + } + private void addConfig(Console primary, Context context, ConsoleConfig config, KafkaCluster kafkaRef) { String namespace = kafkaRef.getNamespace(); String name = kafkaRef.getName(); @@ -148,6 +217,14 @@ private void addConfig(Console primary, Context
context, ConsoleConfig kcConfig.setListener(listenerName); kcConfig.setSchemaRegistry(kafkaRef.getSchemaRegistry()); + if (kafkaRef.getMetricsSource() == null) { + if (config.getMetricsSources().stream().anyMatch(src -> src.getName().equals("embedded-prometheus"))) { + kcConfig.setMetricsSource("embedded-prometheus"); + } + } else { + kcConfig.setMetricsSource(kafkaRef.getMetricsSource()); + } + config.getKubernetes().setEnabled(Objects.nonNull(namespace)); config.getKafka().getClusters().add(kcConfig); @@ -322,11 +399,18 @@ static T getResource( static T getResource( Context context, Class resourceType, String namespace, String name, boolean optional) { - T resource = context.getClient() - .resources(resourceType) - .inNamespace(namespace) - .withName(name) - .get(); + T resource; + + try { + resource = context.getClient() + .resources(resourceType) + .inNamespace(namespace) + .withName(name) + .get(); + } catch (KubernetesClientException e) { + throw new ReconciliationException("Failed to retrieve %s resource: %s/%s. Message: %s" + .formatted(resourceType.getSimpleName(), namespace, name, e.getMessage())); + } if (resource == null && !optional) { throw new ReconciliationException("No such %s resource: %s/%s".formatted(resourceType.getSimpleName(), namespace, name)); diff --git a/operator/src/main/java/com/github/streamshub/console/dependents/PrometheusPrecondition.java b/operator/src/main/java/com/github/streamshub/console/dependents/PrometheusPrecondition.java new file mode 100644 index 000000000..063ff9ec6 --- /dev/null +++ b/operator/src/main/java/com/github/streamshub/console/dependents/PrometheusPrecondition.java @@ -0,0 +1,28 @@ +package com.github.streamshub.console.dependents; + +import java.util.Collections; +import java.util.Optional; + +import com.github.streamshub.console.api.v1alpha1.Console; +import com.github.streamshub.console.api.v1alpha1.spec.Prometheus.Type; + +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; +import io.javaoperatorsdk.operator.processing.dependent.workflow.Condition; + +public class PrometheusPrecondition implements Condition { + + @Override + public boolean isMet(DependentResource dependentResource, Console primary, Context context) { + var metricsSources = Optional.ofNullable(primary.getSpec().getMetricsSources()) + .orElseGet(Collections::emptyList); + + if (metricsSources.isEmpty()) { + return true; + } + + return metricsSources.stream() + .anyMatch(prometheus -> prometheus.getType() == Type.EMBEDDED); + } + +} diff --git a/operator/src/main/java/com/github/streamshub/console/dependents/PrometheusService.java b/operator/src/main/java/com/github/streamshub/console/dependents/PrometheusService.java index 0bcf9658a..e2416f950 100644 --- a/operator/src/main/java/com/github/streamshub/console/dependents/PrometheusService.java +++ b/operator/src/main/java/com/github/streamshub/console/dependents/PrometheusService.java @@ -29,15 +29,12 @@ protected String appName(Console primary) { return deployment.instanceName(primary); } - @Override - protected Service desired(Console primary, Context context) { + String getUrl(Console primary, Context context) { Service desired = super.desired(primary, context); - setAttribute(context, NAME + ".url", "http://%s.%s.svc.cluster.local:%d".formatted( + return "http://%s.%s.svc.cluster.local:%d".formatted( desired.getMetadata().getName(), desired.getMetadata().getNamespace(), - desired.getSpec().getPorts().get(0).getPort())); - - 
return desired; + desired.getSpec().getPorts().get(0).getPort()); } } diff --git a/operator/src/main/kubernetes/kubernetes.yml index 24062082d..2c22807c4 100644 --- a/operator/src/main/kubernetes/kubernetes.yml +++ b/operator/src/main/kubernetes/kubernetes.yml @@ -30,6 +30,16 @@ rules: # temporary until available: https://github.com/operator-framework/java-operator-sdk/pull/2456 - create + # Used by operator to discover the OpenShift Monitoring query endpoint + - apiGroups: + - route.openshift.io + resources: + - routes + resourceNames: + - thanos-querier + verbs: + - get + # Granted to Prometheus instances - apiGroups: [ '' ] resources: @@ -80,3 +90,16 @@ roleRef: subjects: - kind: ServiceAccount name: console-operator +--- +# Required in order to grant the cluster-monitoring-view role to console instances with OpenShift Cluster Monitoring integration +kind: ClusterRoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: consolereconciler-cluster-monitoring-view +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: cluster-monitoring-view +subjects: + - kind: ServiceAccount + name: console-operator diff --git a/operator/src/main/resources/com/github/streamshub/console/dependents/console-monitoring.clusterrolebinding.yaml b/operator/src/main/resources/com/github/streamshub/console/dependents/console-monitoring.clusterrolebinding.yaml new file mode 100644 index 000000000..4d88052b3 --- /dev/null +++ b/operator/src/main/resources/com/github/streamshub/console/dependents/console-monitoring.clusterrolebinding.yaml @@ -0,0 +1,12 @@ +kind: ClusterRoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: console-server-monitoring +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: cluster-monitoring-view +subjects: + - kind: ServiceAccount + name: console-server + namespace: ${NAMESPACE} diff --git a/operator/src/main/resources/com/github/streamshub/console/dependents/console.deployment.yaml b/operator/src/main/resources/com/github/streamshub/console/dependents/console.deployment.yaml index 570f82296..9c23941d2 100644 --- a/operator/src/main/resources/com/github/streamshub/console/dependents/console.deployment.yaml +++ b/operator/src/main/resources/com/github/streamshub/console/dependents/console.deployment.yaml @@ -13,6 +13,13 @@ spec: spec: serviceAccountName: placeholder volumes: + - name: kubernetes-ca + configMap: + name: kube-root-ca.crt + items: + - key: ca.crt + path: kubernetes-ca.pem + defaultMode: 420 - name: cache emptyDir: {} - name: config @@ -26,10 +33,15 @@ spec: - containerPort: 8080 name: http volumeMounts: + - name: kubernetes-ca + mountPath: /etc/ssl/kubernetes-ca.pem + subPath: kubernetes-ca.pem - name: config mountPath: /deployments/console-config.yaml subPath: console-config.yaml env: + - name: QUARKUS_TLS_TRUST_STORE_PEM_CERTS + value: /etc/ssl/kubernetes-ca.pem - name: CONSOLE_CONFIG_PATH value: /deployments/console-config.yaml startupProbe: diff --git a/operator/src/test/java/com/github/streamshub/console/ConsoleReconcilerTest.java b/operator/src/test/java/com/github/streamshub/console/ConsoleReconcilerTest.java index ce720f337..bd9ca3d4f 100644 --- a/operator/src/test/java/com/github/streamshub/console/ConsoleReconcilerTest.java +++ b/operator/src/test/java/com/github/streamshub/console/ConsoleReconcilerTest.java @@ -15,6 +15,7 @@ import org.junit.jupiter.api.Test; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import
com.github.streamshub.console.api.v1alpha1.Console; import com.github.streamshub.console.api.v1alpha1.ConsoleBuilder; import com.github.streamshub.console.config.ConsoleConfig; @@ -53,6 +54,7 @@ class ConsoleReconcilerTest { private static final Logger LOGGER = Logger.getLogger(ConsoleReconcilerTest.class); private static final Duration LIMIT = Duration.ofSeconds(1_000); + private static final ObjectMapper YAML = new ObjectMapper(new YAMLFactory()); @Inject KubernetesClient client; @@ -513,7 +515,7 @@ void testConsoleReconciliationWithValidKafkaUser() { assertNotNull(consoleSecret); String configEncoded = consoleSecret.getData().get("console-config.yaml"); byte[] configDecoded = Base64.getDecoder().decode(configEncoded); - ConsoleConfig consoleConfig = new ObjectMapper().readValue(configDecoded, ConsoleConfig.class); + ConsoleConfig consoleConfig = YAML.readValue(configDecoded, ConsoleConfig.class); assertEquals("jaas-config-value", consoleConfig.getKafka().getClusters().get(0).getProperties().get(SaslConfigs.SASL_JAAS_CONFIG)); }); @@ -589,7 +591,7 @@ void testConsoleReconciliationWithKafkaProperties() { assertNotNull(consoleSecret); String configEncoded = consoleSecret.getData().get("console-config.yaml"); byte[] configDecoded = Base64.getDecoder().decode(configEncoded); - ConsoleConfig consoleConfig = new ObjectMapper().readValue(configDecoded, ConsoleConfig.class); + ConsoleConfig consoleConfig = YAML.readValue(configDecoded, ConsoleConfig.class); var kafkaConfig = consoleConfig.getKafka().getClusters().get(0); assertEquals("x-prop-value", kafkaConfig.getProperties().get("x-prop-name")); assertEquals("x-admin-prop-value", kafkaConfig.getAdminProperties().get("x-admin-prop-name")); @@ -638,7 +640,7 @@ void testConsoleReconciliationWithSchemaRegistryUrl() { String configEncoded = consoleSecret.getData().get("console-config.yaml"); byte[] configDecoded = Base64.getDecoder().decode(configEncoded); Logger.getLogger(getClass()).infof("config YAML: %s", new String(configDecoded)); - ConsoleConfig consoleConfig = new ObjectMapper().readValue(configDecoded, ConsoleConfig.class); + ConsoleConfig consoleConfig = YAML.readValue(configDecoded, ConsoleConfig.class); String registryName = consoleConfig.getSchemaRegistries().get(0).getName(); assertEquals("example-registry", registryName); diff --git a/pom.xml b/pom.xml index d8ca83973..99053426e 100644 --- a/pom.xml +++ b/pom.xml @@ -50,6 +50,12 @@ java:S110 **/dependents/*.java + + <container-image.registry>localhost</container-image.registry> + <container-image.group>streamshub</container-image.group> + <container-image.tag>${project.version}</container-image.tag> + <container-image.push>false</container-image.push> diff --git a/ui/api/kafka/actions.ts b/ui/api/kafka/actions.ts index 58319be8b..560d3a5aa 100644 --- a/ui/api/kafka/actions.ts +++ b/ui/api/kafka/actions.ts @@ -2,29 +2,11 @@ import { getHeaders } from "@/api/api"; import { ClusterDetail, - ClusterKpis, - ClusterKpisSchema, ClusterList, ClusterResponse, ClustersResponseSchema, - MetricRange, - MetricRangeSchema, } from "@/api/kafka/schema"; import { logger } from "@/utils/logger"; -import groupBy from "lodash.groupby"; -import { PrometheusDriver } from "prometheus-query"; -import * as clusterPromql from "./cluster.promql"; -import { values } from "./kpi.promql"; -import * as topicPromql from "./topic.promql"; - -export type ClusterMetric = keyof typeof clusterPromql; -export type TopicMetric = keyof typeof topicPromql; - -const prom = process.env.CONSOLE_METRICS_PROMETHEUS_URL - ?
new PrometheusDriver({ - endpoint: process.env.CONSOLE_METRICS_PROMETHEUS_URL, - }) - : undefined; const log = logger.child({ module: "kafka-api" }); @@ -56,10 +38,13 @@ export async function getKafkaClusters(): Promise { export async function getKafkaCluster( clusterId: string, + params?: { + fields?: string; + } ): Promise { const sp = new URLSearchParams({ "fields[kafkas]": - "name,namespace,creationTimestamp,status,kafkaVersion,nodes,controller,authorizedOperations,listeners,conditions,nodePools,cruiseControlEnabled", + params?.fields ?? "name,namespace,creationTimestamp,status,kafkaVersion,nodes,controller,authorizedOperations,listeners,conditions,nodePools,cruiseControlEnabled", }); const kafkaClusterQuery = sp.toString(); const url = `${process.env.BACKEND_URL}/api/kafkas/${clusterId}?${kafkaClusterQuery}`; @@ -79,293 +64,6 @@ export async function getKafkaCluster( } } -export async function getKafkaClusterKpis( - clusterId: string, -): Promise<{ cluster: ClusterDetail; kpis: ClusterKpis | null } | null> { - const cluster = await getKafkaCluster(clusterId); - - if (!cluster) { - return null; - } - - if (!prom || !cluster.attributes.namespace) { - log.warn({ clusterId }, "getKafkaClusterKpis: " + - (!cluster.attributes.namespace - ? "Kafka cluster namespace not available" - : "Prometheus not configured or client error")); - return { cluster, kpis: null }; - } - - try { - const valuesRes = await prom.instantQuery( - values( - cluster.attributes.namespace, - cluster.attributes.name, - cluster.attributes.nodePools?.join("|") ?? "", - ), - ); - - log.debug("getKafkaClusterKpis response: " + JSON.stringify(valuesRes)); - - /* - Prometheus returns the data unaggregated. Eg. - - [ - { - "metric": { - "labels": { - "__console_metric_name__": "broker_state", - "nodeId": "2" - } - }, - "value": { - "time": "2023-12-12T16:00:53.381Z", - "value": 3 - } - }, - ... - ] - - We start by flattening the labels, and then group by metric name - */ - const groupedMetrics = groupBy( - valuesRes.result.map((serie) => ({ - metric: serie.metric.labels.__console_metric_name__, - nodeId: serie.metric.labels.nodeId, - time: serie.value.time, - value: serie.value.value, - })), - (v) => v.metric, - ); - - /* - Now we want to transform the data in something easier to work with in the UI. - - Some are totals, in an array form with a single entry; we just need the number. These will look like a metric:value - mapping. - - Some KPIs are provided split by broker id. Of these, some are counts (identified by the string `_count` in the - metric name), and some are other infos. Both will be grouped by nodeId. - The `_count` metrics will have a value with two properties, `byNode` and `total`. `byNode` will hold the grouping. `total` will - have the sum of all the counts. - Other metrics will look like a metric:[node:value] mapping. - - Expected result: - { - "broker_state": { - "0": 3, - "1": 3, - "2": 3 - }, - "replica_count": { - "byNode": { - "0": 57, - "1": 54, - "2": 54 - }, - "total": 165 - }, - "leader_count": { - "byNode": { - "0": 19, - "1": 18, - "2": 18 - }, - "total": 55 - } - } - */ - const kpis = Object.fromEntries( - Object.entries(groupedMetrics).map(([metric, value]) => { - const total = value.reduce((acc, v) => acc + v.value, 0); - if (value.find((v) => v.nodeId)) { - const byNode = Object.fromEntries( - value.map(({ nodeId, value }) => - nodeId ? [nodeId, value] : ["value", value], - ), - ); - return metric.includes("_count") || metric.includes("bytes") - ? 
[ - metric, - { - byNode, - total, - }, - ] - : [metric, byNode]; - } else { - return [metric, total]; - } - }), - ); - log.debug({ kpis, clusterId }, "getKafkaClusterKpis"); - return { - cluster, - kpis: ClusterKpisSchema.parse(kpis), - }; - } catch (err) { - log.error({ err, clusterId }, "getKafkaClusterKpis"); - return { - cluster, - kpis: null, - }; - } -} - -export async function getKafkaClusterMetrics( - clusterId: string, - metrics: Array, -): Promise<{ - cluster: ClusterDetail; - ranges: Record | null; -} | null> { - async function getRangeByNodeId( - namespace: string, - name: string, - nodePools: string, - metric: ClusterMetric, - ) { - const start = new Date().getTime() - 1 * 60 * 60 * 1000; - const end = new Date(); - const step = 60 * 1; - const seriesRes = await prom!.rangeQuery( - clusterPromql[metric](namespace, name, nodePools), - start, - end, - step, - ); - const serieByNode = Object.fromEntries( - seriesRes.result.map((serie) => [ - serie.metric.labels.nodeId, - Object.fromEntries( - serie.values.map((v: any) => [new Date(v.time).getTime(), v.value]), - ), - ]), - ); - return [metric, MetricRangeSchema.parse(serieByNode)]; - } - - const cluster = await getKafkaCluster(clusterId); - - if (!cluster) { - return null; - } - - if (!prom || !cluster.attributes.namespace) { - log.warn({ clusterId }, "getKafkaClusterMetrics: " + - (!cluster.attributes.namespace - ? "Kafka cluster namespace not available" - : "Prometheus not configured or client error")); - return { cluster, ranges: null }; - } - - try { - const rangesRes = Object.fromEntries( - await Promise.all( - metrics.map((m) => - getRangeByNodeId( - cluster.attributes.namespace!, - cluster.attributes.name, - cluster.attributes.nodePools?.join("|") ?? "", - m, - ), - ), - ), - ); - log.debug( - { ranges: rangesRes, clusterId, metric: metrics }, - "getKafkaClusterMetric", - ); - return { - cluster, - ranges: rangesRes, - }; - } catch (err) { - log.error({ err, clusterId, metric: metrics }, "getKafkaClusterMetric"); - return { - cluster, - ranges: null, - }; - } -} - -export async function getKafkaTopicMetrics( - clusterId: string, - metrics: Array, -): Promise<{ - cluster: ClusterDetail; - ranges: Record | null; -} | null> { - async function getRangeByNodeId( - namespace: string, - name: string, - nodePools: string, - metric: TopicMetric, - ) { - const start = new Date().getTime() - 1 * 60 * 60 * 1000; - const end = new Date(); - const step = 60 * 1; - const seriesRes = await prom!.rangeQuery( - topicPromql[metric](namespace, name, nodePools), - start, - end, - step, - ); - const serieByNode = Object.fromEntries( - seriesRes.result.map((serie) => [ - "all topics", - Object.fromEntries( - serie.values.map((v: any) => [new Date(v.time).getTime(), v.value]), - ), - ]), - ); - return [metric, MetricRangeSchema.parse(serieByNode)]; - } - - const cluster = await getKafkaCluster(clusterId); - - if (!cluster) { - return null; - } - - try { - if (!prom || !cluster.attributes.namespace) { - log.warn({ clusterId }, "getKafkaTopicMetrics: " + - (!cluster.attributes.namespace - ? "Kafka cluster namespace not available" - : "Prometheus not configured or client error")); - return { cluster, ranges: null }; - } - - const rangesRes = Object.fromEntries( - await Promise.all( - metrics.map((m) => - getRangeByNodeId( - cluster.attributes.namespace!, - cluster.attributes.name, - cluster.attributes.nodePools?.join("|") ?? 
"", - m, - ), - ), - ), - ); - log.debug( - { ranges: rangesRes, clusterId, metric: metrics }, - "getKafkaTopicMetrics", - ); - return { - cluster, - ranges: rangesRes, - }; - } catch (err) { - log.error({ err, clusterId, metric: metrics }, "getKafkaTopicMetrics"); - return { - cluster, - ranges: null, - }; - } -} - export async function updateKafkaCluster( clusterId: string, reconciliationPaused?: boolean, diff --git a/ui/api/kafka/cluster.promql.ts b/ui/api/kafka/cluster.promql.ts deleted file mode 100644 index a03549696..000000000 --- a/ui/api/kafka/cluster.promql.ts +++ /dev/null @@ -1,72 +0,0 @@ -export const cpu = (namespace: string, cluster: string) => ` - sum by (nodeId, __console_metric_name__) ( - label_replace( - label_replace( - rate(container_cpu_usage_seconds_total{namespace="${namespace}",pod=~"${cluster}-.+-\\\\d+",container="kafka"}[1m]), - "nodeId", - "$1", - "pod", - ".+-(\\\\d+)" - ), - "__console_metric_name__", - "cpu_usage_seconds", - "", - "" - ) - ) -`; - -export const memory = (namespace: string, cluster: string) => ` - sum by (nodeId, __console_metric_name__) ( - label_replace( - label_replace( - container_memory_usage_bytes{namespace="${namespace}",pod=~"${cluster}-.+-\\\\d+",container="kafka"}, - "nodeId", - "$1", - "pod", - ".+-(\\\\d+)" - ), - "__console_metric_name__", - "memory_usage_bytes", - "", - "" - ) - ) -`; - -export const volumeCapacity = (namespace: string, cluster: string, nodePools: string) => ` - sum by (nodeId, __console_metric_name__) ( - label_replace( - label_replace( - kubelet_volume_stats_capacity_bytes{namespace="${namespace}",persistentvolumeclaim=~"data(?:-\\\\d+)?-${cluster}-(kafka|${nodePools})-\\\\d+"}, - "nodeId", - "$1", - "persistentvolumeclaim", - ".+-(\\\\d+)" - ), - "__console_metric_name__", - "volume_stats_capacity_bytes", - "", - "" - ) - ) -`; - -export const volumeUsed = (namespace: string, cluster: string, nodePools: string) => ` - sum by (nodeId, __console_metric_name__) ( - label_replace( - label_replace( - kubelet_volume_stats_used_bytes{namespace="${namespace}",persistentvolumeclaim=~"data(?:-\\\\d+)?-${cluster}-(kafka|${nodePools})-\\\\d+"}, - "nodeId", - "$1", - "persistentvolumeclaim", - ".+-(\\\\d+)" - ), - "__console_metric_name__", - "volume_stats_used_bytes", - "", - "" - ) - ) -`; - diff --git a/ui/api/kafka/kpi.promql.ts b/ui/api/kafka/kpi.promql.ts deleted file mode 100644 index b935f1f52..000000000 --- a/ui/api/kafka/kpi.promql.ts +++ /dev/null @@ -1,93 +0,0 @@ -export const values = ( - namespace: string, - cluster: string, - nodePools: string, -) => ` -sum by (__console_metric_name__, nodeId) ( - label_replace( - label_replace( - kafka_server_kafkaserver_brokerstate{namespace="${namespace}",pod=~"${cluster}-.+-\\\\d+",strimzi_io_kind="Kafka"} > 0, - "nodeId", - "$1", - "pod", - ".+-(\\\\d+)" - ), - "__console_metric_name__", - "broker_state", - "", - "" - ) -) - -or - -sum by (__console_metric_name__, nodeId) ( - label_replace( - label_replace( - kafka_server_replicamanager_partitioncount{namespace="${namespace}",pod=~"${cluster}-.+-\\\\d+",strimzi_io_kind="Kafka"} > 0, - "nodeId", - "$1", - "pod", - ".+-(\\\\d+)" - ), - "__console_metric_name__", - "replica_count", - "", - "" - ) -) - -or - -sum by (__console_metric_name__, nodeId) ( - label_replace( - label_replace( - kafka_server_replicamanager_leadercount{namespace="${namespace}",pod=~"${cluster}-.+-\\\\d+",strimzi_io_kind="Kafka"} > 0, - "nodeId", - "$1", - "pod", - ".+-(\\\\d+)" - ), - "__console_metric_name__", - "leader_count", - "", - "" - ) -) - 
-or - -sum by (__console_metric_name__, nodeId) ( - label_replace( - label_replace( - kubelet_volume_stats_capacity_bytes{namespace="${namespace}",persistentvolumeclaim=~"data(?:-\\\\d+)?-${cluster}-(kafka|${nodePools})-\\\\d+"}, - "nodeId", - "$1", - "persistentvolumeclaim", - ".+-(\\\\d+)" - ), - "__console_metric_name__", - "volume_stats_capacity_bytes", - "", - "" - ) -) - -or - -sum by (__console_metric_name__, nodeId) ( - label_replace( - label_replace( - kubelet_volume_stats_used_bytes{namespace="${namespace}",persistentvolumeclaim=~"data(?:-\\\\d+)?-${cluster}-(kafka|${nodePools})-\\\\d+"}, - "nodeId", - "$1", - "persistentvolumeclaim", - ".+-(\\\\d+)" - ), - "__console_metric_name__", - "volume_stats_used_bytes", - "", - "" - ) -) -`; diff --git a/ui/api/kafka/schema.ts b/ui/api/kafka/schema.ts index c0c0bbe42..5ea596a97 100644 --- a/ui/api/kafka/schema.ts +++ b/ui/api/kafka/schema.ts @@ -80,44 +80,28 @@ const ClusterDetailSchema = z.object({ .nullable() .optional(), nodePools: z.array(z.string()).optional().nullable(), + metrics: z + .object({ + values: z.record( + z.array(z.object({ + value: z.string(), + nodeId: z.string(), + })), + ), + ranges: z.record( + z.array(z.object({ + range: z.array(z.array( + z.string(), + z.string(), + )), + nodeId: z.string().optional(), + })), + ), + }) + .optional(), }), }); export const ClusterResponse = z.object({ data: ClusterDetailSchema, }); export type ClusterDetail = z.infer; - -export const ClusterKpisSchema = z.object({ - broker_state: z.record(z.number()).optional(), - replica_count: z - .object({ - byNode: z.record(z.number()).optional(), - total: z.number().optional(), - }) - .optional(), - leader_count: z - .object({ - byNode: z.record(z.number()).optional(), - total: z.number().optional(), - }) - .optional(), - volume_stats_capacity_bytes: z - .object({ - byNode: z.record(z.number()).optional(), - total: z.number().optional(), - }) - .optional(), - volume_stats_used_bytes: z - .object({ - byNode: z.record(z.number()).optional(), - total: z.number().optional(), - }) - .optional(), -}); -export type ClusterKpis = z.infer; - -export const MetricRangeSchema = z.record( - z.string(), - z.record(z.number()).optional(), -); -export type MetricRange = z.infer; diff --git a/ui/api/kafka/topic.promql.ts b/ui/api/kafka/topic.promql.ts deleted file mode 100644 index 0f883aed1..000000000 --- a/ui/api/kafka/topic.promql.ts +++ /dev/null @@ -1,31 +0,0 @@ -export const incomingByteRate = ( - namespace: string, - cluster: string, - nodePools: string, -) => ` - sum by (__console_metric_name__) ( - label_replace( - irate(kafka_server_brokertopicmetrics_bytesin_total{topic!="",namespace="${namespace}",pod=~"${cluster}-(kafka|${nodePools})-\\\\d+",strimzi_io_kind="Kafka"}[5m]), - "__console_metric_name__", - "incoming_byte_rate", - "", - "" - ) - ) -`; - -export const outgoingByteRate = ( - namespace: string, - cluster: string, - nodePools: string, -) => ` - sum by (__console_metric_name__) ( - label_replace( - irate(kafka_server_brokertopicmetrics_bytesout_total{topic!="",namespace="${namespace}",pod=~"${cluster}-(kafka|${nodePools})-\\\\d+",strimzi_io_kind="Kafka"}[5m]), - "__console_metric_name__", - "outgoing_byte_rate", - "", - "" - ) - ) -`; diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/layout.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/layout.tsx index 132a132ff..75f24c23c 100644 --- a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/layout.tsx +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/layout.tsx @@ -1,5 +1,5 @@ import { 
ClusterLinks } from "@/app/[locale]/(authorized)/kafka/[kafkaId]/ClusterLinks"; -import { getAuthOptions } from "@/app/api/auth/[...nextauth]/route"; +import { getAuthOptions } from "@/app/api/auth/[...nextauth]/auth-options"; import { AppLayout } from "@/components/AppLayout"; import { AppLayoutProvider } from "@/components/AppLayoutProvider"; import { diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/nodes/NodesTable.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/nodes/NodesTable.tsx index aa1b4f23a..75d2523ac 100644 --- a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/nodes/NodesTable.tsx +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/nodes/NodesTable.tsx @@ -100,7 +100,7 @@ export function NodesTable({ nodes }: { nodes: Node[] }) { ); case "status": - const isStable = row.status == "Stable"; + const isStable = row.status == "Running"; return ( diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/nodes/page.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/nodes/page.tsx index 3efca94d2..a819c9028 100644 --- a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/nodes/page.tsx +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/nodes/page.tsx @@ -1,4 +1,4 @@ -import { getKafkaCluster, getKafkaClusterKpis } from "@/api/kafka/actions"; +import { getKafkaCluster } from "@/api/kafka/actions"; import { KafkaParams } from "@/app/[locale]/(authorized)/kafka/[kafkaId]/kafka.params"; import { DistributionChart } from "@/app/[locale]/(authorized)/kafka/[kafkaId]/nodes/DistributionChart"; import { @@ -6,17 +6,25 @@ import { NodesTable, } from "@/app/[locale]/(authorized)/kafka/[kafkaId]/nodes/NodesTable"; import { Alert, PageSection } from "@/libs/patternfly/react-core"; -import { redirect } from "@/i18n/routing"; import { getTranslations } from "next-intl/server"; import { Suspense } from "react"; function nodeMetric( - metrics: Record | undefined, + metrics: { value: string, nodeId: string }[] | undefined, nodeId: number, ): number { - return metrics ? (metrics[nodeId.toString()] ?? 0) : 0; + return parseFloat(metrics?.find(e => e.nodeId == nodeId.toString())?.value ?? "0"); } +function nodeRangeMetric( + metrics: { range: string[][], nodeId?: string }[] | undefined, + nodeId: number, +): number { + let range = metrics?.find(e => e.nodeId == nodeId.toString())?.range; + return parseFloat(range?.[range?.length - 1]?.[1] ?? "0"); +} + + export default function NodesPage({ params }: { params: KafkaParams }) { return ( @@ -27,29 +35,60 @@ export default function NodesPage({ params }: { params: KafkaParams }) { async function ConnectedNodes({ params }: { params: KafkaParams }) { const t = await getTranslations(); - const res = await getKafkaClusterKpis(params.kafkaId); + const cluster = await getKafkaCluster(params.kafkaId, { + fields: 'name,namespace,creationTimestamp,status,kafkaVersion,nodes,controller,authorizedOperations,listeners,conditions,metrics' + }); + const metrics = cluster?.attributes.metrics; - let { cluster, kpis } = res || {}; + const nodes: Node[] = (cluster?.attributes.nodes ?? []).map((node) => { + let brokerState = metrics && nodeMetric(metrics.values?.["broker_state"], node.id); + let status; - const nodes: Node[] = (cluster?.attributes.nodes || []).map((node) => { - const status = kpis - ? nodeMetric(kpis.broker_state, node.id) === 3 - ? "Stable" - : "Unstable" - : "Unknown"; - const leaders = kpis - ? 
nodeMetric(kpis.leader_count?.byNode, node.id) + /* + * https://github.com/apache/kafka/blob/3.8.0/metadata/src/main/java/org/apache/kafka/metadata/BrokerState.java + */ + switch (brokerState ?? 127) { + case 0: + status = "Not Running"; + break; + case 1: + status = "Starting"; + break; + case 2: + status = "Recovery"; + break; + case 3: + status = "Running"; + break; + case 6: + status = "Pending Controlled Shutdown"; + break; + case 7: + status = "Shutting Down"; + break; + case 127: + default: + status = "Unknown"; + break; + } + + const leaders = metrics + ? nodeMetric(metrics.values?.["leader_count"], node.id) : undefined; + const followers = - kpis && leaders - ? nodeMetric(kpis.replica_count?.byNode, node.id) - leaders + metrics && leaders + ? nodeMetric(metrics.values?.["replica_count"], node.id) - leaders : undefined; - const diskCapacity = kpis - ? nodeMetric(kpis.volume_stats_capacity_bytes?.byNode, node.id) + + const diskCapacity = metrics + ? nodeRangeMetric(metrics.ranges?.["volume_stats_capacity_bytes"], node.id) : undefined; - const diskUsage = kpis - ? nodeMetric(kpis.volume_stats_used_bytes?.byNode, node.id) + + const diskUsage = metrics + ? nodeRangeMetric(metrics.ranges?.["volume_stats_used_bytes"], node.id) : undefined; + return { id: node.id, status, @@ -71,7 +110,7 @@ async function ConnectedNodes({ params }: { params: KafkaParams }) { return ( <> - {!kpis && ( + {!metrics && ( diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/overview/ConnectedClusterCard.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/overview/ConnectedClusterCard.tsx index 9f29a655c..8bc078cf8 100644 --- a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/overview/ConnectedClusterCard.tsx +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/overview/ConnectedClusterCard.tsx @@ -1,46 +1,49 @@ import { ConsumerGroupsResponse } from "@/api/consumerGroups/schema"; -import { ClusterDetail, ClusterKpis } from "@/api/kafka/schema"; +import { ClusterDetail } from "@/api/kafka/schema"; import { ClusterCard } from "@/components/ClusterOverview/ClusterCard"; export async function ConnectedClusterCard({ - data, + cluster, consumerGroups, }: { - data: Promise<{ cluster: ClusterDetail; kpis: ClusterKpis | null } | null>; + cluster: Promise; consumerGroups: Promise; }) { - const res = await data; - if (!res?.kpis) { + const res = await cluster; + + if (!res?.attributes?.metrics) { return ( ); } const groupCount = await consumerGroups.then( (grpResp) => grpResp?.meta.page.total ?? 0, ); - const brokersTotal = Object.keys(res?.kpis.broker_state ?? {}).length; - const brokersOnline = - Object.values(res?.kpis.broker_state ?? {}).filter((s) => s === 3).length || - 0; - const messages = res?.cluster.attributes.conditions + + const brokersTotal = res?.attributes.metrics?.values?.["broker_state"]?.length ?? 0; + const brokersOnline = (res?.attributes.metrics?.values?.["broker_state"] ?? []) + .filter((s) => s.value === "3") + .length; + + const messages = res?.attributes.conditions ?.filter((c) => "Ready" !== c.type) .map((c) => ({ variant: c.type === "Error" ? "danger" : ("warning" as "danger" | "warning"), subject: { type: c.type!, - name: res?.cluster.attributes.name ?? "", - id: res?.cluster.id ?? "", + name: res?.attributes.name ?? "", + id: res?.id ?? "", }, message: c.message ?? "", date: c.lastTransitionTime ?? 
"", @@ -49,14 +52,14 @@ export async function ConnectedClusterCard({ return ( ); } diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/overview/ConnectedClusterChartsCard.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/overview/ConnectedClusterChartsCard.tsx index fc76246ba..9e37c0d7a 100644 --- a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/overview/ConnectedClusterChartsCard.tsx +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/overview/ConnectedClusterChartsCard.tsx @@ -1,32 +1,35 @@ -import { ClusterMetric } from "@/api/kafka/actions"; -import { ClusterDetail, MetricRange } from "@/api/kafka/schema"; +import { ClusterDetail } from "@/api/kafka/schema"; import { ClusterChartsCard } from "@/components/ClusterOverview/ClusterChartsCard"; function timeSeriesMetrics( - ranges: Record | null | undefined, - rangeName: ClusterMetric, -): TimeSeriesMetrics[] { - return ranges - ? Object.values(ranges[rangeName] ?? {}).map((val) => val ?? {}) - : []; + ranges: Record | undefined, + rangeName: string, +): Record { + const series: Record = {}; + + if (ranges) { + Object.values(ranges[rangeName] ?? {}).forEach((r) => { + series[r.nodeId!] = r.range.reduce((a, v) => ({ ...a, [v[0]]: parseFloat(v[1]) }), {} as TimeSeriesMetrics); + }); + } + + return series; } export async function ConnectedClusterChartsCard({ - data, + cluster, }: { - data: Promise<{ - cluster: ClusterDetail; - ranges: Record | null; - } | null>; + cluster: Promise; }) { - const res = await data; + const res = await cluster; + return ( ); } diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/overview/ConnectedTopicChartsCard.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/overview/ConnectedTopicChartsCard.tsx index aa3b412c0..323c253df 100644 --- a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/overview/ConnectedTopicChartsCard.tsx +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/overview/ConnectedTopicChartsCard.tsx @@ -1,21 +1,33 @@ -import { TopicMetric } from "@/api/kafka/actions"; -import { ClusterDetail, MetricRange } from "@/api/kafka/schema"; +import { ClusterDetail } from "@/api/kafka/schema"; import { TopicChartsCard } from "@/components/ClusterOverview/TopicChartsCard"; +function timeSeriesMetrics( + ranges: Record | undefined, + rangeName: string, +): TimeSeriesMetrics { + let series: TimeSeriesMetrics = {}; + + if (ranges) { + Object.values(ranges[rangeName] ?? 
{}).forEach((r) => { + series = r.range.reduce((a, v) => ({ ...a, [v[0]]: parseFloat(v[1]) }), series); + }); + } + + return series; +} + export async function ConnectedTopicChartsCard({ - data, + cluster, }: { - data: Promise<{ - cluster: ClusterDetail; - ranges: Record | null; - } | null>; + cluster: Promise; }) { - const res = await data; + const res = await cluster; + return ( ); } diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/overview/page.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/overview/page.tsx index 6054ce526..a591bd6b9 100644 --- a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/overview/page.tsx +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/overview/page.tsx @@ -1,9 +1,5 @@ import { getConsumerGroups } from "@/api/consumerGroups/actions"; -import { - getKafkaClusterKpis, - getKafkaClusterMetrics, - getKafkaTopicMetrics, -} from "@/api/kafka/actions"; +import { getKafkaCluster } from "@/api/kafka/actions"; import { getTopics, getViewedTopics } from "@/api/topics/actions"; import { KafkaParams } from "@/app/[locale]/(authorized)/kafka/[kafkaId]/kafka.params"; import { ConnectedClusterCard } from "@/app/[locale]/(authorized)/kafka/[kafkaId]/overview/ConnectedClusterCard"; @@ -14,17 +10,9 @@ import { PageLayout } from "@/components/ClusterOverview/PageLayout"; import { ConnectedRecentTopics } from "./ConnectedRecentTopics"; export default function OverviewPage({ params }: { params: KafkaParams }) { - const kpi = getKafkaClusterKpis(params.kafkaId); - const cluster = getKafkaClusterMetrics(params.kafkaId, [ - "volumeUsed", - "volumeCapacity", - "memory", - "cpu", - ]); - const topic = getKafkaTopicMetrics(params.kafkaId, [ - "outgoingByteRate", - "incomingByteRate", - ]); + const kafkaCluster = getKafkaCluster(params.kafkaId, { + fields: 'name,namespace,creationTimestamp,status,kafkaVersion,nodes,controller,authorizedOperations,listeners,conditions,metrics' + }); const topics = getTopics(params.kafkaId, { fields: "status", pageSize: 1 }); const consumerGroups = getConsumerGroups(params.kafkaId, { fields: "state" }); const viewedTopics = getViewedTopics().then((topics) => @@ -34,11 +22,11 @@ export default function OverviewPage({ params }: { params: KafkaParams }) { return ( + } topicsPartitions={} - clusterCharts={} - topicCharts={} + clusterCharts={} + topicCharts={} recentTopics={} /> ); diff --git a/ui/app/[locale]/(authorized)/layout.tsx b/ui/app/[locale]/(authorized)/layout.tsx index a7d7e232c..b9a1f5cfc 100644 --- a/ui/app/[locale]/(authorized)/layout.tsx +++ b/ui/app/[locale]/(authorized)/layout.tsx @@ -1,4 +1,4 @@ -import { getAuthOptions } from "@/app/api/auth/[...nextauth]/route"; +import { getAuthOptions } from "@/app/api/auth/[...nextauth]/auth-options"; import { getServerSession } from "next-auth"; import { ReactNode } from "react"; diff --git a/ui/app/api/auth/[...nextauth]/auth-options.ts b/ui/app/api/auth/[...nextauth]/auth-options.ts new file mode 100644 index 000000000..5c70fcf43 --- /dev/null +++ b/ui/app/api/auth/[...nextauth]/auth-options.ts @@ -0,0 +1,48 @@ +import { getKafkaClusters } from "@/api/kafka/actions"; +import { ClusterList } from "@/api/kafka/schema"; +import { logger } from "@/utils/logger"; +import { AuthOptions } from "next-auth"; +import { Provider } from "next-auth/providers/index"; +import { makeAnonymous } from "./anonymous"; +import { makeOauthTokenProvider } from "./oauth-token"; +import { makeScramShaProvider } from "./scram"; + +const log = logger.child({ module: "auth" }); + +function makeAuthOption(cluster: 
ClusterList): Provider { + switch (cluster.meta.authentication?.method) { + case "oauth": { + const { tokenUrl } = cluster.meta.authentication; + return makeOauthTokenProvider(tokenUrl ?? "TODO"); + } + case "basic": + return makeScramShaProvider(cluster.id); + case "anonymous": + default: + return makeAnonymous(); + } +} + +export async function getAuthOptions(): Promise { + // retrieve the authentication method required by the default Kafka cluster + const clusters = await getKafkaClusters(); + const providers = clusters.map(makeAuthOption); + log.trace({ providers }, "getAuthOptions"); + return { + providers, + callbacks: { + async jwt({ token, user }) { + if (user) { + token.authorization = user.authorization; + } + return token; + }, + async session({ session, token, user }) { + // Send properties to the client, like an access_token and user id from a provider. + session.authorization = token.authorization; + + return session; + }, + }, + }; +} \ No newline at end of file diff --git a/ui/app/api/auth/[...nextauth]/route.ts b/ui/app/api/auth/[...nextauth]/route.ts index 812db7da4..731ad900c 100644 --- a/ui/app/api/auth/[...nextauth]/route.ts +++ b/ui/app/api/auth/[...nextauth]/route.ts @@ -1,54 +1,7 @@ -import { getKafkaClusters } from "@/api/kafka/actions"; -import { ClusterList } from "@/api/kafka/schema"; -import { logger } from "@/utils/logger"; -import NextAuth, { AuthOptions } from "next-auth"; -import { Provider } from "next-auth/providers/index"; +import NextAuth from "next-auth"; import { NextRequest, NextResponse } from "next/server"; -import { makeAnonymous } from "./anonymous"; -import { makeOauthTokenProvider } from "./oauth-token"; -import { makeScramShaProvider } from "./scram"; +import { getAuthOptions } from "./auth-options"; -const log = logger.child({ module: "auth" }); - -export async function getAuthOptions(): Promise { - // retrieve the authentication method required by the default Kafka cluster - const clusters = await getKafkaClusters(); - const providers = clusters.map(makeAuthOption); - log.trace({ providers }, "getAuthOptions"); - return { - providers, - callbacks: { - async jwt({ token, user }) { - if (user) { - token.authorization = user.authorization; - } - return token; - }, - async session({ session, token, user }) { - // Send properties to the client, like an access_token and user id from a provider. - session.authorization = token.authorization; - - return session; - }, - }, - }; -} - -function makeAuthOption(cluster: ClusterList): Provider { - switch (cluster.meta.authentication?.method) { - case "oauth": { - const { tokenUrl } = cluster.meta.authentication; - return makeOauthTokenProvider(tokenUrl ?? 
"TODO"); - } - case "basic": - return makeScramShaProvider(cluster.id); - case "anonymous": - default: - return makeAnonymous(); - } -} - -// const handler = NextAuth(authOptions); async function handler(req: NextRequest, res: NextResponse) { const authOptions = await getAuthOptions(); if (authOptions) { diff --git a/ui/components/ClusterOverview/ClusterChartsCard.tsx b/ui/components/ClusterOverview/ClusterChartsCard.tsx index c09e6d91b..aa4a799d5 100644 --- a/ui/components/ClusterOverview/ClusterChartsCard.tsx +++ b/ui/components/ClusterOverview/ClusterChartsCard.tsx @@ -17,10 +17,10 @@ import { HelpIcon } from "@/libs/patternfly/react-icons"; import { useTranslations } from "next-intl"; type ClusterChartsCardProps = { - usedDiskSpace: TimeSeriesMetrics[]; - availableDiskSpace: TimeSeriesMetrics[]; - memoryUsage: TimeSeriesMetrics[]; - cpuUsage: TimeSeriesMetrics[]; + usedDiskSpace: Record; + availableDiskSpace: Record; + memoryUsage: Record; + cpuUsage: Record; }; export function ClusterChartsCard({ diff --git a/ui/components/ClusterOverview/TopicChartsCard.tsx b/ui/components/ClusterOverview/TopicChartsCard.tsx index b33303584..a6d1c2ca7 100644 --- a/ui/components/ClusterOverview/TopicChartsCard.tsx +++ b/ui/components/ClusterOverview/TopicChartsCard.tsx @@ -1,5 +1,4 @@ "use client"; -import { MetricRange } from "@/api/kafka/schema"; import { Card, CardBody, @@ -15,8 +14,8 @@ import { ChartSkeletonLoader } from "./components/ChartSkeletonLoader"; import { useTranslations } from "next-intl"; type TopicChartsCardProps = { - incoming: MetricRange; - outgoing: MetricRange; + incoming: TimeSeriesMetrics; + outgoing: TimeSeriesMetrics; }; export function TopicChartsCard({ diff --git a/ui/components/ClusterOverview/components/ChartCpuUsage.tsx b/ui/components/ClusterOverview/components/ChartCpuUsage.tsx index 32eb9520b..e6ca7618e 100644 --- a/ui/components/ClusterOverview/components/ChartCpuUsage.tsx +++ b/ui/components/ClusterOverview/components/ChartCpuUsage.tsx @@ -15,7 +15,7 @@ import { getHeight, getPadding } from "./chartConsts"; import { useChartWidth } from "./useChartWidth"; type ChartCpuUsageProps = { - usages: TimeSeriesMetrics[]; + usages: Record; }; type Datum = { @@ -29,7 +29,15 @@ export function ChartCpuUsage({ usages }: ChartCpuUsageProps) { const format = useFormatter(); const [containerRef, width] = useChartWidth(); - const itemsPerRow = width > 650 ? 6 : width > 300 ? 
3 : 1; + let itemsPerRow; + + if (width > 650) { + itemsPerRow = 6; + } else if (width > 300) { + itemsPerRow = 3; + } else { + itemsPerRow = 1; + } const hasMetrics = Object.keys(usages).length > 0; if (!hasMetrics) { @@ -42,11 +50,11 @@ export function ChartCpuUsage({ usages }: ChartCpuUsageProps) { /> ); } - // const showDate = shouldShowDate(duration); + const CursorVoronoiContainer = createContainer("voronoi", "cursor"); - const legendData = usages.map((_, idx) => ({ - name: `Node ${idx}`, - childName: `node ${idx}`, + const legendData = Object.keys(usages).map((nodeId) => ({ + name: `Node ${nodeId}`, + childName: `node ${nodeId}`, })); const padding = getPadding(legendData.length / itemsPerRow); return ( @@ -112,17 +120,18 @@ export function ChartCpuUsage({ usages }: ChartCpuUsageProps) { }} /> - {usages.map((usage, idx) => { - const usageArray = Object.entries(usage); + {Object.entries(usages).map(([nodeId, series]) => { return ( ({ - name: `Node ${idx + 1}`, - x, - y, - }))} - name={`node ${idx}`} + key={ `cpu-usage-${nodeId}` } + data={ Object.entries(series).map(([k, v]) => { + return ({ + name: `Node ${nodeId}`, + x: Date.parse(k), + y: v, + }) + })} + name={ `node ${nodeId}` } /> ); })} diff --git a/ui/components/ClusterOverview/components/ChartDiskUsage.tsx b/ui/components/ClusterOverview/components/ChartDiskUsage.tsx index 831f142cd..40646e084 100644 --- a/ui/components/ClusterOverview/components/ChartDiskUsage.tsx +++ b/ui/components/ClusterOverview/components/ChartDiskUsage.tsx @@ -17,8 +17,8 @@ import { getHeight, getPadding } from "./chartConsts"; import { useChartWidth } from "./useChartWidth"; type ChartDiskUsageProps = { - usages: TimeSeriesMetrics[]; - available: TimeSeriesMetrics[]; + usages: Record; + available: Record; }; type Datum = { x: number; @@ -46,17 +46,23 @@ export function ChartDiskUsage({ usages, available }: ChartDiskUsageProps) { ); } const CursorVoronoiContainer = createContainer("voronoi", "cursor"); - const legendData = [ - ...usages.map((_, idx) => ({ - name: `Node ${idx}`, - childName: `node ${idx}`, - })), - ...usages.map((_, idx) => ({ - name: `Available storage threshold (node ${idx})`, - childName: `threshold ${idx}`, + const legendData: { name: string, childName: string, symbol?: { type: string } }[] = []; + + Object.entries(usages).forEach(([nodeId, _]) => { + legendData.push({ + name: `Node ${nodeId}`, + childName: `node ${nodeId}`, + }); + }); + + Object.entries(usages).forEach(([nodeId, _]) => { + legendData.push({ + name: `Available storage threshold (node ${nodeId})`, + childName: `threshold ${nodeId}`, symbol: { type: "threshold" }, - })), - ]; + }); + }); + const padding = getPadding(legendData.length / itemsPerRow); return (
@@ -117,36 +123,38 @@ export function ChartDiskUsage({ usages, available }: ChartDiskUsageProps) { dependentAxis showGrid={true} tickFormat={(d) => { - return formatBytes(d, { maximumFractionDigits: 0 }); + return formatBytes(d); }} /> - {usages.map((usage, idx) => { - const usageArray = Object.entries(usage); + {Object.entries(usages).map(([nodeId, series]) => { return ( ({ - name: `Node ${idx + 1}`, - x, - y, - }))} - name={`node ${idx}`} + key={ `usage-area-${nodeId}` } + data={ Object.entries(series).map(([k, v]) => { + return ({ + name: `Node ${nodeId}`, + x: Date.parse(k), + y: v, + }) + })} + name={ `node ${nodeId}` } /> ); })} - {usages.map((usage, idx) => { - const usageArray = Object.entries(usage); - const data = Object.entries(available[idx]); + + {Object.entries(usages).map(([nodeId, _]) => { + const availableSeries = available[nodeId]; + return ( ({ - name: `Available storage threshold (node ${idx + 1})`, - x: usageArray[x][0], - y, + key={ `chart-softlimit-${nodeId}` } + data={ Object.entries(availableSeries).map(([k, v]) => ({ + name: `Available storage threshold (node ${nodeId})`, + x: Date.parse(k), + y: v, }))} - name={`threshold ${idx}`} + name={`threshold ${nodeId}`} /> ); })} diff --git a/ui/components/ClusterOverview/components/ChartIncomingOutgoing.tsx b/ui/components/ClusterOverview/components/ChartIncomingOutgoing.tsx index aec906093..4097cad78 100644 --- a/ui/components/ClusterOverview/components/ChartIncomingOutgoing.tsx +++ b/ui/components/ClusterOverview/components/ChartIncomingOutgoing.tsx @@ -1,6 +1,7 @@ "use client"; import { Chart, + ChartArea, ChartAxis, ChartGroup, ChartLegend, @@ -9,15 +10,14 @@ import { createContainer, } from "@/libs/patternfly/react-charts"; import { useFormatBytes } from "@/utils/useFormatBytes"; -import { ChartArea } from "@/libs/patternfly/react-charts"; import { Alert } from "@patternfly/react-core"; import { useFormatter, useTranslations } from "next-intl"; import { getHeight, getPadding } from "./chartConsts"; import { useChartWidth } from "./useChartWidth"; type ChartIncomingOutgoingProps = { - incoming: Record; - outgoing: Record; + incoming: TimeSeriesMetrics; + outgoing: TimeSeriesMetrics; }; type Datum = { @@ -52,16 +52,14 @@ export function ChartIncomingOutgoing({ } // const showDate = shouldShowDate(duration); const CursorVoronoiContainer = createContainer("voronoi", "cursor"); - const legendData = [ - ...Object.keys(incoming).map((name) => ({ - name: `Incoming bytes (${name})`, - childName: `incoming ${name}`, - })), - ...Object.keys(outgoing).map((name) => ({ - name: `Outgoing bytes (${name})`, - childName: `outgoing ${name}`, - })), - ]; + const legendData = [ { + name: "Incoming bytes (all topics)", + childName: "incoming" + }, { + name: "Outgoing bytes (all topics)", + childName: "outgoing" + } ]; + const padding = getPadding(legendData.length / itemsPerRow); return (
@@ -125,43 +123,36 @@ export function ChartIncomingOutgoing({ { - return formatBytes(Math.abs(d), { maximumFractionDigits: 0 }); + return formatBytes(Math.abs(d)); }} /> - {Object.entries(incoming).map(([name, entries], idx) => { - const entriesArray = Object.entries(entries ?? {}); - return ( - ({ - name: `Incoming (${name})`, - x, - y, - value: y, - }))} - name={`incoming ${name}`} - interpolation={"stepAfter"} - /> - ); - })} - {Object.entries(outgoing).map(([name, entries], idx) => { - const entriesArray = Object.entries(entries ?? {}); - const incomingArray = Object.keys(incoming[name] ?? {}); - return ( - ({ - name: `Outgoing (${name})`, - x: incomingArray[idx], - y: -1 * y, - value: y, - }))} - name={`outgoing ${name}`} - interpolation={"stepAfter"} - /> - ); - })} + { + return ({ + name: `Incoming`, + x: Date.parse(k), + y: v, + value: v, + }) + })} + name={ `incoming` } + interpolation={"stepAfter"} + /> + { + return ({ + name: `Outgoing`, + x: Date.parse(k), + y: v * -1, + value: v, + }) + })} + name={ `outgoing` } + interpolation={"stepAfter"} + />
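The rewritten ChartIncomingOutgoing drops the per-topic series in favor of a single aggregate pair, rendered with a mirrored-axis convention: outgoing samples are negated so they plot below the x-axis, and the tick formatter applies Math.abs so both halves read as positive byte rates. A minimal sketch of that convention (names here are illustrative, not from the patch):

    // Sketch only; assumes TimeSeriesMetrics = Record<string, number>.
    type TimeSeriesMetrics = Record<string, number>;

    function mirroredThroughput(
      incoming: TimeSeriesMetrics,
      outgoing: TimeSeriesMetrics,
    ) {
      const toPoints = (series: TimeSeriesMetrics, sign: 1 | -1) =>
        Object.entries(series).map(([timestamp, value]) => ({
          x: Date.parse(timestamp),
          y: sign * value,
          value, // keep the unsigned sample for tooltips
        }));
      return {
        incoming: toPoints(incoming, 1),
        outgoing: toPoints(outgoing, -1),
      };
    }

    // Axis ticks then undo the sign, e.g.: (d: number) => formatBytes(Math.abs(d))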
diff --git a/ui/components/ClusterOverview/components/ChartMemoryUsage.tsx b/ui/components/ClusterOverview/components/ChartMemoryUsage.tsx index fe741647b..77024f7a0 100644 --- a/ui/components/ClusterOverview/components/ChartMemoryUsage.tsx +++ b/ui/components/ClusterOverview/components/ChartMemoryUsage.tsx @@ -16,7 +16,7 @@ import { getHeight, getPadding } from "./chartConsts"; import { useChartWidth } from "./useChartWidth"; type ChartDiskUsageProps = { - usages: TimeSeriesMetrics[]; + usages: Record; }; type Datum = { @@ -46,11 +46,12 @@ export function ChartMemoryUsage({ usages }: ChartDiskUsageProps) { } const CursorVoronoiContainer = createContainer("voronoi", "cursor"); - const legendData = usages.map((_, idx) => ({ - name: `Node ${idx}`, - childName: `node ${idx}`, + const legendData = Object.keys(usages).map((nodeId) => ({ + name: `Node ${nodeId}`, + childName: `node ${nodeId}`, })); const padding = getPadding(legendData.length / itemsPerRow); + return (
{ - return formatBytes(d, { maximumFractionDigits: 0 }); + return formatBytes(d); }} /> - {usages.map((usage, idx) => { - const usageArray = Object.entries(usage); + {Object.entries(usages).map(([nodeId, series]) => { return ( ({ - name: `Node ${idx + 1}`, - x, - y, - }))} - name={`node ${idx}`} + key={ `memory-usage-${nodeId}` } + data={ Object.entries(series).map(([k, v]) => { + return ({ + name: `Node ${nodeId}`, + x: Date.parse(k), + y: v, + }) + })} + name={ `node ${nodeId}` } /> ); })} diff --git a/ui/components/ClusterOverview/components/chartConsts.ts b/ui/components/ClusterOverview/components/chartConsts.ts index 98d7e9891..0998a0afa 100644 --- a/ui/components/ClusterOverview/components/chartConsts.ts +++ b/ui/components/ClusterOverview/components/chartConsts.ts @@ -3,7 +3,7 @@ export const getHeight = (legendEntriesCount: number) => { return 150 + bottom; }; export const getPadding = (legendEntriesCount: number) => ({ - bottom: 35 + 32 * legendEntriesCount, + bottom: 50 + 32 * legendEntriesCount, top: 5, left: 70, right: 30, diff --git a/ui/environment.d.ts b/ui/environment.d.ts index 06950d380..af85ca3a9 100644 --- a/ui/environment.d.ts +++ b/ui/environment.d.ts @@ -7,7 +7,6 @@ namespace NodeJS { KEYCLOAK_CLIENTSECRET?: string; NEXT_PUBLIC_KEYCLOAK_URL?: string; NEXT_PUBLIC_PRODUCTIZED_BUILD?: "true" | "false"; - CONSOLE_METRICS_PROMETHEUS_URL?: string; LOG_LEVEL?: "fatal" | "error" | "warn" | "info" | "debug" | "trace"; CONSOLE_MODE?: "read-only" | "read-write"; } diff --git a/ui/package.json b/ui/package.json index aeccdf347..2138ae9be 100644 --- a/ui/package.json +++ b/ui/package.json @@ -26,7 +26,6 @@ "@stdlib/string-truncate": "^0.2.2", "@stdlib/string-truncate-middle": "^0.2.2", "@tanstack/react-virtual": "^3.10.8", - "@types/lodash.groupby": "^4.6.9", "@types/node": "22.8.2", "@types/react": "18.3.12", "@types/react-dom": "18.3.1", @@ -41,13 +40,11 @@ "eslint-import-resolver-typescript": "^3.6.3", "eslint-plugin-storybook": "^0.10.1", "iron-session": "^8.0.3", - "lodash.groupby": "^4.6.0", "next": "^14.2.15", "next-auth": "^4.24.10", "next-intl": "^3.23.5", "next-logger": "^5.0.1", "pino": "^9.5.0", - "prometheus-query": "^3.4.0", "react": "18.3.1", "react-csv-downloader": "^3.1.1", "react-dom": "18.3.1", diff --git a/ui/utils/session.ts b/ui/utils/session.ts index c185cf7bf..3268b3022 100644 --- a/ui/utils/session.ts +++ b/ui/utils/session.ts @@ -1,6 +1,6 @@ "use server"; -import { getAuthOptions } from "@/app/api/auth/[...nextauth]/route"; +import { getAuthOptions } from "@/app/api/auth/[...nextauth]/auth-options"; import { logger } from "@/utils/logger"; import { sealData, unsealData } from "iron-session"; import { getServerSession } from "next-auth"; diff --git a/ui/utils/useFormatBytes.ts b/ui/utils/useFormatBytes.ts index 04e90a273..be6f97548 100644 --- a/ui/utils/useFormatBytes.ts +++ b/ui/utils/useFormatBytes.ts @@ -11,8 +11,34 @@ export function useFormatBytes() { return "0 B"; } const res = convert(bytes, "bytes").to("best", "imperial"); + let minimumFractionDigits = undefined; + + if (maximumFractionDigits === undefined) { + switch (res.unit) { + case "PiB": + case "TiB": + case "GiB": + case "MiB": + case "KiB": + if (res.quantity >= 100) { + maximumFractionDigits = 0; + } else if (res.quantity >= 10) { + minimumFractionDigits = 1; + maximumFractionDigits = 1; + } else { + minimumFractionDigits = 2; + maximumFractionDigits = 2; + } + break; + default: + maximumFractionDigits = 0; + break; + } + } + return `${format.number(res.quantity, { style: "decimal", + 
minimumFractionDigits,
       maximumFractionDigits,
     })} ${res.unit}`;
   };
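The useFormatBytes change above replaces the callers' hard-coded `maximumFractionDigits: 0` with a significant-digits rule chosen from the converted unit and magnitude. A sketch of that rule in isolation, assuming `convert(bytes, "bytes").to("best", "imperial")` yields `{ quantity, unit }` as in the hook:

    // Sketch only: binary units get 0 decimals at >= 100, 1 decimal at >= 10,
    // otherwise 2 decimals; anything else (plain bytes) is a whole number.
    function fractionDigitsFor(
      quantity: number,
      unit: string,
    ): { minimumFractionDigits?: number; maximumFractionDigits: number } {
      const binaryUnits = ["KiB", "MiB", "GiB", "TiB", "PiB"];
      if (!binaryUnits.includes(unit) || quantity >= 100) {
        return { maximumFractionDigits: 0 };
      }
      if (quantity >= 10) {
        return { minimumFractionDigits: 1, maximumFractionDigits: 1 };
      }
      return { minimumFractionDigits: 2, maximumFractionDigits: 2 };
    }

    // e.g. fractionDigitsFor(512, "MiB") -> { maximumFractionDigits: 0 }
    //      fractionDigitsFor(1.5, "GiB") -> { minimumFractionDigits: 2, maximumFractionDigits: 2 }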