diff --git a/api/pom.xml b/api/pom.xml index 2298d1bb5..f1ad50e6c 100644 --- a/api/pom.xml +++ b/api/pom.xml @@ -84,6 +84,10 @@ io.quarkus quarkus-kubernetes-client + + io.quarkus + quarkus-oidc + io.smallrye.common smallrye-common-annotation diff --git a/api/src/main/java/com/github/streamshub/console/api/BrokersResource.java b/api/src/main/java/com/github/streamshub/console/api/BrokersResource.java index 7109aba39..22f794fd6 100644 --- a/api/src/main/java/com/github/streamshub/console/api/BrokersResource.java +++ b/api/src/main/java/com/github/streamshub/console/api/BrokersResource.java @@ -16,7 +16,10 @@ import org.eclipse.microprofile.openapi.annotations.tags.Tag; import com.github.streamshub.console.api.model.ConfigEntry; +import com.github.streamshub.console.api.security.Authorized; +import com.github.streamshub.console.api.security.ResourcePrivilege; import com.github.streamshub.console.api.service.BrokerService; +import com.github.streamshub.console.config.security.Privilege; @Path("/api/kafkas/{clusterId}/nodes") @Tag(name = "Kafka Cluster Resources") @@ -32,6 +35,8 @@ public class BrokersResource { @APIResponse(responseCode = "404", ref = "NotFound") @APIResponse(responseCode = "500", ref = "ServerError") @APIResponse(responseCode = "504", ref = "ServerTimeout") + @Authorized + @ResourcePrivilege(action = Privilege.GET) public CompletionStage describeConfigs( @Parameter(description = "Cluster identifier") @PathParam("clusterId") @@ -41,7 +46,7 @@ public CompletionStage describeConfigs( @Parameter(description = "Node identifier") String nodeId) { - return brokerService.describeConfigs(nodeId) + return brokerService.describeConfigs(clusterId, nodeId) .thenApply(ConfigEntry.ConfigResponse::new) .thenApply(Response::ok) .thenApply(Response.ResponseBuilder::build); diff --git a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java index b789ad453..9ffa94f61 100644 --- 
a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java +++ b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java @@ -4,7 +4,6 @@ import java.io.InputStream; import java.io.UncheckedIOException; import java.nio.file.Path; -import java.util.Base64; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -51,8 +50,6 @@ import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule; -import org.apache.kafka.common.security.plain.PlainLoginModule; -import org.apache.kafka.common.security.scram.ScramLoginModule; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.eclipse.microprofile.config.Config; @@ -61,6 +58,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.github.streamshub.console.api.security.SaslJaasConfigCredential; import com.github.streamshub.console.api.service.KafkaClusterService; import com.github.streamshub.console.api.support.Holder; import com.github.streamshub.console.api.support.KafkaContext; @@ -72,6 +70,7 @@ import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.fabric8.kubernetes.client.informers.cache.Cache; +import io.quarkus.security.identity.SecurityIdentity; import io.strimzi.api.kafka.model.kafka.Kafka; import io.strimzi.api.kafka.model.kafka.KafkaClusterSpec; import io.strimzi.api.kafka.model.kafka.KafkaSpec; @@ -101,20 +100,11 @@ public class ClientFactory { public static final String SCRAM_SHA256 = "SCRAM-SHA-256"; public static final String SCRAM_SHA512 = "SCRAM-SHA-512"; - private static final String BEARER = "Bearer "; private static final String STRIMZI_OAUTH_CALLBACK = 
"io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler"; - private static final String SASL_OAUTH_CONFIG_TEMPLATE = OAuthBearerLoginModule.class.getName() - + " required" - + " oauth.access.token=\"%s\" ;"; - private static final String BASIC = "Basic "; - private static final String BASIC_TEMPLATE = "%s required username=\"%%s\" password=\"%%s\" ;"; - private static final String SASL_PLAIN_CONFIG_TEMPLATE = BASIC_TEMPLATE.formatted(PlainLoginModule.class.getName()); - private static final String SASL_SCRAM_CONFIG_TEMPLATE = BASIC_TEMPLATE.formatted(ScramLoginModule.class.getName()); - - static final String NO_SUCH_KAFKA_MESSAGE = "Requested Kafka cluster %s does not exist or is not configured"; + public static final String NO_SUCH_KAFKA_MESSAGE = "Requested Kafka cluster %s does not exist or is not configured"; private final Function noSuchKafka = - clusterName -> new NotFoundException(NO_SUCH_KAFKA_MESSAGE.formatted(clusterName)); + clusterId -> new NotFoundException(NO_SUCH_KAFKA_MESSAGE.formatted(clusterId)); @Inject Logger log; @@ -278,7 +268,7 @@ public void onDelete(Kafka kafka, boolean deletedFinalStateUnknown) { findConfig(kafka).ifPresentOrElse( clusterConfig -> { String clusterKey = clusterConfig.clusterKey(); - String clusterId = clusterId(clusterConfig, Optional.of(kafka)); + String clusterId = KafkaContext.clusterId(clusterConfig, Optional.of(kafka)); log.infof("Removing KafkaContext for cluster %s, id=%s", clusterKey, clusterId); log.debugf("Known KafkaContext identifiers: %s", contexts.keySet()); KafkaContext previous = contexts.remove(clusterId); @@ -332,7 +322,7 @@ void putKafkaContext(Map contexts, clientConfigs.put(Producer.class, Collections.unmodifiableMap(producerConfigs)); String clusterKey = clusterConfig.clusterKey(); - String clusterId = clusterId(clusterConfig, kafkaResource); + String clusterId = KafkaContext.clusterId(clusterConfig, kafkaResource); if (contexts.containsKey(clusterId) && !allowReplacement) { log.warnf(""" @@ 
-362,12 +352,6 @@ boolean defaultedClusterId(KafkaClusterConfig clusterConfig, Optional kaf return clusterConfig.getId() == null && kafkaResource.map(Kafka::getStatus).map(KafkaStatus::getClusterId).isEmpty(); } - String clusterId(KafkaClusterConfig clusterConfig, Optional kafkaResource) { - return Optional.ofNullable(clusterConfig.getId()) - .or(() -> kafkaResource.map(Kafka::getStatus).map(KafkaStatus::getClusterId)) - .orElseGet(clusterConfig::getName); - } - Optional cachedKafkaResource(KafkaClusterConfig clusterConfig) { return clusterConfig.hasNamespace() ? kafkaInformer.map(SharedIndexInformer::getStore) .map(store -> store.getByKey(clusterConfig.clusterKey())) @@ -511,6 +495,7 @@ void disposeKafkaContexts(@Disposes Map contexts) { @Produces @RequestScoped public KafkaContext produceKafkaContext(Map contexts, + SecurityIdentity identity, UnaryOperator filter, Function, Admin> adminBuilder) { @@ -520,22 +505,28 @@ public KafkaContext produceKafkaContext(Map contexts, return KafkaContext.EMPTY; } - return Optional.ofNullable(contexts.get(clusterId)) - .map(ctx -> { - if (ctx.admin() == null) { - /* - * Admin may be null if credentials were not given in the - * configuration. The user must provide the login secrets - * in the request in that case. - */ - var adminConfigs = maybeAuthenticate(ctx, Admin.class); - var admin = adminBuilder.apply(adminConfigs); - return new KafkaContext(ctx, filter.apply(admin)); - } + KafkaContext ctx = contexts.get(clusterId); - return ctx; - }) - .orElseThrow(() -> noSuchKafka.apply(clusterId)); + if (ctx == null) { + throw noSuchKafka.apply(clusterId); + } + + if (identity.isAnonymous()) { + return ctx; + } + + if (ctx.admin() == null) { + /* + * Admin may be null if credentials were not given in the + * configuration. The user must provide the login secrets + * in the request in that case. 
+ */ + var adminConfigs = maybeAuthenticate(identity, ctx, Admin.class); + var admin = adminBuilder.apply(adminConfigs); + return new KafkaContext(ctx, filter.apply(admin)); + } + + return ctx; } public void disposeKafkaContext(@Disposes KafkaContext context, Map contexts) { @@ -552,8 +543,8 @@ public void disposeKafkaContext(@Disposes KafkaContext context, Map> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) { - var configs = maybeAuthenticate(context, Consumer.class); + public Supplier> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context, SecurityIdentity identity) { + var configs = maybeAuthenticate(identity, context, Consumer.class); Consumer client = new KafkaConsumer<>(configs); // NOSONAR / closed in consumerDisposer return () -> client; } @@ -564,8 +555,8 @@ public void consumerDisposer(@Disposes Supplier> consum @Produces @RequestScoped - public Supplier> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) { - var configs = maybeAuthenticate(context, Producer.class); + public Supplier> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context, SecurityIdentity identity) { + var configs = maybeAuthenticate(identity, context, Producer.class); Producer client = new KafkaProducer<>(configs); // NOSONAR / closed in producerDisposer return () -> client; } @@ -574,13 +565,13 @@ public void producerDisposer(@Disposes Supplier> produc producer.get().close(); } - Map maybeAuthenticate(KafkaContext context, Class clientType) { + Map maybeAuthenticate(SecurityIdentity identity, KafkaContext context, Class clientType) { Map configs = context.configs(clientType); if (configs.containsKey(SaslConfigs.SASL_MECHANISM) && !configs.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) { configs = new HashMap<>(configs); - configureAuthentication(context.saslMechanism(clientType), configs); + configureAuthentication(identity, context.saslMechanism(clientType), configs); } return configs; @@ -756,23 +747,25 @@ void 
logConfig(String clientType, Map config) { } } - void configureAuthentication(String saslMechanism, Map configs) { + void configureAuthentication(SecurityIdentity identity, String saslMechanism, Map configs) { + SaslJaasConfigCredential credential = identity.getCredential(SaslJaasConfigCredential.class); + switch (saslMechanism) { case OAUTHBEARER: - configureOAuthBearer(configs); + configureOAuthBearer(credential, configs); break; case PLAIN: - configureBasic(configs, SASL_PLAIN_CONFIG_TEMPLATE); + configureBasic(credential, configs); break; case SCRAM_SHA256, SCRAM_SHA512: - configureBasic(configs, SASL_SCRAM_CONFIG_TEMPLATE); + configureBasic(credential, configs); break; default: throw new NotAuthorizedException("Unknown"); } } - void configureOAuthBearer(Map configs) { + void configureOAuthBearer(SaslJaasConfigCredential credential, Map configs) { log.trace("SASL/OAUTHBEARER enabled"); configs.putIfAbsent(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, STRIMZI_OAUTH_CALLBACK); @@ -780,39 +773,12 @@ void configureOAuthBearer(Map configs) { // May still cause warnings to be logged when token will expire in less than SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS. 
configs.putIfAbsent(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, "0"); - String jaasConfig = getAuthorization(BEARER) - .map(SASL_OAUTH_CONFIG_TEMPLATE::formatted) - .orElseThrow(() -> new NotAuthorizedException(BEARER.trim())); - - configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig); + configs.put(SaslConfigs.SASL_JAAS_CONFIG, credential.value()); } - void configureBasic(Map configs, String template) { + void configureBasic(SaslJaasConfigCredential credential, Map configs) { log.trace("SASL/SCRAM enabled"); - - String jaasConfig = getBasicAuthentication() - .map(template::formatted) - .orElseThrow(() -> new NotAuthorizedException(BASIC.trim())); - - configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig); - } - - Optional getBasicAuthentication() { - return getAuthorization(BASIC) - .map(Base64.getDecoder()::decode) - .map(String::new) - .filter(authn -> authn.indexOf(':') >= 0) - .map(authn -> new String[] { - authn.substring(0, authn.indexOf(':')), - authn.substring(authn.indexOf(':') + 1) - }) - .filter(userPass -> !userPass[0].isEmpty() && !userPass[1].isEmpty()); - } - - Optional getAuthorization(String scheme) { - return Optional.ofNullable(headers.getHeaderString(HttpHeaders.AUTHORIZATION)) - .filter(header -> header.regionMatches(true, 0, scheme, 0, scheme.length())) - .map(header -> header.substring(scheme.length())); + configs.put(SaslConfigs.SASL_JAAS_CONFIG, credential.value()); } private static final Pattern BOUNDARY_QUOTES = Pattern.compile("(^[\"'])|([\"']$)"); diff --git a/api/src/main/java/com/github/streamshub/console/api/ConsumerGroupsResource.java b/api/src/main/java/com/github/streamshub/console/api/ConsumerGroupsResource.java index 813c55d04..9b4fbf301 100644 --- a/api/src/main/java/com/github/streamshub/console/api/ConsumerGroupsResource.java +++ b/api/src/main/java/com/github/streamshub/console/api/ConsumerGroupsResource.java @@ -36,11 +36,14 @@ import com.github.streamshub.console.api.model.ConsumerGroup; import 
com.github.streamshub.console.api.model.ConsumerGroupFilterParams; import com.github.streamshub.console.api.model.ListFetchParams; +import com.github.streamshub.console.api.security.Authorized; +import com.github.streamshub.console.api.security.ResourcePrivilege; import com.github.streamshub.console.api.service.ConsumerGroupService; import com.github.streamshub.console.api.support.ErrorCategory; import com.github.streamshub.console.api.support.FieldFilter; import com.github.streamshub.console.api.support.ListRequestContext; import com.github.streamshub.console.api.support.StringEnumeration; +import com.github.streamshub.console.config.security.Privilege; import io.xlate.validation.constraints.Expression; @@ -67,6 +70,8 @@ public class ConsumerGroupsResource { @APIResponseSchema(ConsumerGroup.ListResponse.class) @APIResponse(responseCode = "500", ref = "ServerError") @APIResponse(responseCode = "504", ref = "ServerTimeout") + @Authorized + @ResourcePrivilege(action = Privilege.LIST) public CompletionStage listConsumerGroups( @Parameter(description = "Cluster identifier") @PathParam("clusterId") @@ -132,6 +137,8 @@ public CompletionStage listConsumerGroups( @APIResponse(responseCode = "404", ref = "NotFound") @APIResponse(responseCode = "500", ref = "ServerError") @APIResponse(responseCode = "504", ref = "ServerTimeout") + @Authorized + @ResourcePrivilege(action = Privilege.GET) public CompletionStage describeConsumerGroup( @Parameter(description = "Cluster identifier") @PathParam("clusterId") @@ -200,6 +207,8 @@ public CompletionStage describeConsumerGroup( node = { "data", "id" }, payload = ErrorCategory.InvalidResource.class, validationAppliesTo = ConstraintTarget.PARAMETERS) + @Authorized + @ResourcePrivilege(action = Privilege.UPDATE) public CompletionStage patchConsumerGroup( @Parameter(description = "Cluster identifier") @PathParam("clusterId") @@ -244,6 +253,8 @@ public CompletionStage patchConsumerGroup( @Path("{groupId}") @DELETE 
@APIResponseSchema(responseCode = "204", value = Void.class) + @Authorized + @ResourcePrivilege(action = Privilege.DELETE) public CompletionStage deleteConsumerGroup( @Parameter(description = "Cluster identifier") @PathParam("clusterId") diff --git a/api/src/main/java/com/github/streamshub/console/api/KafkaClustersResource.java b/api/src/main/java/com/github/streamshub/console/api/KafkaClustersResource.java index ee260d9f9..c49e96395 100644 --- a/api/src/main/java/com/github/streamshub/console/api/KafkaClustersResource.java +++ b/api/src/main/java/com/github/streamshub/console/api/KafkaClustersResource.java @@ -32,11 +32,14 @@ import com.github.streamshub.console.api.model.KafkaCluster; import com.github.streamshub.console.api.model.ListFetchParams; +import com.github.streamshub.console.api.security.Authorized; +import com.github.streamshub.console.api.security.ResourcePrivilege; import com.github.streamshub.console.api.service.KafkaClusterService; import com.github.streamshub.console.api.support.ErrorCategory; import com.github.streamshub.console.api.support.FieldFilter; import com.github.streamshub.console.api.support.ListRequestContext; import com.github.streamshub.console.api.support.StringEnumeration; +import com.github.streamshub.console.config.security.Privilege; import io.xlate.validation.constraints.Expression; @@ -63,6 +66,8 @@ public class KafkaClustersResource { @APIResponseSchema(KafkaCluster.KafkaClusterDataList.class) @APIResponse(responseCode = "500", ref = "ServerError") @APIResponse(responseCode = "504", ref = "ServerTimeout") + @Authorized + @ResourcePrivilege(action = Privilege.LIST) public Response listClusters( @QueryParam(KafkaCluster.FIELDS_PARAM) @DefaultValue(KafkaCluster.Fields.LIST_DEFAULT) @@ -121,6 +126,8 @@ public Response listClusters( @APIResponse(responseCode = "404", ref = "NotFound") @APIResponse(responseCode = "500", ref = "ServerError") @APIResponse(responseCode = "504", ref = "ServerTimeout") + @Authorized + 
@ResourcePrivilege(action = Privilege.GET) public CompletionStage describeCluster( @Parameter(description = "Cluster identifier") @PathParam("clusterId") @@ -172,7 +179,7 @@ public CompletionStage describeCluster( requestedFields.accept(fields); - return clusterService.describeCluster(fields) + return clusterService.describeCluster(clusterId, fields) .thenApply(KafkaCluster.KafkaClusterData::new) .thenApply(Response::ok) .thenApply(Response.ResponseBuilder::build); diff --git a/api/src/main/java/com/github/streamshub/console/api/RecordsResource.java b/api/src/main/java/com/github/streamshub/console/api/RecordsResource.java index ecd388ea5..9fe4d2a87 100644 --- a/api/src/main/java/com/github/streamshub/console/api/RecordsResource.java +++ b/api/src/main/java/com/github/streamshub/console/api/RecordsResource.java @@ -35,11 +35,14 @@ import com.github.streamshub.console.api.model.KafkaRecord; import com.github.streamshub.console.api.model.RecordFilterParams; +import com.github.streamshub.console.api.security.Authorized; +import com.github.streamshub.console.api.security.ResourcePrivilege; import com.github.streamshub.console.api.service.RecordService; import com.github.streamshub.console.api.support.ErrorCategory; import com.github.streamshub.console.api.support.FieldFilter; import com.github.streamshub.console.api.support.KafkaUuid; import com.github.streamshub.console.api.support.StringEnumeration; +import com.github.streamshub.console.config.security.Privilege; @Path("/api/kafkas/{clusterId}/topics/{topicId}/records") @Tag(name = "Kafka Cluster Resources") @@ -72,6 +75,8 @@ public class RecordsResource { @APIResponse(responseCode = "404", ref = "NotFound") @APIResponse(responseCode = "500", ref = "ServerError") @APIResponse(responseCode = "504", ref = "ServerTimeout") + @Authorized + @ResourcePrivilege(action = Privilege.LIST) public Response consumeRecords( @Parameter(description = "Cluster identifier") @PathParam("clusterId") @@ -140,6 +145,8 @@ public Response 
consumeRecords( @APIResponse(responseCode = "404", ref = "NotFound") @APIResponse(responseCode = "500", ref = "ServerError") @APIResponse(responseCode = "504", ref = "ServerTimeout") + @Authorized + @ResourcePrivilege(action = Privilege.CREATE) public CompletionStage produceRecord( @Parameter(description = "Cluster identifier") @PathParam("clusterId") diff --git a/api/src/main/java/com/github/streamshub/console/api/TopicsResource.java b/api/src/main/java/com/github/streamshub/console/api/TopicsResource.java index 17f3386e0..575979362 100644 --- a/api/src/main/java/com/github/streamshub/console/api/TopicsResource.java +++ b/api/src/main/java/com/github/streamshub/console/api/TopicsResource.java @@ -42,6 +42,8 @@ import com.github.streamshub.console.api.model.Topic; import com.github.streamshub.console.api.model.TopicFilterParams; import com.github.streamshub.console.api.model.TopicPatch; +import com.github.streamshub.console.api.security.Authorized; +import com.github.streamshub.console.api.security.ResourcePrivilege; import com.github.streamshub.console.api.service.ConsumerGroupService; import com.github.streamshub.console.api.service.TopicService; import com.github.streamshub.console.api.support.ErrorCategory; @@ -50,6 +52,7 @@ import com.github.streamshub.console.api.support.KafkaUuid; import com.github.streamshub.console.api.support.ListRequestContext; import com.github.streamshub.console.api.support.StringEnumeration; +import com.github.streamshub.console.config.security.Privilege; import io.xlate.validation.constraints.Expression; @@ -83,6 +86,8 @@ public class TopicsResource { @APIResponse(responseCode = "201", description = "New topic successfully created", content = @Content(schema = @Schema(implementation = NewTopic.NewTopicDocument.class))) + @Authorized + @ResourcePrivilege(action = Privilege.CREATE) public CompletionStage createTopic( @Parameter(description = "Cluster identifier") @PathParam("clusterId") @@ -119,6 +124,8 @@ public CompletionStage 
createTopic( @Path("{topicId}") @DELETE @APIResponseSchema(responseCode = "204", value = Void.class) + @Authorized + @ResourcePrivilege(action = Privilege.DELETE) public CompletionStage deleteTopic( @Parameter(description = "Cluster identifier") @PathParam("clusterId") @@ -138,6 +145,8 @@ public CompletionStage deleteTopic( @APIResponseSchema(Topic.ListResponse.class) @APIResponse(responseCode = "500", ref = "ServerError") @APIResponse(responseCode = "504", ref = "ServerTimeout") + @Authorized + @ResourcePrivilege(action = Privilege.LIST) public CompletionStage listTopics( @Parameter(description = "Cluster identifier") @PathParam("clusterId") @@ -221,6 +230,8 @@ public CompletionStage listTopics( @APIResponse(responseCode = "404", ref = "NotFound") @APIResponse(responseCode = "500", ref = "ServerError") @APIResponse(responseCode = "504", ref = "ServerTimeout") + @Authorized + @ResourcePrivilege(action = Privilege.GET) public CompletionStage describeTopic( @Parameter(description = "Cluster identifier") @PathParam("clusterId") @@ -362,6 +373,8 @@ public CompletionStage listTopicConsumerGroups( node = { "data", "id" }, payload = ErrorCategory.InvalidResource.class, validationAppliesTo = ConstraintTarget.PARAMETERS) + @Authorized + @ResourcePrivilege(action = Privilege.UPDATE) public CompletionStage patchTopic( @Parameter(description = "Cluster identifier") @PathParam("clusterId") diff --git a/api/src/main/java/com/github/streamshub/console/api/errors/client/ForbiddenExceptionHandler.java b/api/src/main/java/com/github/streamshub/console/api/errors/client/ForbiddenExceptionHandler.java new file mode 100644 index 000000000..800249665 --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/errors/client/ForbiddenExceptionHandler.java @@ -0,0 +1,34 @@ +package com.github.streamshub.console.api.errors.client; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.ws.rs.ForbiddenException; +import jakarta.ws.rs.core.Response; +import 
jakarta.ws.rs.ext.Provider; + +import com.github.streamshub.console.api.model.ErrorResponse; +import com.github.streamshub.console.api.support.ErrorCategory; + +@Provider +@ApplicationScoped +public class ForbiddenExceptionHandler extends AbstractClientExceptionHandler { + + public ForbiddenExceptionHandler() { + super(ErrorCategory.NotAuthorized.class, "Insufficient permissions to resource or action", (String) null); + } + + @Override + public boolean handlesException(Throwable thrown) { + return thrown instanceof ForbiddenException; + } + + @Override + public Response toResponse(ForbiddenException exception) { + var responseBuilder = Response.status(category.getHttpStatus()) + .entity(new ErrorResponse(buildErrors(exception))); + + exception.getResponse().getHeaders().forEach((k, v) -> + responseBuilder.header(k, exception.getResponse().getHeaderString(k))); + + return responseBuilder.build(); + } +} \ No newline at end of file diff --git a/api/src/main/java/com/github/streamshub/console/api/security/AuthorizationInterceptor.java b/api/src/main/java/com/github/streamshub/console/api/security/AuthorizationInterceptor.java new file mode 100644 index 000000000..af6b67d1a --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/security/AuthorizationInterceptor.java @@ -0,0 +1,47 @@ +package com.github.streamshub.console.api.security; + +import jakarta.annotation.Priority; +import jakarta.enterprise.context.Dependent; +import jakarta.inject.Inject; +import jakarta.interceptor.AroundInvoke; +import jakarta.interceptor.Interceptor; +import jakarta.interceptor.InvocationContext; +import jakarta.ws.rs.ForbiddenException; +import jakarta.ws.rs.core.UriInfo; + +import org.jboss.logging.Logger; + +import io.quarkus.security.identity.SecurityIdentity; + +@Authorized +@Priority(1) +@Interceptor +@Dependent +public class AuthorizationInterceptor { + + @Inject + Logger logger; + + @Inject + SecurityIdentity securityIdentity; + + @Inject + UriInfo requestUri; + + 
@AroundInvoke + Object authorize(InvocationContext context) throws Exception { + ResourcePrivilege authz = context.getMethod().getAnnotation(ResourcePrivilege.class); + String resourcePath = requestUri.getPath().substring("/api/".length()); + var requiredPermission = new ConsolePermission(resourcePath, authz.action()); + boolean allow = securityIdentity.checkPermission(requiredPermission) + .subscribeAsCompletionStage() + .get(); + + if (!allow) { + throw new ForbiddenException("Access denied"); + } + + return context.proceed(); + } + +} diff --git a/api/src/main/java/com/github/streamshub/console/api/security/Authorized.java b/api/src/main/java/com/github/streamshub/console/api/security/Authorized.java new file mode 100644 index 000000000..1be22372d --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/security/Authorized.java @@ -0,0 +1,14 @@ +package com.github.streamshub.console.api.security; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import jakarta.interceptor.InterceptorBinding; + +@InterceptorBinding +@Retention(RetentionPolicy.RUNTIME) +@Target({ ElementType.TYPE, ElementType.METHOD }) +public @interface Authorized { +} diff --git a/api/src/main/java/com/github/streamshub/console/api/security/ConsoleAuthenticationMechanism.java b/api/src/main/java/com/github/streamshub/console/api/security/ConsoleAuthenticationMechanism.java new file mode 100644 index 000000000..c28355b79 --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/security/ConsoleAuthenticationMechanism.java @@ -0,0 +1,403 @@ +package com.github.streamshub.console.api.security; + +import java.io.IOException; +import java.security.Permission; +import java.security.Principal; +import java.util.ArrayList; +import java.util.Base64; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Objects; 
+import java.util.Optional; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Stream; + +import jakarta.annotation.Priority; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Alternative; +import jakarta.inject.Inject; +import jakarta.ws.rs.core.HttpHeaders; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule; +import org.jboss.logging.Logger; +import org.jose4j.jwt.JwtClaims; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.streamshub.console.api.ClientFactory; +import com.github.streamshub.console.api.model.Error; +import com.github.streamshub.console.api.model.ErrorResponse; +import com.github.streamshub.console.api.support.ErrorCategory; +import com.github.streamshub.console.api.support.KafkaContext; +import com.github.streamshub.console.config.ConsoleConfig; +import com.github.streamshub.console.config.security.Privilege; +import com.github.streamshub.console.config.security.SecurityConfig; +import com.github.streamshub.console.config.security.SubjectConfig; + +import io.quarkus.oidc.runtime.OidcAuthenticationMechanism; +import io.quarkus.oidc.runtime.OidcJwtCallerPrincipal; +import io.quarkus.security.AuthenticationFailedException; +import io.quarkus.security.credential.Credential; +import io.quarkus.security.identity.IdentityProviderManager; +import io.quarkus.security.identity.SecurityIdentity; +import io.quarkus.security.identity.request.AnonymousAuthenticationRequest; +import io.quarkus.security.identity.request.AuthenticationRequest; +import io.quarkus.security.identity.request.TokenAuthenticationRequest; +import io.quarkus.security.identity.request.UsernamePasswordAuthenticationRequest; +import io.quarkus.security.runtime.QuarkusPrincipal; +import io.quarkus.security.runtime.QuarkusSecurityIdentity; +import 
io.quarkus.vertx.http.runtime.security.ChallengeData; +import io.quarkus.vertx.http.runtime.security.HttpAuthenticationMechanism; +import io.smallrye.mutiny.Uni; +import io.vertx.core.MultiMap; +import io.vertx.ext.web.RoutingContext; + +@Alternative +@Priority(1) +@ApplicationScoped +public class ConsoleAuthenticationMechanism implements HttpAuthenticationMechanism { + + public static final String OAUTHBEARER = OAuthBearerLoginModule.OAUTHBEARER_MECHANISM; + public static final String PLAIN = "PLAIN"; + public static final String SCRAM_SHA256 = "SCRAM-SHA-256"; + public static final String SCRAM_SHA512 = "SCRAM-SHA-512"; + + private static final String BEARER = "Bearer "; + private static final String BASIC = "Basic "; + + private static class Audit extends java.util.logging.Level { + private static final long serialVersionUID = 1L; + + Audit() { + super("AUDIT", java.util.logging.Level.INFO.intValue() - 1); + } + } + + private static final java.util.logging.Level AUDIT = new Audit(); + + private static final SecurityIdentity ANONYMOUS = QuarkusSecurityIdentity.builder() + .setAnonymous(true) + .setPrincipal(new QuarkusPrincipal("ANONYMOUS")) + .build(); + + @Inject + Logger log; + + @Inject + ObjectMapper mapper; + + @Inject + ConsoleConfig consoleConfig; + + @Inject + Map contexts; + + @Inject + OidcAuthenticationMechanism oidc; + + boolean oidcEnabled() { + return Objects.nonNull(consoleConfig.getSecurity().getOidc()); + } + + @Override + public Uni authenticate(RoutingContext context, IdentityProviderManager identityProviderManager) { + if (oidcEnabled()) { + return oidc.authenticate(context, identityProviderManager) + .map(identity -> { + if (identity != null) { + String clusterId = getClusterId(context); + var ctx = clusterId != null ? 
contexts.get(clusterId) : null; + return createIdentity(ctx, identity); + } + throw new AuthenticationFailedException(); + }); + } + + String clusterId = getClusterId(context); + + if (clusterId == null) { + return Uni.createFrom().item(createAnonymousIdentity(null)); + } + + var ctx = contexts.get(clusterId); + + if (ctx == null) { + return Uni.createFrom().item(createAnonymousIdentity(null)); + } + + String saslMechanism = ctx.saslMechanism(Admin.class); + + if (ctx.admin() != null || saslMechanism.isEmpty()) { + return Uni.createFrom().item(createAnonymousIdentity(ctx)); + } + + var identity = createIdentity(ctx, context.request().headers(), saslMechanism); + + if (identity != null) { + return Uni.createFrom().item(identity); + } + + return Uni.createFrom().failure(new AuthenticationFailedException()); + } + + @Override + public Uni sendChallenge(RoutingContext context) { + return getChallenge(context).map(challengeData -> { + if (challengeData == null) { + return false; + } + + var response = context.response(); + response.setStatusCode(challengeData.status); + + if (challengeData.headerName != null) { + response.headers().set(challengeData.headerName, challengeData.headerContent); + } + + try { + response.send(mapper.writeValueAsString(((PayloadChallengeData) challengeData).payload)); + } catch (IOException e) { + log.warnf(e, "Failed to serialize challenge response body: %s", e.getMessage()); + } + + return true; + }); + } + + @Override + public Uni getChallenge(RoutingContext context) { + if (oidcEnabled()) { + return oidc.getChallenge(context) + .map(data -> { + var category = ErrorCategory.get(ErrorCategory.NotAuthenticated.class); + Error error = category.createError("Authentication credentials missing or invalid", null, null); + var responseBody = new ErrorResponse(List.of(error)); + return new PayloadChallengeData(data, responseBody); + }); + } + + String clusterId = getClusterId(context); + + if (clusterId == null) { + return 
Uni.createFrom().nullItem(); + } + + var ctx = contexts.get(clusterId); + + if (ctx == null) { + return Uni.createFrom().nullItem(); + } + + String saslMechanism = ctx.saslMechanism(Admin.class); + String scheme = getAuthorizationScheme(saslMechanism); + ChallengeData challenge; + + if (scheme != null) { + var category = ErrorCategory.get(ErrorCategory.NotAuthenticated.class); + Error error = category.createError("Authentication credentials missing or invalid", null, null); + var responseBody = new ErrorResponse(List.of(error)); + challenge = new PayloadChallengeData(401, "WWW-Authenticate", scheme, responseBody); + } else { + log.warnf("Access not permitted to cluster %s with unknown SASL mechanism '%s'", + clusterId, saslMechanism); + var category = ErrorCategory.get(ErrorCategory.ResourceNotFound.class); + Error error = category.createError(ClientFactory.NO_SUCH_KAFKA_MESSAGE.formatted(clusterId), null, null); + var responseBody = new ErrorResponse(List.of(error)); + challenge = new PayloadChallengeData(404, null, null, responseBody); + } + + return Uni.createFrom().item(challenge); + } + + @Override + public Set> getCredentialTypes() { + if (oidcEnabled()) { + return oidc.getCredentialTypes(); + } + + return Set.of( + AnonymousAuthenticationRequest.class, + TokenAuthenticationRequest.class, + UsernamePasswordAuthenticationRequest.class + ); + } + + String getClusterId(RoutingContext context) { + Pattern p = Pattern.compile("/api/kafkas/([^/]+)(?:/.*)?"); + Matcher m = p.matcher(context.normalizedPath()); + if (m.matches()) { + return m.group(1); + } + return null; + } + + String getAuthorizationScheme(String saslMechanism) { + switch (saslMechanism) { + case OAUTHBEARER: + return BEARER.trim(); + case PLAIN, SCRAM_SHA256, SCRAM_SHA512: + return BASIC.trim(); + default: + return null; + } + } + + SecurityIdentity createAnonymousIdentity(KafkaContext ctx) { + return createIdentity(ctx, ANONYMOUS); + } + + SecurityIdentity createIdentity(KafkaContext ctx, 
SecurityIdentity source) { + var builder = QuarkusSecurityIdentity.builder(source); + addRoleChecker(ctx, builder, source.getPrincipal()); + return builder.build(); + } + + SecurityIdentity createIdentity(KafkaContext ctx, MultiMap headers, String saslMechanism) { + switch (saslMechanism) { + case OAUTHBEARER: + return createOAuthIdentity(ctx, headers); + case PLAIN: + return createBasicIdentity(ctx, headers, SaslJaasConfigCredential::forPlainLogin); + case SCRAM_SHA256, SCRAM_SHA512: + return createBasicIdentity(ctx, headers, SaslJaasConfigCredential::forScramLogin); + default: + return null; + } + } + + SecurityIdentity createOAuthIdentity(KafkaContext ctx, MultiMap headers) { + return getAuthorization(headers, BEARER) + .map(accessToken -> { + var builder = QuarkusSecurityIdentity.builder(); + builder.addCredential(SaslJaasConfigCredential.forOAuthLogin(accessToken)); + Principal principal; + + try { + var claims = JwtClaims.parse(accessToken); + principal = new OidcJwtCallerPrincipal(claims, null); + } catch (Exception e) { + log.infof("JWT access token could not be parsed: %s", e.getMessage()); + principal = new QuarkusPrincipal("UNKNOWN"); + } + + builder.setPrincipal(principal); + addRoleChecker(ctx, builder, principal); + return builder.build(); + }) + .orElse(null); + } + + SecurityIdentity createBasicIdentity(KafkaContext ctx, MultiMap headers, BiFunction credentialBuilder) { + return getBasicAuthentication(headers) + .map(userpass -> { + var builder = QuarkusSecurityIdentity.builder(); + var principal = new QuarkusPrincipal(userpass[0]); + builder.addCredential(credentialBuilder.apply(userpass[0], userpass[1])); + builder.setPrincipal(principal); + addRoleChecker(ctx, builder, principal); + return builder.build(); + }) + .orElse(null); + } + + void addRoleChecker(KafkaContext ctx, QuarkusSecurityIdentity.Builder builder, Principal principal) { + Stream globalSubjects = consoleConfig.getSecurity().getSubjects().stream(); + Stream clusterSubjects = 
Stream.empty(); + + if (ctx != null) { + clusterSubjects = ctx.clusterConfig().getSecurity().getSubjects().stream(); + } + + List roleNames = Stream.concat(clusterSubjects, globalSubjects) + .filter(sub -> Objects.isNull(sub.getIssuer()) /* or issuer matches `iss` claim */) + .filter(sub -> Objects.isNull(sub.getClaim()) /* only without OIDC */) + .filter(sub -> sub.getInclude().contains(principal.getName())) + .flatMap(sub -> sub.getRoleNames().stream()) + .distinct() + .toList(); + + Stream globalPermissions = getPermissions(consoleConfig.getSecurity(), roleNames, ""); + Stream clusterPermissions = Stream.empty(); + + if (ctx != null) { + clusterPermissions = getPermissions( + ctx.clusterConfig().getSecurity(), + roleNames, + "kafkas/" + ctx.clusterId() + '/' + ); + } + + List possessedPermissions = Stream.concat(globalPermissions, clusterPermissions).toList(); + + builder.addPermissionChecker(requiredPermission -> { + boolean allowed = possessedPermissions + .stream() + .anyMatch(possessed -> possessed.implies(requiredPermission)); + + String category = getClass().getPackageName() + (allowed ? ".ALLOW" : ".DENY"); + + java.util.logging.Logger.getLogger(category).log(AUDIT, () -> { + return String.format("Principal %s %s access to %s", principal.getName(), allowed ? 
"allowed" : "denied", requiredPermission); + }); + + return Uni.createFrom().item(allowed); + }); + } + + Stream getPermissions(SecurityConfig security, Collection roleNames, String resourcePrefix) { + return security.getRoles() + .stream() + .filter(role -> roleNames.contains(role.getName())) + .flatMap(role -> role.getRules().stream()) + .flatMap(rule -> { + List rulePermissions = new ArrayList<>(); + Privilege[] actions = rule.getPrivileges().toArray(Privilege[]::new); + + for (var resource : rule.getResources()) { + rulePermissions.add(new ConsolePermission( + resourcePrefix + resource, + rule.getResourceNames(), + actions + )); + } + + return rulePermissions.stream(); + }); + } + + Optional getBasicAuthentication(MultiMap headers) { + return getAuthorization(headers, BASIC) + .map(Base64.getDecoder()::decode) + .map(String::new) + .filter(authn -> authn.indexOf(':') >= 0) + .map(authn -> new String[] { + authn.substring(0, authn.indexOf(':')), + authn.substring(authn.indexOf(':') + 1) + }) + .filter(userPass -> !userPass[0].isEmpty() && !userPass[1].isEmpty()); + } + + Optional getAuthorization(MultiMap headers, String scheme) { + return Optional.ofNullable(headers.get(HttpHeaders.AUTHORIZATION)) + .filter(header -> header.regionMatches(true, 0, scheme, 0, scheme.length())) + .map(header -> header.substring(scheme.length())); + } + + static class PayloadChallengeData extends ChallengeData { + public final Object payload; + + public PayloadChallengeData(int status, CharSequence headerName, String headerContent, Object payload) { + super(status, headerName, headerContent); + this.payload = payload; + } + + public PayloadChallengeData(ChallengeData data, Object payload) { + super(data.status, data.headerName, data.headerContent); + this.payload = payload; + } + } +} diff --git a/api/src/main/java/com/github/streamshub/console/api/security/ConsolePermission.java b/api/src/main/java/com/github/streamshub/console/api/security/ConsolePermission.java new file mode 
100644 index 000000000..b74f54e59 --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/security/ConsolePermission.java @@ -0,0 +1,164 @@ +package com.github.streamshub.console.api.security; + +import java.security.Permission; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import com.github.streamshub.console.config.security.Privilege; + +public class ConsolePermission extends Permission { + + private static final long serialVersionUID = 1L; + public static final String ACTIONS_SEPARATOR = ","; + + private String resource; + private Collection resourceNames; + private final Set actions; + + public ConsolePermission(String resource, Privilege... actions) { + super("console"); + this.resource = resource; + this.resourceNames = Collections.emptySet(); + this.actions = checkActions(actions); + } + + public ConsolePermission(String resource, Collection resourceNames, Privilege... 
actions) { + super("console"); + this.resource = resource; + this.resourceNames = resourceNames; + this.actions = checkActions(actions); + } + + private static Set checkActions(Privilege[] actions) { + Set validActions = new HashSet<>(actions.length, 1); + for (Privilege action : actions) { + validActions.add(validateAndTrim(action, "Action")); + } + return Collections.unmodifiableSet(validActions); + } + + private static Privilege validateAndTrim(Privilege action, String paramName) { + if (action == null) { + throw new IllegalArgumentException(String.format("%s must not be null", paramName)); + } + + return action; + } + + public String resource() { + return resource; + } + + public ConsolePermission resource(String resource) { + this.resource = resource; + return this; + } + + public ConsolePermission resourceName(String resourceName) { + this.resourceNames = Collections.singleton(resourceName); + return this; + } + + @Override + public boolean implies(Permission requiredPermission) { + if (requiredPermission instanceof ConsolePermission other) { + if (!getName().equals(other.getName())) { + return false; + } + + return implies(other); + } else { + return false; + } + } + + boolean implies(ConsolePermission requiredPermission) { + if (!requiredPermission.resource.startsWith(resource)) { + return false; + } + + if (requiredPermission.resource.equals(resource)) { + if (!requiredPermission.resourceNames.isEmpty() + && !resourceNames.isEmpty() + && requiredPermission.resourceNames.stream().noneMatch(resourceNames::contains)) { + return false; + } + } else if (requiredPermission.resourceNames.isEmpty() && !resourceNames.isEmpty()) { + boolean matches = false; + for (String name : resourceNames) { + String fullName = resource + '/' + name; + if (fullName.equals(requiredPermission.resource)) { + matches = true; + } + } + if (!matches) { + return false; + } + } else { + return false; + } + + // actions are optional, however if at least one action was specified, + // an 
intersection of compared sets must not be empty + if (requiredPermission.actions.isEmpty()) { + // no required actions + return true; + } + + if (actions.isEmpty()) { + // no possessed actions + return false; + } + + if (actions.contains(Privilege.ALL)) { + // all actions possessed + return true; + } + + for (Privilege action : requiredPermission.actions) { + if (actions.contains(action)) { + // has at least one of required actions + return true; + } + } + + return false; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (!(obj instanceof ConsolePermission other)) { + return false; + } + + return getName().equals(other.getName()) + && resource.equals(other.resource) + && actions.equals(other.actions); + } + + @Override + public int hashCode() { + return Objects.hash(getName(), resource, actions); + } + + @Override + public String toString() { + return getName() + ":" + resource() + ":" + resourceNames + ":" + actions; + } + + /** + * @return null if no actions were specified, or actions joined together with the {@link #ACTIONS_SEPARATOR} + */ + @Override + public String getActions() { + return actions.isEmpty() ? 
null : actions.stream().map(Enum::name).collect(Collectors.joining(ACTIONS_SEPARATOR)); + } +} diff --git a/api/src/main/java/com/github/streamshub/console/api/security/OidcTenantConfigResolver.java b/api/src/main/java/com/github/streamshub/console/api/security/OidcTenantConfigResolver.java new file mode 100644 index 000000000..d96b5420f --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/security/OidcTenantConfigResolver.java @@ -0,0 +1,44 @@ +package com.github.streamshub.console.api.security; + +import java.time.Duration; +import java.util.List; + +import jakarta.annotation.PostConstruct; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import com.github.streamshub.console.config.ConsoleConfig; + +import io.quarkus.oidc.OidcRequestContext; +import io.quarkus.oidc.OidcTenantConfig; +import io.quarkus.oidc.TenantConfigResolver; +import io.smallrye.mutiny.Uni; +import io.vertx.ext.web.RoutingContext; + +@ApplicationScoped +public class OidcTenantConfigResolver implements TenantConfigResolver { + + @Inject + ConsoleConfig consoleConfig; + + OidcTenantConfig oidcConfig; + + @PostConstruct + void initialize() { + oidcConfig = new OidcTenantConfig(); + var oidc = consoleConfig.getSecurity().getOidc(); + + oidcConfig.setTenantId(oidc.getTenantId()); + oidcConfig.setDiscoveryEnabled(true); + oidcConfig.setAuthServerUrl(oidc.getAuthServerUrl()); + oidcConfig.setRoles(OidcTenantConfig.Roles.fromClaimPath(List.of("groups"))); + oidcConfig.getToken().setForcedJwkRefreshInterval(Duration.ofSeconds(5)); + } + + @Override + public Uni resolve(RoutingContext routingContext, + OidcRequestContext requestContext) { + return Uni.createFrom().item(oidcConfig); + } + +} diff --git a/api/src/main/java/com/github/streamshub/console/api/security/ResourcePrivilege.java b/api/src/main/java/com/github/streamshub/console/api/security/ResourcePrivilege.java new file mode 100644 index 000000000..dd5babaf8 --- /dev/null +++ 
b/api/src/main/java/com/github/streamshub/console/api/security/ResourcePrivilege.java @@ -0,0 +1,16 @@ +package com.github.streamshub.console.api.security; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import com.github.streamshub.console.config.security.Privilege; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface ResourcePrivilege { + + Privilege action() default Privilege.ALL; + +} diff --git a/api/src/main/java/com/github/streamshub/console/api/security/SaslJaasConfigCredential.java b/api/src/main/java/com/github/streamshub/console/api/security/SaslJaasConfigCredential.java new file mode 100644 index 000000000..5b2916ec0 --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/security/SaslJaasConfigCredential.java @@ -0,0 +1,41 @@ +package com.github.streamshub.console.api.security; + +import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule; +import org.apache.kafka.common.security.plain.PlainLoginModule; +import org.apache.kafka.common.security.scram.ScramLoginModule; + +import io.quarkus.security.credential.Credential; + +public class SaslJaasConfigCredential implements Credential { + + private static final String SASL_OAUTH_CONFIG_TEMPLATE = OAuthBearerLoginModule.class.getName() + + " required" + + " oauth.access.token=\"%s\" ;"; + + + private static final String BASIC_TEMPLATE = "%s required username=\"%%s\" password=\"%%s\" ;"; + private static final String SASL_PLAIN_CONFIG_TEMPLATE = BASIC_TEMPLATE.formatted(PlainLoginModule.class.getName()); + private static final String SASL_SCRAM_CONFIG_TEMPLATE = BASIC_TEMPLATE.formatted(ScramLoginModule.class.getName()); + + public static SaslJaasConfigCredential forOAuthLogin(String accessToken) { + return new SaslJaasConfigCredential(SASL_OAUTH_CONFIG_TEMPLATE.formatted(accessToken)); + } + + public static 
SaslJaasConfigCredential forPlainLogin(String username, String password) { + return new SaslJaasConfigCredential(SASL_PLAIN_CONFIG_TEMPLATE.formatted(username, password)); + } + + public static SaslJaasConfigCredential forScramLogin(String username, String password) { + return new SaslJaasConfigCredential(SASL_SCRAM_CONFIG_TEMPLATE.formatted(username, password)); + } + + private final String value; + + private SaslJaasConfigCredential(String value) { + this.value = value; + } + + public String value() { + return value; + } +} diff --git a/api/src/main/java/com/github/streamshub/console/api/service/BrokerService.java b/api/src/main/java/com/github/streamshub/console/api/service/BrokerService.java index d546742f7..9ec3630ea 100644 --- a/api/src/main/java/com/github/streamshub/console/api/service/BrokerService.java +++ b/api/src/main/java/com/github/streamshub/console/api/service/BrokerService.java @@ -26,8 +26,8 @@ public class BrokerService { @Inject ThreadContext threadContext; - public CompletionStage> describeConfigs(String nodeId) { - return clusterService.describeCluster(Collections.emptyList()) + public CompletionStage> describeConfigs(String clusterId, String nodeId) { + return clusterService.describeCluster(clusterId, Collections.emptyList()) .thenApply(cluster -> { if (cluster.nodes().stream().mapToInt(Node::id).mapToObj(String::valueOf).noneMatch(nodeId::equals)) { throw new NotFoundException("No such node: " + nodeId); diff --git a/api/src/main/java/com/github/streamshub/console/api/service/ConsumerGroupService.java b/api/src/main/java/com/github/streamshub/console/api/service/ConsumerGroupService.java index 075c81e10..62a172b00 100644 --- a/api/src/main/java/com/github/streamshub/console/api/service/ConsumerGroupService.java +++ b/api/src/main/java/com/github/streamshub/console/api/service/ConsumerGroupService.java @@ -154,7 +154,9 @@ CompletionStage> listConsumerGroups(List groupIds, L .dropWhile(listSupport::beforePageBegin) 
.takeWhile(listSupport::pageCapacityAvailable) .toList()) - .thenCompose(groups -> augmentList(adminClient, groups, includes)); + .thenComposeAsync( + groups -> augmentList(adminClient, groups, includes), + threadContext.currentContextExecutor()); } public CompletionStage describeConsumerGroup(String requestGroupId, List includes) { @@ -162,7 +164,9 @@ public CompletionStage describeConsumerGroup(String requestGroupI String groupId = preprocessGroupId(requestGroupId); return assertConsumerGroupExists(adminClient, groupId) - .thenCompose(nothing -> describeConsumerGroups(adminClient, List.of(groupId), includes)) + .thenComposeAsync( + nothing -> describeConsumerGroups(adminClient, List.of(groupId), includes), + threadContext.currentContextExecutor()) .thenApply(groups -> groups.get(groupId)) .thenApply(result -> result.getOrThrow(CompletionException::new)); } @@ -174,13 +178,15 @@ public CompletionStage>> listConsumerGroupMembership(Co .inStates(Set.of( ConsumerGroupState.STABLE, ConsumerGroupState.PREPARING_REBALANCE, - ConsumerGroupState.COMPLETING_REBALANCE))) + ConsumerGroupState.COMPLETING_REBALANCE, + ConsumerGroupState.EMPTY))) .valid() .toCompletionStage() .thenApply(groups -> groups.stream().map(ConsumerGroup::fromKafkaModel).toList()) - .thenCompose(groups -> augmentList(adminClient, groups, List.of( + .thenComposeAsync(groups -> augmentList(adminClient, groups, List.of( ConsumerGroup.Fields.MEMBERS, - ConsumerGroup.Fields.OFFSETS))) + ConsumerGroup.Fields.OFFSETS)), + threadContext.currentContextExecutor()) .thenApply(list -> list.stream() .map(group -> Map.entry( group.getGroupId(), @@ -341,7 +347,7 @@ CompletionStage> alterConsumerGroupOffsets(Admin adminCl CompletionStage alterConsumerGroupOffsetsDryRun(Admin adminClient, String groupId, Map alterRequest) { - var pendingTopicsIds = fetchTopicIdMap(adminClient); + var pendingTopicsIds = fetchTopicIdMap(); return describeConsumerGroups(adminClient, List.of(groupId), Collections.emptyList()) 
.thenApply(groups -> groups.get(groupId)) @@ -471,7 +477,7 @@ CompletionStage>> describeConsumerG Map> result = new LinkedHashMap<>(groupIds.size()); - var pendingTopicsIds = fetchTopicIdMap(adminClient); + var pendingTopicsIds = fetchTopicIdMap(); var pendingDescribes = adminClient.describeConsumerGroups(groupIds, new DescribeConsumerGroupsOptions() @@ -513,8 +519,8 @@ CompletionStage>> describeConsumerG }); } - CompletableFuture> fetchTopicIdMap(Admin adminClient) { - return topicService.listTopics(adminClient, true) + CompletableFuture> fetchTopicIdMap() { + return topicService.listTopics(true) .thenApply(topics -> topics.stream() .collect(Collectors.toMap(TopicListing::name, l -> l.topicId().toString()))); } diff --git a/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java b/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java index 316a14d90..08602bcc6 100644 --- a/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java +++ b/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java @@ -30,15 +30,18 @@ import com.github.streamshub.console.api.model.KafkaCluster; import com.github.streamshub.console.api.model.KafkaListener; import com.github.streamshub.console.api.model.Node; +import com.github.streamshub.console.api.security.ConsolePermission; import com.github.streamshub.console.api.support.Holder; import com.github.streamshub.console.api.support.KafkaContext; import com.github.streamshub.console.api.support.ListRequestContext; import com.github.streamshub.console.config.ConsoleConfig; +import com.github.streamshub.console.config.security.Privilege; import io.fabric8.kubernetes.api.model.ObjectMeta; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.fabric8.kubernetes.client.informers.cache.Cache; +import io.quarkus.security.identity.SecurityIdentity; import 
io.strimzi.api.ResourceAnnotations; import io.strimzi.api.kafka.model.kafka.Kafka; import io.strimzi.api.kafka.model.kafka.KafkaBuilder; @@ -89,6 +92,9 @@ public class KafkaClusterService { */ KafkaContext kafkaContext; + @Inject + SecurityIdentity securityIdentity; + boolean listUnconfigured = false; Predicate includeAll = k -> listUnconfigured; @@ -119,7 +125,13 @@ public List listClusters(ListRequestContext listSupp .filter(k -> !configuredClusters.containsKey(k.getId())) .toList(); + ConsolePermission required = new ConsolePermission("kafkas", Privilege.LIST); + return Stream.concat(configuredClusters.values().stream(), otherClusters.stream()) + .filter(cluster -> { + required.resourceName(cluster.getId()); + return securityIdentity.checkPermissionBlocking(required); + }) .map(listSupport::tally) .filter(listSupport::betweenCursors) .sorted(listSupport.getSortComparator()) @@ -129,7 +141,7 @@ public List listClusters(ListRequestContext listSupp .toList(); } - public CompletionStage describeCluster(List fields) { + public CompletionStage describeCluster(String clusterId, List fields) { Admin adminClient = kafkaContext.admin(); DescribeClusterOptions options = new DescribeClusterOptions() .includeAuthorizedOperations(fields.contains(KafkaCluster.Fields.AUTHORIZED_OPERATIONS)); diff --git a/api/src/main/java/com/github/streamshub/console/api/service/TopicService.java b/api/src/main/java/com/github/streamshub/console/api/service/TopicService.java index 6d04de918..e451a7975 100644 --- a/api/src/main/java/com/github/streamshub/console/api/service/TopicService.java +++ b/api/src/main/java/com/github/streamshub/console/api/service/TopicService.java @@ -54,15 +54,18 @@ import com.github.streamshub.console.api.model.ReplicaLocalStorage; import com.github.streamshub.console.api.model.Topic; import com.github.streamshub.console.api.model.TopicPatch; +import com.github.streamshub.console.api.security.ConsolePermission; import 
com.github.streamshub.console.api.support.KafkaContext; import com.github.streamshub.console.api.support.KafkaOffsetSpec; import com.github.streamshub.console.api.support.ListRequestContext; import com.github.streamshub.console.api.support.TopicValidation; import com.github.streamshub.console.api.support.UnknownTopicIdPatch; import com.github.streamshub.console.api.support.ValidationProxy; +import com.github.streamshub.console.config.security.Privilege; import io.fabric8.kubernetes.api.model.ObjectMeta; import io.fabric8.kubernetes.client.KubernetesClient; +import io.quarkus.security.identity.SecurityIdentity; import io.strimzi.api.kafka.model.kafka.Kafka; import io.strimzi.api.kafka.model.topic.KafkaTopic; @@ -98,6 +101,9 @@ public class TopicService { @Inject ThreadContext threadContext; + @Inject + SecurityIdentity securityIdentity; + @Inject ValidationProxy validationService; @@ -170,7 +176,7 @@ public CompletionStage> listTopics(List fields, String offse "statuses", statuses, "totalPartitions", partitionCount)); - return listTopics(adminClient, true) + return listTopics(true) .thenApply(list -> list.stream().map(Topic::fromTopicListing).toList()) .thenComposeAsync( list -> augmentList(adminClient, list, fetchList, offsetSpec), @@ -200,12 +206,19 @@ Topic tallySummary(Map statuses, AtomicInteger partitionCount, return topic; } - CompletableFuture> listTopics(Admin adminClient, boolean listInternal) { + CompletableFuture> listTopics(boolean listInternal) { + Admin adminClient = kafkaContext.admin(); + ConsolePermission required = new ConsolePermission( + "kafkas/%s/topics".formatted(kafkaContext.clusterId()), + Privilege.LIST); + return adminClient .listTopics(new ListTopicsOptions().listInternal(listInternal)) .listings() - .thenApply(topics -> topics.stream().toList()) .toCompletionStage() + .thenApplyAsync(topics -> topics.stream() + .filter(t -> securityIdentity.checkPermissionBlocking(required.resourceName(t.name()))) + .toList(), 
threadContext.currentContextExecutor()) .toCompletableFuture(); } diff --git a/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java b/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java index 2c505f1f4..f1cae529a 100644 --- a/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java +++ b/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java @@ -16,6 +16,7 @@ import io.strimzi.api.kafka.model.kafka.Kafka; import io.strimzi.api.kafka.model.kafka.KafkaClusterSpec; import io.strimzi.api.kafka.model.kafka.KafkaSpec; +import io.strimzi.api.kafka.model.kafka.KafkaStatus; import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListener; import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationOAuth; import io.strimzi.kafka.oauth.client.ClientConfig; @@ -43,6 +44,12 @@ public KafkaContext(KafkaContext other, Admin admin) { this.applicationScoped = false; } + public static String clusterId(KafkaClusterConfig clusterConfig, Optional kafkaResource) { + return Optional.ofNullable(clusterConfig.getId()) + .or(() -> kafkaResource.map(Kafka::getStatus).map(KafkaStatus::getClusterId)) + .orElseGet(clusterConfig::getName); + } + @Override public boolean equals(Object obj) { if (!(obj instanceof KafkaContext)) { @@ -68,6 +75,10 @@ public void close() { } } + public String clusterId() { + return clusterId(clusterConfig, Optional.ofNullable(resource)); + } + public KafkaClusterConfig clusterConfig() { return clusterConfig; } diff --git a/api/src/main/resources/application.properties b/api/src/main/resources/application.properties index 0378be55a..f90595fec 100644 --- a/api/src/main/resources/application.properties +++ b/api/src/main/resources/application.properties @@ -1,7 +1,7 @@ quarkus.http.access-log.enabled=true quarkus.http.record-request-start-time=true # Default access-log pattern with `%u` removed. 
Due to the mixing of Quarkus and Vert.x authorization, the user authenticated cannot be obtained at this time -quarkus.http.access-log.pattern=%{REMOTE_HOST} %l "%{REQUEST_LINE}" %{RESPONSE_CODE} %{RESPONSE_TIME}ms %{BYTES_SENT} +quarkus.http.access-log.pattern=%{REMOTE_USER} %{REMOTE_HOST} %l "%{REQUEST_LINE}" %{RESPONSE_CODE} %{RESPONSE_TIME}ms %{BYTES_SENT} quarkus.http.access-log.exclude-pattern=(?:/health(/live|/ready|/started)?|/metrics) quarkus.http.non-application-root-path=${quarkus.http.root-path} quarkus.http.http2=false @@ -16,8 +16,8 @@ quarkus.http.cors.access-control-allow-credentials=true quarkus.http.header."Strict-Transport-Security".value=max-age=31536000 quarkus.http.auth.basic=false -#quarkus.http.auth.permission."oidc".policy=authenticated -#quarkus.http.auth.permission."oidc".paths=/api/* +quarkus.http.auth.permission."oidc".policy=permit +quarkus.http.auth.permission."oidc".paths=/api/* # See https://quarkus.io/guides/kafka-dev-services # Enable when using quarkus-kafka-client @@ -66,6 +66,7 @@ console.kafka.admin.default.api.timeout.ms=10000 %dev.quarkus.kubernetes-client.trust-certs=true %dev.quarkus.log.category."io.vertx.core.impl.BlockedThreadChecker".level=OFF %dev.quarkus.log.category."com.github.streamshub.console".level=DEBUG +%dev.quarkus.log.category."io.quarkus.oidc.runtime".level=DEBUG ######## %testplain.quarkus.devservices.enabled=true diff --git a/common/src/main/java/com/github/streamshub/console/config/ConsoleConfig.java b/common/src/main/java/com/github/streamshub/console/config/ConsoleConfig.java index 691eabd41..516474cd3 100644 --- a/common/src/main/java/com/github/streamshub/console/config/ConsoleConfig.java +++ b/common/src/main/java/com/github/streamshub/console/config/ConsoleConfig.java @@ -1,8 +1,11 @@ package com.github.streamshub.console.config; +import com.github.streamshub.console.config.security.GlobalSecurityConfig; + public class ConsoleConfig { KubernetesConfig kubernetes = new KubernetesConfig(); + 
GlobalSecurityConfig security = new GlobalSecurityConfig(); KafkaConfig kafka = new KafkaConfig(); public KubernetesConfig getKubernetes() { @@ -13,6 +16,14 @@ public void setKubernetes(KubernetesConfig kubernetes) { this.kubernetes = kubernetes; } + public GlobalSecurityConfig getSecurity() { + return security; + } + + public void setSecurity(GlobalSecurityConfig security) { + this.security = security; + } + public KafkaConfig getKafka() { return kafka; } diff --git a/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java b/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java index b5a8334a9..acc18a872 100644 --- a/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java +++ b/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java @@ -6,6 +6,7 @@ import jakarta.validation.constraints.NotBlank; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.github.streamshub.console.config.security.SecurityConfig; public class KafkaClusterConfig { @@ -14,6 +15,7 @@ public class KafkaClusterConfig { private String name; private String namespace; private String listener; + private SecurityConfig security = new SecurityConfig(); private Map properties = new LinkedHashMap<>(); private Map adminProperties = new LinkedHashMap<>(); private Map consumerProperties = new LinkedHashMap<>(); @@ -53,6 +55,14 @@ public void setNamespace(String namespace) { this.namespace = namespace; } + public SecurityConfig getSecurity() { + return security; + } + + public void setSecurity(SecurityConfig security) { + this.security = security; + } + public String getListener() { return listener; } diff --git a/common/src/main/java/com/github/streamshub/console/config/security/GlobalSecurityConfig.java b/common/src/main/java/com/github/streamshub/console/config/security/GlobalSecurityConfig.java new file mode 100644 index 000000000..32f41fcb8 --- /dev/null +++ 
b/common/src/main/java/com/github/streamshub/console/config/security/GlobalSecurityConfig.java @@ -0,0 +1,14 @@ +package com.github.streamshub.console.config.security; + +public class GlobalSecurityConfig extends SecurityConfig { + + private OidcConfig oidc; + + public OidcConfig getOidc() { + return oidc; + } + + public void setOidc(OidcConfig oidc) { + this.oidc = oidc; + } +} diff --git a/common/src/main/java/com/github/streamshub/console/config/security/OidcConfig.java b/common/src/main/java/com/github/streamshub/console/config/security/OidcConfig.java new file mode 100644 index 000000000..798fcee05 --- /dev/null +++ b/common/src/main/java/com/github/streamshub/console/config/security/OidcConfig.java @@ -0,0 +1,42 @@ +package com.github.streamshub.console.config.security; + +public class OidcConfig { + + private String tenantId = "streamshub-console"; + private String authServerUrl; + private String clientId; + private String clientSecret; + + public String getTenantId() { + return tenantId; + } + + public void setTenantId(String tenantId) { + this.tenantId = tenantId; + } + + public String getAuthServerUrl() { + return authServerUrl; + } + + public void setAuthServerUrl(String authServerUrl) { + this.authServerUrl = authServerUrl; + } + + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + public String getClientSecret() { + return clientSecret; + } + + public void setClientSecret(String clientSecret) { + this.clientSecret = clientSecret; + } + +} diff --git a/common/src/main/java/com/github/streamshub/console/config/security/Privilege.java b/common/src/main/java/com/github/streamshub/console/config/security/Privilege.java new file mode 100644 index 000000000..7ba1aebb2 --- /dev/null +++ b/common/src/main/java/com/github/streamshub/console/config/security/Privilege.java @@ -0,0 +1,24 @@ +package com.github.streamshub.console.config.security; + +import java.util.Locale; + 
+import com.fasterxml.jackson.annotation.JsonCreator; + +public enum Privilege { + + CREATE, + DELETE, + GET, + LIST, + UPDATE, + ALL; + + @JsonCreator + public static Privilege forValue(String value) { + if ("*".equals(value)) { + return ALL; + } + return valueOf(value.toUpperCase(Locale.ROOT)); + } + +} diff --git a/common/src/main/java/com/github/streamshub/console/config/security/RoleConfig.java b/common/src/main/java/com/github/streamshub/console/config/security/RoleConfig.java new file mode 100644 index 000000000..39ac83f96 --- /dev/null +++ b/common/src/main/java/com/github/streamshub/console/config/security/RoleConfig.java @@ -0,0 +1,27 @@ +package com.github.streamshub.console.config.security; + +import java.util.ArrayList; +import java.util.List; + +public class RoleConfig { + + private String name; + private List rules = new ArrayList<>(); + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public List getRules() { + return rules; + } + + public void setRules(List rules) { + this.rules = rules; + } + +} diff --git a/common/src/main/java/com/github/streamshub/console/config/security/RuleConfig.java b/common/src/main/java/com/github/streamshub/console/config/security/RuleConfig.java new file mode 100644 index 000000000..5361d1aed --- /dev/null +++ b/common/src/main/java/com/github/streamshub/console/config/security/RuleConfig.java @@ -0,0 +1,47 @@ +package com.github.streamshub.console.config.security; + +import java.util.ArrayList; +import java.util.List; + +public class RuleConfig { + + /** + * Resources to which this rule applies (required) + */ + List resources = new ArrayList<>(); + + /** + * Specific resource names to which this rule applies (optional) + */ + List resourceNames = new ArrayList<>(); + + /** + * Privileges/actions that may be performed for subjects having this rule + */ + List privileges = new ArrayList<>(); + + public List getResources() { + return resources; + } + + 
+    public void setResources(List<String> resources) {
+        this.resources = resources;
+    }
+
+    public List<String> getResourceNames() {
+        return resourceNames;
+    }
+
+    public void setResourceNames(List<String> resourceNames) {
+        this.resourceNames = resourceNames;
+    }
+
+    public List<Privilege> getPrivileges() {
+        return privileges;
+    }
+
+    public void setPrivileges(List<Privilege> privileges) {
+        this.privileges = privileges;
+    }
+
+}
diff --git a/common/src/main/java/com/github/streamshub/console/config/security/SecurityConfig.java b/common/src/main/java/com/github/streamshub/console/config/security/SecurityConfig.java
new file mode 100644
index 000000000..050c88f2f
--- /dev/null
+++ b/common/src/main/java/com/github/streamshub/console/config/security/SecurityConfig.java
@@ -0,0 +1,27 @@
+package com.github.streamshub.console.config.security;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SecurityConfig {
+
+    private List<SubjectConfig> subjects = new ArrayList<>();
+    private List<RoleConfig> roles = new ArrayList<>();
+
+    public List<SubjectConfig> getSubjects() {
+        return subjects;
+    }
+
+    public void setSubjects(List<SubjectConfig> subjects) {
+        this.subjects = subjects;
+    }
+
+    public List<RoleConfig> getRoles() {
+        return roles;
+    }
+
+    public void setRoles(List<RoleConfig> roles) {
+        this.roles = roles;
+    }
+
+}
diff --git a/common/src/main/java/com/github/streamshub/console/config/security/SubjectConfig.java b/common/src/main/java/com/github/streamshub/console/config/security/SubjectConfig.java
new file mode 100644
index 000000000..bb816184c
--- /dev/null
+++ b/common/src/main/java/com/github/streamshub/console/config/security/SubjectConfig.java
@@ -0,0 +1,45 @@
+package com.github.streamshub.console.config.security;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SubjectConfig {
+
+    private String issuer;
+    private String claim;
+    private List<String> include = new ArrayList<>();
+    private List<String> roleNames = new ArrayList<>();
+
+    public String getIssuer() {
+        return issuer;
+    }
+
+    public void setIssuer(String issuer) {
+        this.issuer = issuer;
+    }
+
+    public String getClaim() {
+        return claim;
+    }
+
+    public void setClaim(String claim) {
+        this.claim = claim;
+    }
+
+    public List<String> getInclude() {
+        return include;
+    }
+
+    public void setInclude(List<String> include) {
+        this.include = include;
+    }
+
+    public List<String> getRoleNames() {
+        return roleNames;
+    }
+
+    public void setRoleNames(List<String> roleNames) {
+        this.roleNames = roleNames;
+    }
+
+}
diff --git a/install/004-deploy-dex.sh b/install/004-deploy-dex.sh
new file mode 100755
index 000000000..e5550384a
--- /dev/null
+++ b/install/004-deploy-dex.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+
+set -euo pipefail
+
+CONSOLE_INSTALL_PATH="$(cd -- "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P)"
+RESOURCE_PATH=${CONSOLE_INSTALL_PATH}/resources/dex
+
+export NAMESPACE="${1?Please provide the deployment namespace}"
+export CLUSTER_DOMAIN="${2?Please provide the base domain name for Kafka listener ingress}"
+
+source ${CONSOLE_INSTALL_PATH}/_common.sh
+
+${KUBE} create serviceaccount dex --dry-run=client -o yaml \
+  | ${KUBE} apply -n ${NAMESPACE} -f -
+${KUBE} annotate serviceaccount dex -n ${NAMESPACE} --dry-run=client "serviceaccounts.openshift.io/oauth-redirecturi.dex=https://console-dex.${CLUSTER_DOMAIN}/callback" -o yaml \
+  | ${KUBE} apply -n ${NAMESPACE} -f -
+
+echo -e "${INFO} Creating DEX Credentials"
+${KUBE} create secret generic console-dex-secrets -n ${NAMESPACE} \
+  --dry-run=client \
+  --from-literal=DEX_CLIENT_ID="system:serviceaccount:${NAMESPACE}:dex" \
+  --from-literal=DEX_CLIENT_SECRET="$(${KUBE} create token -n ${NAMESPACE} dex --duration=$((365*24))h)" \
+  -o yaml | ${KUBE} apply -n ${NAMESPACE} -f -
+
+if ${KUBE} get deployment dex -n ${NAMESPACE} 1>/dev/null 2>&1 ; then
+  ${KUBE} scale --replicas=0 deployment/dex -n ${NAMESPACE}
+fi
+
+# Replace env variables
+${YQ} '(..
| select(tag == "!!str")) |= envsubst(ne)' ${RESOURCE_PATH}/dex.yaml | ${KUBE} apply -n ${NAMESPACE} -f -
+
+if [ "$(${KUBE} api-resources --api-group=route.openshift.io -o=name)" != "" ] ; then
+  ${KUBE} patch ingress/console-dex-ingress -n ${NAMESPACE} --type=merge --patch '{"spec":{"ingressClassName":"openshift-default"}}'
+fi
\ No newline at end of file
diff --git a/install/_common.sh b/install/_common.sh
new file mode 100644
index 000000000..43c1ef3b3
--- /dev/null
+++ b/install/_common.sh
@@ -0,0 +1,82 @@
+RED='\033[0;31m'
+GREEN='\033[0;32m'
+NC='\033[0m' # No Color
+INFO="[ \033[38;5;33mINFO${NC} ]"
+WARN="[ \033[38;5;208mWARN${NC} ]"
+ERROR="[ \033[38;5;196mERROR${NC} ]"
+
+KUBE="$(which oc 2>/dev/null || which kubectl 2>/dev/null)" || :
+
+if [ "${KUBE}" == "" ] ; then
+  echo -e "${ERROR} Neither 'oc' nor 'kubectl' command line utilities found on the PATH"
+  exit 1
+fi
+
+YQ="$(which yq 2>/dev/null)" || :
+
+if [ "${YQ}" == "" ] ; then
+  echo -e "${ERROR} 'yq' command line utility not found on the PATH"
+  exit 1
+fi
+
+if ${KUBE} get namespace/${NAMESPACE} >/dev/null 2>&1 ; then
+  echo -e "${INFO} Namespace '${NAMESPACE}' exists"
+else
+  echo -e "${WARN} Namespace '${NAMESPACE}' not found... creating"
+  ${KUBE} create namespace ${NAMESPACE} >/dev/null
+
+  if ${KUBE} get namespace/${NAMESPACE} >/dev/null 2>&1 ; then
+    echo -e "${INFO} Namespace '${NAMESPACE}' created"
+  else
+    echo -e "${WARN} Namespace '${NAMESPACE}' could not be created"
+  fi
+fi
+
+OLM=$(${KUBE} get crd | grep operators.coreos.com) || :
+
+if [ "${OLM}" == "" ] && [ "${CI_CLUSTER:-}" == "" ] ; then
+  echo -e "${ERROR} Operator Lifecycle Manager not found, please install it.
+ +$ operator-sdk olm install + +For more info please visit https://sdk.operatorframework.io/ +" + exit 1 +fi + +function fetch_available_packages { + local NAME_PATTERN="${1}" + + for pm in $(${KUBE} get packagemanifests -o name | grep -E '^packagemanifest\.packages\.operators\.coreos\.com/('"${NAME_PATTERN}"')$') ; do + ${KUBE} get $pm -o yaml | ${YQ} -o=json '{ + "name": .status.packageName, + "channel": .status.defaultChannel, + "catalogSource": .status.catalogSource, + "catalogSourceNamespace": .status.catalogSourceNamespace + }' + done | ${YQ} ea -p=json '[.]' | ${YQ} -o=csv | tail -n +2 +} + +function display_suggested_subscription { + local OPERATOR_NAME="${1}" + local NAME_PATTERN="${2}" + + local AVAILABLE_PKGS="$(fetch_available_packages "${NAME_PATTERN}")" + echo -e "${INFO} ${OPERATOR_NAME} may be installed by creating one of the following resources:" + COUNTER=0 + + while IFS=, read -r PKG_NAME PKG_CHANNEL PKG_CTLG_SRC PKG_CTLG_SRC_NS; do + COUNTER=$(( COUNTER + 1 )) + echo -e "${INFO} ----- Option ${COUNTER} -----" + echo "apiVersion: operators.coreos.com/v1alpha1 +kind: Subscription +metadata: + name: ${OPERATOR_NAME} + namespace: ${NAMESPACE} +spec: + name: ${PKG_NAME} + channel: ${PKG_CHANNEL} + source: ${PKG_CTLG_SRC} + sourceNamespace: ${PKG_CTLG_SRC_NS}" | ${YQ} + done < <(echo "${AVAILABLE_PKGS}") +} diff --git a/install/resources/dex/dex.yaml b/install/resources/dex/dex.yaml new file mode 100644 index 000000000..f0b84600a --- /dev/null +++ b/install/resources/dex/dex.yaml @@ -0,0 +1,175 @@ +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: dex +rules: +- apiGroups: ["dex.coreos.com"] # API group created by dex + resources: ["*"] + verbs: ["*"] +- apiGroups: ["apiextensions.k8s.io"] + resources: ["customresourcedefinitions"] + verbs: ["create"] # To manage its own resources, dex must be able to create customresourcedefinitions +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: 
+ name: dex +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: dex +subjects: +- kind: ServiceAccount + name: dex # Service account assigned to the dex pod, created above + namespace: ${NAMESPACE} # The namespace dex is running in +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: dex + name: dex +spec: + replicas: 1 + selector: + matchLabels: + app: dex + template: + metadata: + labels: + app: dex + spec: + serviceAccountName: dex # This is created above + volumes: + - name: config + configMap: + name: dex + items: + - key: config.yaml + path: config.yaml + - name: openshift-ca + configMap: + name: kube-root-ca.crt + items: + - key: ca.crt + path: openshift.pem + #- name: tls + # secret: + # secretName: dex.example.com.tls + containers: + - image: ghcr.io/dexidp/dex:v2.32.0 + name: dex + command: ["/usr/local/bin/dex", "serve", "/etc/dex/cfg/config.yaml"] + + ports: + - name: http + containerPort: 5556 + + volumeMounts: + - name: config + mountPath: /etc/dex/cfg + - name: openshift-ca + mountPath: /etc/ssl/openshift.pem + subPath: openshift.pem + #- name: tls + # mountPath: /etc/dex/tls + + env: + - name: OPENSHIFT_OAUTH_CLIENT_ID + valueFrom: + secretKeyRef: + name: console-dex-secrets + key: DEX_CLIENT_ID + - name: OPENSHIFT_OAUTH_CLIENT_SECRET + valueFrom: + secretKeyRef: + name: console-dex-secrets + key: DEX_CLIENT_SECRET + + readinessProbe: + httpGet: + path: /healthz + port: 5556 + scheme: HTTP +--- +kind: ConfigMap +apiVersion: v1 +metadata: + name: dex +data: + config.yaml: | + issuer: https://console-dex.${CLUSTER_DOMAIN} + storage: + type: kubernetes + config: + inCluster: true + web: + http: 0.0.0.0:5556 + #tlsCert: /etc/dex/tls/tls.crt + #tlsKey: /etc/dex/tls/tls.key + connectors: + - type: openshift + id: openshift + name: OpenShift + config: + # OpenShift API + issuer: https://api.crc.testing:6443 + # Credentials can be string literals or pulled from the environment. 
+ clientID: $$OPENSHIFT_OAUTH_CLIENT_ID + clientSecret: $$OPENSHIFT_OAUTH_CLIENT_SECRET + redirectURI: https://console-dex.${CLUSTER_DOMAIN}/callback + # OpenShift root CA + rootCA: /etc/ssl/openshift.pem + # Communicate to OpenShift without validating SSL certificates + insecureCA: false + # Optional list of required groups a user must be a member of + groups: [] + oauth2: + skipApprovalScreen: true + + staticClients: + - id: streamshub-console + name: 'StreamsHub Console' + secret: ZXhhbXBsZS1hcHAtc2VjcmV0 + redirectURIs: + - 'http://127.0.0.1:5555/callback' + - 'http://localhost:3000/api/auth/callback/oidc' +--- +apiVersion: v1 +kind: Service +metadata: + name: dex +spec: + type: ClusterIP + ports: + - name: dex + port: 5556 + protocol: TCP + targetPort: 5556 + selector: + app: dex +--- +kind: Ingress +apiVersion: networking.k8s.io/v1 +metadata: + name: console-dex-ingress + annotations: + nginx.ingress.kubernetes.io/backend-protocol: HTTP + route.openshift.io/termination: edge +spec: + defaultBackend: + service: + name: dex + port: + number: 5556 + rules: + - host: console-dex.${CLUSTER_DOMAIN} + http: + paths: + - pathType: ImplementationSpecific + backend: + service: + name: dex + port: + number: 5556 diff --git a/pom.xml b/pom.xml index 77727c45f..395da9445 100644 --- a/pom.xml +++ b/pom.xml @@ -19,6 +19,7 @@ 3.15.1 0.43.0 0.15.0 + 3.7.1 3.0 @@ -74,6 +75,16 @@ + + org.apache.kafka + kafka-clients + ${kafka.version} + + + org.apache.kafka + kafka-metadata + ${kafka.version} + com.nimbusds diff --git a/ui/api/api.ts b/ui/api/api.ts index 52558aa81..1ba939a43 100644 --- a/ui/api/api.ts +++ b/ui/api/api.ts @@ -28,3 +28,8 @@ export const ApiError = z.object({ }) .optional(), }); + +export const ApiErrorResponse = z.object({ + meta: z.object({}).nullable().optional(), + errors: z.array(ApiError), +}); diff --git a/ui/api/kafka/actions.ts b/ui/api/kafka/actions.ts index 916c54d63..2a2c91b76 100644 --- a/ui/api/kafka/actions.ts +++ b/ui/api/kafka/actions.ts @@ -16,6 
+16,7 @@ import { PrometheusDriver } from "prometheus-query"; import * as clusterPromql from "./cluster.promql"; import { values } from "./kpi.promql"; import * as topicPromql from "./topic.promql"; +import { ApiErrorResponse } from '@/api/api'; export type ClusterMetric = keyof typeof clusterPromql; export type TopicMetric = keyof typeof topicPromql; @@ -37,17 +38,19 @@ export async function getKafkaClusters(): Promise { const url = `${process.env.BACKEND_URL}/api/kafkas?${kafkaClustersQuery}`; try { const res = await fetch(url, { - headers: { - Accept: "application/json", - "Content-Type": "application/json", - }, + headers: await getHeaders(), next: { revalidate: 30, }, }); const rawData = await res.json(); - log.trace(rawData, "getKafkaClusters response"); - return ClustersResponseSchema.parse(rawData).data; + if (res.status != 200) { + log.info(rawData, "getKafkaClusters response"); + throw new Error(ApiErrorResponse.parse(rawData).errors[0].detail); + } else { + log.trace(rawData, "getKafkaClusters response"); + return ClustersResponseSchema.parse(rawData).data; + } } catch (err) { log.error(err, "getKafkaClusters"); throw new Error("getKafkaClusters: couldn't connect with backend"); diff --git a/ui/app/[locale]/(authorized)/kafka/page.tsx b/ui/app/[locale]/(authorized)/kafka/page.tsx index 892b4f4c5..d18be9638 100644 --- a/ui/app/[locale]/(authorized)/kafka/page.tsx +++ b/ui/app/[locale]/(authorized)/kafka/page.tsx @@ -1,5 +1,3 @@ -import { getKafkaClusters } from "@/api/kafka/actions"; -import { RedirectOnLoad } from "@/components/Navigation/RedirectOnLoad"; import { redirect } from "@/i18n/routing"; export default function Page({}) { diff --git a/ui/app/[locale]/(authorized)/layout.tsx b/ui/app/[locale]/(authorized)/layout.tsx index d2374bd3f..a7d7e232c 100644 --- a/ui/app/[locale]/(authorized)/layout.tsx +++ b/ui/app/[locale]/(authorized)/layout.tsx @@ -2,8 +2,6 @@ import { getAuthOptions } from "@/app/api/auth/[...nextauth]/route"; import { 
getServerSession } from "next-auth"; import { ReactNode } from "react"; -import { AppLayout } from "../../../components/AppLayout"; -import { AppLayoutProvider } from "../../../components/AppLayoutProvider"; import { AppSessionProvider } from "./AppSessionProvider"; import { SessionRefresher } from "./SessionRefresher"; diff --git a/ui/app/[locale]/(public)/(home)/page.tsx b/ui/app/[locale]/(public)/(home)/page.tsx index afa357e0b..684d1f361 100644 --- a/ui/app/[locale]/(public)/(home)/page.tsx +++ b/ui/app/[locale]/(public)/(home)/page.tsx @@ -30,19 +30,35 @@ import { isProductizedBuild } from "@/utils/env"; import { getTranslations } from "next-intl/server"; import { Suspense } from "react"; import styles from "./home.module.css"; +import config from '@/app/api/config'; +import { logger } from "@/utils/logger"; +import { getAuthOptions } from "@/app/api/auth/[...nextauth]/route"; +import { getServerSession } from "next-auth"; + +const log = logger.child({ module: "home" }); export default async function Home() { const t = await getTranslations(); + log.debug("fetching known Kafka clusters") const allClusters = await getKafkaClusters(); const productName = t("common.product"); const brand = t("common.brand"); + let cfg = await config(); + let oidcCfg = cfg?.security?.oidc; + let username: string | undefined; + + if (oidcCfg) { + const authOptions = await getAuthOptions(); + const session = await getServerSession(authOptions); + username = (session?.user?.name || session?.user?.email) ?? undefined; + } - if (allClusters.length === 1) { + if (allClusters.length === 1 && !oidcCfg) { return ; } return ( - +
@@ -76,8 +92,8 @@ export default async function Home() { - }> - + }> + diff --git a/ui/app/[locale]/layout.tsx b/ui/app/[locale]/layout.tsx index 58a945b8e..7953b5d78 100644 --- a/ui/app/[locale]/layout.tsx +++ b/ui/app/[locale]/layout.tsx @@ -1,5 +1,4 @@ import { getMessages, getTranslations } from "next-intl/server"; -import { useNow, useTimeZone } from "next-intl"; import { ReactNode } from "react"; import NextIntlProvider from "./NextIntlProvider"; import "../globals.css"; @@ -27,7 +26,3 @@ export async function generateMetadata({ title: t("title"), }; } - -// export function generateStaticParams() { -// return [{ locale: "en" }]; -// } diff --git a/ui/app/api/auth/[...nextauth]/anonymous.ts b/ui/app/api/auth/[...nextauth]/anonymous.ts index a8e95b652..55a29c5dd 100644 --- a/ui/app/api/auth/[...nextauth]/anonymous.ts +++ b/ui/app/api/auth/[...nextauth]/anonymous.ts @@ -1,4 +1,3 @@ -import { AuthOptions } from "next-auth"; import CredentialsProvider from "next-auth/providers/credentials"; import { Provider } from "next-auth/providers/index"; diff --git a/ui/app/api/auth/[...nextauth]/keycloak.ts b/ui/app/api/auth/[...nextauth]/keycloak.ts deleted file mode 100644 index 957868765..000000000 --- a/ui/app/api/auth/[...nextauth]/keycloak.ts +++ /dev/null @@ -1,144 +0,0 @@ -import { logger } from "@/utils/logger"; -import { AuthOptions, Session, TokenSet } from "next-auth"; -import { JWT } from "next-auth/jwt"; -import { Provider } from "next-auth/providers/index"; -import KeycloakProvider from "next-auth/providers/keycloak"; - -const log = logger.child({ module: "keycloak" }); - -export function makeOauthProvider( - clientId: string, - clientSecret: string, - issuer: string, -): Provider { - const provider = KeycloakProvider({ - clientId, - clientSecret, - issuer, - }); - - let _tokenEndpoint: string | undefined = undefined; - - async function getTokenEndpoint() { - if (provider && provider.wellKnown) { - const kc = await fetch(provider.wellKnown); - const res = 
await kc.json(); - _tokenEndpoint = res.token_endpoint; - } - return _tokenEndpoint; - } - - async function refreshToken(token: JWT): Promise { - try { - const tokenEndpoint = await getTokenEndpoint(); - if (!provider) { - log.error("Invalid Keycloak configuratio"); - throw token; - } - if (!tokenEndpoint) { - log.error("Invalid Keycloak wellKnow"); - throw token; - } - let tokenExpiration = new Date( - (typeof token?.expires_at === "number" ? token.expires_at : 0) * 1000, - ); - log.trace({ tokenExpiration }, "Token expiration"); - - if (Date.now() < tokenExpiration.getTime()) { - log.trace(token, "Token not yet expired"); - return token; - } else { - log.trace(token, "Token has expired"); - let refresh_token = - typeof token.refresh_token === "string" ? token.refresh_token : ""; - - const params = { - client_id: provider.options!.clientId, - client_secret: provider.options!.clientSecret, - grant_type: "refresh_token", - refresh_token: refresh_token, - }; - - log.trace( - { - url: tokenEndpoint, - }, - "Refreshing token", - ); - - const response = await fetch(tokenEndpoint, { - headers: { "Content-Type": "application/x-www-form-urlencoded" }, - body: new URLSearchParams(params), - method: "POST", - }); - - const refreshToken: TokenSet = await response.json(); - if (!response.ok) { - throw new Error(response.statusText); - } - log.trace(refreshToken, "Got refresh token"); - - let expires_in = - typeof refreshToken.expires_in === "number" - ? refreshToken.expires_in - : -1; - - const newToken: JWT = { - ...token, // Keep the previous token properties - access_token: refreshToken.access_token, - expires_at: Math.floor(Date.now() / 1000 + expires_in), - // Fall back to old refresh token, but note that - // many providers may only allow using a refresh token once. - refresh_token: refreshToken.refresh_token ?? 
token.refresh_token, - }; - log.trace(newToken, "New token"); - return newToken; - } - } catch (error: unknown) { - if (typeof error === "string") { - log.error({ message: error }, "Error refreshing access token"); - } else if (error instanceof Error) { - log.error(error, "Error refreshing access token"); - } else { - log.error("Unknown error refreshing access token"); - } - // The error property will be used client-side to handle the refresh token error - return { ...token, error: "RefreshAccessTokenError" as const }; - } - } - - return provider; - - // return { - // providers: [provider], - // callbacks: { - // async jwt({ token, account }: { token: JWT; account: any }) { - // // Persist the OAuth access_token and or the user id to the token right after signin - // if (account) { - // log.trace("account present, saving new token"); - // // Save the access token and refresh token in the JWT on the initial login - // return { - // access_token: account.access_token, - // expires_at: account.expires_at, - // refresh_token: account.refresh_token, - // email: token.email, - // name: token.name, - // picture: token.picture, - // sub: token.sub, - // }; - // } - // - // return refreshToken(token); - // }, - // async session({ session, token }: { session: Session; token: JWT }) { - // // Send properties to the client, like an access_token from a provider. 
- // log.trace(token, "Creating session from token"); - // return { - // ...session, - // error: token.error, - // accessToken: token.access_token, - // }; - // }, - // }, - // }; -} diff --git a/ui/app/api/auth/[...nextauth]/oidc.ts b/ui/app/api/auth/[...nextauth]/oidc.ts new file mode 100644 index 000000000..eef30f153 --- /dev/null +++ b/ui/app/api/auth/[...nextauth]/oidc.ts @@ -0,0 +1,182 @@ +import { logger } from "@/utils/logger"; +import { Session, TokenSet } from "next-auth"; +import { JWT } from "next-auth/jwt"; +import { OAuthConfig } from "next-auth/providers/index"; +import config from '@/app/api/config'; + +const log = logger.child({ module: "oidc" }); + +class OpenIdConnect { + + provider: OAuthConfig | null; + + constructor( + authServerUrl: string | null, + clientId: string | null, + clientSecret: string | null + ) { + if (clientId && clientSecret && authServerUrl) { + this.provider = { + id: "oidc", + name: "OpenID Connect Provider", + type: "oauth", + clientId: clientId, + clientSecret: clientSecret, + wellKnown: `${authServerUrl}/.well-known/openid-configuration`, + authorization: { params: { scope: "openid email profile groups" } }, + idToken: true, + profile(profile) { + return { + id: profile.sub, + name: profile.name ?? 
profile.preferred_username, + email: profile.email, + image: profile.image, + } + }, + } + } else { + this.provider = null; + } + } + + isEnabled() { + return this.provider != null; + } + + async getTokenEndpoint() { + let _tokenEndpoint: string | undefined = undefined; + + if (this.provider?.wellKnown) { + log.debug(`wellKnown endpoint: ${this.provider.wellKnown}`); + const kc = await fetch(this.provider.wellKnown); + const res = await kc.json(); + _tokenEndpoint = res.token_endpoint; + } + + log.debug(`token endpoint: ${_tokenEndpoint}`); + + return _tokenEndpoint; + } + + async refreshToken(token: JWT): Promise { + if (this.provider == null) { + throw new Error("OIDC is not properly configured"); + } + + try { + let tokenExpiration = new Date( + (typeof token?.expires_at === "number" ? token.expires_at : 0) * 1000, + ); + log.trace({ tokenExpiration }, "Token expiration"); + + if (Date.now() < tokenExpiration.getTime()) { + log.trace(token, "Token not yet expired"); + return token; + } + log.trace(token, "Token has expired"); + let refresh_token = + typeof token.refresh_token === "string" ? token.refresh_token : ""; + + const params = { + client_id: this.provider.clientId!, + client_secret: this.provider.clientSecret!, + grant_type: "refresh_token", + refresh_token: refresh_token, + }; + + const tokenEndpoint = await this.getTokenEndpoint(); + + if (!tokenEndpoint) { + log.error("Invalid OIDC wellKnown"); + throw token; + } + + log.trace( + { + url: tokenEndpoint, + }, + "Refreshing token", + ); + + const response = await fetch(tokenEndpoint, { + headers: { "Content-Type": "application/x-www-form-urlencoded" }, + body: new URLSearchParams(params), + method: "POST", + }); + + const refreshToken: TokenSet = await response.json(); + if (!response.ok) { + throw new Error(response.statusText); + } + log.trace(refreshToken, "Got refresh token"); + + let expires_in = + typeof refreshToken.expires_in === "number" + ? 
refreshToken.expires_in + : -1; + + const newToken: JWT = { + ...token, // Keep the previous token properties + access_token: refreshToken.access_token, + expires_at: Math.floor(Date.now() / 1000 + expires_in), + // Fall back to old refresh token, but note that + // many providers may only allow using a refresh token once. + refresh_token: refreshToken.refresh_token ?? token.refresh_token, + }; + log.trace(newToken, "New token"); + return newToken; + } catch (error: unknown) { + if (typeof error === "string") { + log.error({ message: error }, "Error refreshing access token"); + } else if (error instanceof Error) { + log.error(error, "Error refreshing access token"); + } else { + log.error("Unknown error refreshing access token"); + } + // The error property will be used client-side to handle the refresh token error + return { ...token, error: "RefreshAccessTokenError" as const }; + } + } + + async jwt({ token, account }: { token: JWT, account: any }) { + // Persist the OAuth access_token and or the user id to the token right after signin + log.info("jwt callback invoked") + if (account) { + log.trace(`account ${JSON.stringify(account)} present, saving new token: ${JSON.stringify(token)}`); + // Save the access token and refresh token in the JWT on the initial login + return { + access_token: account.access_token, + expires_at: account.expires_at, + refresh_token: account.refresh_token, + email: token.email, + name: token.name, + picture: token.picture, + sub: token.sub, + }; + } + + return this.refreshToken(token); + }; + + async session({ session, token }: { session: Session, token: JWT }) { + // Send properties to the client, like an access_token from a provider. 
+ log.trace(token, "Creating session from token"); + return { + ...session, + error: token.error, + accessToken: token.access_token, + authorization: `Bearer ${token.access_token}`, + }; + }; +} + + +export default async function oidcSource() { + let cfg = await config(); + let oidcCfg = cfg?.security?.oidc; + const authServerUrl: string | null = oidcCfg?.authServerUrl ?? null; + const clientId: string | null = oidcCfg?.clientId ?? null; + const clientSecret: string | null = oidcCfg?.clientSecret ?? null; + const oidcProvider = new OpenIdConnect(authServerUrl, clientId, clientSecret); + return oidcProvider; +}; diff --git a/ui/app/api/auth/[...nextauth]/route.ts b/ui/app/api/auth/[...nextauth]/route.ts index 812db7da4..4d3d88a93 100644 --- a/ui/app/api/auth/[...nextauth]/route.ts +++ b/ui/app/api/auth/[...nextauth]/route.ts @@ -1,37 +1,60 @@ import { getKafkaClusters } from "@/api/kafka/actions"; import { ClusterList } from "@/api/kafka/schema"; import { logger } from "@/utils/logger"; -import NextAuth, { AuthOptions } from "next-auth"; +import NextAuth, { AuthOptions, Session } from "next-auth"; +import { JWT } from "next-auth/jwt"; import { Provider } from "next-auth/providers/index"; import { NextRequest, NextResponse } from "next/server"; import { makeAnonymous } from "./anonymous"; import { makeOauthTokenProvider } from "./oauth-token"; import { makeScramShaProvider } from "./scram"; +import oidcSource from "./oidc"; const log = logger.child({ module: "auth" }); export async function getAuthOptions(): Promise { - // retrieve the authentication method required by the default Kafka cluster - const clusters = await getKafkaClusters(); - const providers = clusters.map(makeAuthOption); - log.trace({ providers }, "getAuthOptions"); - return { - providers, - callbacks: { - async jwt({ token, user }) { - if (user) { - token.authorization = user.authorization; + let providers: Provider[]; + log.info("fetching the oidcSource"); + let oidc = await oidcSource(); + + if 
(oidc.isEnabled()) { + log.info("OIDC is enabled"); + providers = [ oidc.provider! ]; + return { + providers, + callbacks: { + async jwt({ token, account }: { token: JWT, account: any }) { + return oidc.jwt({ token, account }); + }, + async session({ session, token }: { session: Session, token: JWT }) { + return oidc.session({ session, token }); } - return token; - }, - async session({ session, token, user }) { - // Send properties to the client, like an access_token and user id from a provider. - session.authorization = token.authorization; + } + } + } else { + log.info("OIDC is disabled"); + // retrieve the authentication method required by the default Kafka cluster + const clusters = await getKafkaClusters(); + providers = clusters.map(makeAuthOption); + log.trace({ providers }, "getAuthOptions"); + return { + providers, + callbacks: { + async jwt({ token, user }) { + if (user) { + token.authorization = user.authorization; + } + return token; + }, + async session({ session, token, user }) { + // Send properties to the client, like an access_token and user id from a provider. 
+ session.authorization = token.authorization; - return session; + return session; + }, }, - }, - }; + }; + } } function makeAuthOption(cluster: ClusterList): Provider { diff --git a/ui/app/api/auth/[...nextauth]/scram.ts b/ui/app/api/auth/[...nextauth]/scram.ts index 3bcbd5eb0..b24e6f8c9 100644 --- a/ui/app/api/auth/[...nextauth]/scram.ts +++ b/ui/app/api/auth/[...nextauth]/scram.ts @@ -1,5 +1,3 @@ -import { getKafkaClusters } from "@/api/kafka/actions"; -import { AuthOptions } from "next-auth"; import CredentialsProvider from "next-auth/providers/credentials"; import { Provider } from "next-auth/providers/index"; diff --git a/ui/app/api/config.ts b/ui/app/api/config.ts new file mode 100644 index 000000000..fc07a95ec --- /dev/null +++ b/ui/app/api/config.ts @@ -0,0 +1,21 @@ +"use server"; + +import * as yaml from 'js-yaml'; + +export interface OidcConfig { + authServerUrl: string | null; + clientId: string | null; + clientSecret: string | null; +} + +export interface GlobalSecurityConfig { + oidc: OidcConfig | null; +} + +export interface ConsoleConfig { + security: GlobalSecurityConfig | null; +} + +export default async function config(): Promise { + return yaml.load(process.env.CONSOLE_CONFIG!) 
as ConsoleConfig; +} diff --git a/ui/components/ClustersTable.tsx b/ui/components/ClustersTable.tsx index 9fc35d60c..103d737a3 100644 --- a/ui/components/ClustersTable.tsx +++ b/ui/components/ClustersTable.tsx @@ -6,6 +6,7 @@ import { ResponsiveTable } from "@/components/Table"; import { Truncate } from "@/libs/patternfly/react-core"; import { TableVariant } from "@/libs/patternfly/react-table"; import { useTranslations } from "next-intl"; +import { Link } from "@/i18n/routing"; const columns = [ "name", @@ -13,14 +14,28 @@ const columns = [ "namespace", "authentication", "login", -] as const; +]; export function ClustersTable({ clusters, + authenticated, }: { clusters: ClusterList[] | undefined; + authenticated: boolean }) { const t = useTranslations(); + const columns = authenticated ? [ + "name", + "version", + "namespace", + ] as const : [ + "name", + "version", + "namespace", + "authentication", + "login", + ] as const; + return ( - + {authenticated + ? + + + : + } ); case "version": @@ -87,8 +107,8 @@ export function ClustersTable({ case "login": return ( - - Login to cluster + + { authenticated ? 
"View" : "Login to cluster" } ); diff --git a/ui/environment.d.ts b/ui/environment.d.ts index 06950d380..e6d03d648 100644 --- a/ui/environment.d.ts +++ b/ui/environment.d.ts @@ -3,9 +3,6 @@ namespace NodeJS { NEXTAUTH_URL: string; NEXTAUTH_SECRET: string; BACKEND_URL: string; - KEYCLOAK_CLIENTID?: string; - KEYCLOAK_CLIENTSECRET?: string; - NEXT_PUBLIC_KEYCLOAK_URL?: string; NEXT_PUBLIC_PRODUCTIZED_BUILD?: "true" | "false"; CONSOLE_METRICS_PROMETHEUS_URL?: string; LOG_LEVEL?: "fatal" | "error" | "warn" | "info" | "debug" | "trace"; diff --git a/ui/middleware.ts b/ui/middleware.ts index 8b5909822..a0b46a936 100644 --- a/ui/middleware.ts +++ b/ui/middleware.ts @@ -3,6 +3,7 @@ import withAuth from "next-auth/middleware"; import createIntlMiddleware from "next-intl/middleware"; import { NextRequest, NextResponse } from "next/server"; +import consoleConfig from '@/app/api/config'; import { logger } from "@/utils/logger"; const log = logger.child({ module: "middleware" }); @@ -24,7 +25,7 @@ const authMiddleware = withAuth( authorized: ({ token }) => token != null, }, pages: { - signIn: `/kafka/1/login`, + //signIn: `/kafka/1/login`, }, }, ) as any; @@ -44,16 +45,20 @@ const protectedPathnameRegex = RegExp( ); export default async function middleware(req: NextRequest) { + let cfg = await consoleConfig(); + let oidcCfg = cfg?.['security']?.['oidc']; + let oidcEnabled = oidcCfg ? 
true : false; + const requestPath = req.nextUrl.pathname; - const isPublicPage = publicPathnameRegex.test(requestPath); - const isProtectedPage = protectedPathnameRegex.test(requestPath); + const isPublicPage = !oidcEnabled && publicPathnameRegex.test(requestPath); + const isProtectedPage = oidcEnabled || protectedPathnameRegex.test(requestPath); if (isPublicPage) { log.trace({ requestPath: requestPath }, "public page"); return intlMiddleware(req); } else if (isProtectedPage) { log.trace({ requestPath: requestPath }, "protected page"); - return (authMiddleware as any)(req); + return (authMiddleware)(req); } else { log.debug( { diff --git a/ui/package-lock.json b/ui/package-lock.json index d1223f0c4..9208ed3c7 100644 --- a/ui/package-lock.json +++ b/ui/package-lock.json @@ -36,6 +36,7 @@ "eslint-import-resolver-typescript": "^3.6.3", "eslint-plugin-storybook": "^0.10.1", "iron-session": "^8.0.3", + "js-yaml": "^4.1.0", "lodash.groupby": "^4.6.0", "next": "^14.2.15", "next-auth": "^4.24.8", @@ -2502,11 +2503,6 @@ "url": "https://opencollective.com/eslint" } }, - "node_modules/@eslint/eslintrc/node_modules/argparse": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/argparse/-/argparse-2.0.1.tgz", - "integrity": "sha512-8+9WqebbFzpX9OR+Wa6O29asIogeRMzcGtAINdpMHHyAg10f05aSFVBbcEqGf/PXw1EjAZ+q2/bEBg3DvurK3Q==" - }, "node_modules/@eslint/eslintrc/node_modules/globals": { "version": "13.24.0", "resolved": "https://registry.npmjs.org/globals/-/globals-13.24.0.tgz", @@ -2521,17 +2517,6 @@ "url": "https://github.com/sponsors/sindresorhus" } }, - "node_modules/@eslint/eslintrc/node_modules/js-yaml": { - "version": "4.1.0", - "resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-4.1.0.tgz", - "integrity": "sha512-wpxZs9NoxZaJESJGIZTyDEaYpl0FKSA+FB9aJiyemKhMwkxQg63h4T1KJgUGHpTqPDNRcmmYLugrRjJlBtWvRA==", - "dependencies": { - "argparse": "^2.0.1" - }, - "bin": { - "js-yaml": "bin/js-yaml.js" - } - }, "node_modules/@eslint/eslintrc/node_modules/type-fest": { 
"version": "0.20.2", "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-0.20.2.tgz", @@ -3150,6 +3135,15 @@ "node": ">=8" } }, + "node_modules/@istanbuljs/load-nyc-config/node_modules/argparse": { + "version": "1.0.10", + "resolved": "https://registry.npmjs.org/argparse/-/argparse-1.0.10.tgz", + "integrity": "sha512-o5Roy6tNG4SL/FOkCAN6RzjiakZS25RLYFrcMttJqbdd8BWrnA+fGz57iN5Pb06pvBGvl5gQ0B48dJlslXvoTg==", + "dev": true, + "dependencies": { + "sprintf-js": "~1.0.2" + } + }, "node_modules/@istanbuljs/load-nyc-config/node_modules/find-up": { "version": "4.1.0", "resolved": "https://registry.npmjs.org/find-up/-/find-up-4.1.0.tgz", @@ -3163,6 +3157,19 @@ "node": ">=8" } }, + "node_modules/@istanbuljs/load-nyc-config/node_modules/js-yaml": { + "version": "3.14.1", + "resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-3.14.1.tgz", + "integrity": "sha512-okMH7OXXJ7YrN9Ok3/SXrnu4iX9yOk+25nqX4imS2npuvTYDmo/QEZoqwZkYaIDk3jVvBOTOIEgEhaLOynBS9g==", + "dev": true, + "dependencies": { + "argparse": "^1.0.7", + "esprima": "^4.0.0" + }, + "bin": { + "js-yaml": "bin/js-yaml.js" + } + }, "node_modules/@istanbuljs/load-nyc-config/node_modules/locate-path": { "version": "5.0.0", "resolved": "https://registry.npmjs.org/locate-path/-/locate-path-5.0.0.tgz", @@ -8808,13 +8815,9 @@ "dev": true }, "node_modules/argparse": { - "version": "1.0.10", - "resolved": "https://registry.npmjs.org/argparse/-/argparse-1.0.10.tgz", - "integrity": "sha512-o5Roy6tNG4SL/FOkCAN6RzjiakZS25RLYFrcMttJqbdd8BWrnA+fGz57iN5Pb06pvBGvl5gQ0B48dJlslXvoTg==", - "dev": true, - "dependencies": { - "sprintf-js": "~1.0.2" - } + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/argparse/-/argparse-2.0.1.tgz", + "integrity": "sha512-8+9WqebbFzpX9OR+Wa6O29asIogeRMzcGtAINdpMHHyAg10f05aSFVBbcEqGf/PXw1EjAZ+q2/bEBg3DvurK3Q==" }, "node_modules/aria-query": { "version": "5.3.0", @@ -11879,11 +11882,6 @@ "url": "https://opencollective.com/eslint" } }, - "node_modules/eslint/node_modules/argparse": { - 
"version": "2.0.1", - "resolved": "https://registry.npmjs.org/argparse/-/argparse-2.0.1.tgz", - "integrity": "sha512-8+9WqebbFzpX9OR+Wa6O29asIogeRMzcGtAINdpMHHyAg10f05aSFVBbcEqGf/PXw1EjAZ+q2/bEBg3DvurK3Q==" - }, "node_modules/eslint/node_modules/globals": { "version": "13.24.0", "resolved": "https://registry.npmjs.org/globals/-/globals-13.24.0.tgz", @@ -11898,17 +11896,6 @@ "url": "https://github.com/sponsors/sindresorhus" } }, - "node_modules/eslint/node_modules/js-yaml": { - "version": "4.1.0", - "resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-4.1.0.tgz", - "integrity": "sha512-wpxZs9NoxZaJESJGIZTyDEaYpl0FKSA+FB9aJiyemKhMwkxQg63h4T1KJgUGHpTqPDNRcmmYLugrRjJlBtWvRA==", - "dependencies": { - "argparse": "^2.0.1" - }, - "bin": { - "js-yaml": "bin/js-yaml.js" - } - }, "node_modules/eslint/node_modules/type-fest": { "version": "0.20.2", "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-0.20.2.tgz", @@ -15616,13 +15603,12 @@ "integrity": "sha512-RdJUflcE3cUzKiMqQgsCu06FPu9UdIJO0beYbPhHN4k6apgJtifcoCtT9bcxOpYBtpD2kCM6Sbzg4CausW/PKQ==" }, "node_modules/js-yaml": { - "version": "3.14.1", - "resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-3.14.1.tgz", - "integrity": "sha512-okMH7OXXJ7YrN9Ok3/SXrnu4iX9yOk+25nqX4imS2npuvTYDmo/QEZoqwZkYaIDk3jVvBOTOIEgEhaLOynBS9g==", - "dev": true, + "version": "4.1.0", + "resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-4.1.0.tgz", + "integrity": "sha512-wpxZs9NoxZaJESJGIZTyDEaYpl0FKSA+FB9aJiyemKhMwkxQg63h4T1KJgUGHpTqPDNRcmmYLugrRjJlBtWvRA==", + "license": "MIT", "dependencies": { - "argparse": "^1.0.7", - "esprima": "^4.0.0" + "argparse": "^2.0.1" }, "bin": { "js-yaml": "bin/js-yaml.js" @@ -17507,12 +17493,6 @@ } } }, - "node_modules/postcss-loader/node_modules/argparse": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/argparse/-/argparse-2.0.1.tgz", - "integrity": "sha512-8+9WqebbFzpX9OR+Wa6O29asIogeRMzcGtAINdpMHHyAg10f05aSFVBbcEqGf/PXw1EjAZ+q2/bEBg3DvurK3Q==", - "dev": true - }, 
"node_modules/postcss-loader/node_modules/cosmiconfig": { "version": "9.0.0", "resolved": "https://registry.npmjs.org/cosmiconfig/-/cosmiconfig-9.0.0.tgz", @@ -17539,18 +17519,6 @@ } } }, - "node_modules/postcss-loader/node_modules/js-yaml": { - "version": "4.1.0", - "resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-4.1.0.tgz", - "integrity": "sha512-wpxZs9NoxZaJESJGIZTyDEaYpl0FKSA+FB9aJiyemKhMwkxQg63h4T1KJgUGHpTqPDNRcmmYLugrRjJlBtWvRA==", - "dev": true, - "dependencies": { - "argparse": "^2.0.1" - }, - "bin": { - "js-yaml": "bin/js-yaml.js" - } - }, "node_modules/postcss-loader/node_modules/semver": { "version": "7.6.2", "resolved": "https://registry.npmjs.org/semver/-/semver-7.6.2.tgz", diff --git a/ui/package.json b/ui/package.json index 8b564ff4e..cc76cf824 100644 --- a/ui/package.json +++ b/ui/package.json @@ -41,6 +41,7 @@ "eslint-import-resolver-typescript": "^3.6.3", "eslint-plugin-storybook": "^0.10.1", "iron-session": "^8.0.3", + "js-yaml": "^4.1.0", "lodash.groupby": "^4.6.0", "next": "^14.2.15", "next-auth": "^4.24.8", diff --git a/ui/utils/env.ts b/ui/utils/env.ts index c413be6c4..8c716b2bd 100644 --- a/ui/utils/env.ts +++ b/ui/utils/env.ts @@ -3,14 +3,6 @@ export const isReadonly = (() => { return true; } - if ( - process.env.NEXT_PUBLIC_KEYCLOAK_URL && - process.env.KEYCLOAK_CLIENTID && - process.env.KEYCLOAK_CLIENTSECRET - ) { - return false; - } - return true; })();