Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abstract ManagedChannel Creation #195

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@
nbactions.xml
src/site/markdown/*.html
target/
.idea
*.iml
61 changes: 61 additions & 0 deletions src/main/java/org/microbean/helm/DefaultManagedChannelFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package org.microbean.helm;

import io.fabric8.kubernetes.client.LocalPortForward;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.net.InetAddress;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.microbean.development.annotation.Issue;

/**
* A default implementation of the {@link ManagedChannelFactory} that creates a {@link ManagedChannel}
* from a {@link LocalPortForward}.
*
* Allows additional customization of the {@link ManagedChannelBuilder} by supplying a
* {@link ManagedChannelConfigurer} to the constructor.
*/
public class DefaultManagedChannelFactory implements ManagedChannelFactory {

private static final ManagedChannelConfigurer NOOP_CONFIGURER = (builder) -> {};
private final ManagedChannelConfigurer configurer;

public DefaultManagedChannelFactory() {
this.configurer = NOOP_CONFIGURER;
}

/**
* @param configurer a {@link ManagedChannelConfigurer} to allow overriding of the default
* configuration of the {@link ManagedChannel}
*
* @throws NullPointerException if the configurer is null
*/
public DefaultManagedChannelFactory(final ManagedChannelConfigurer configurer) {
Objects.requireNonNull(configurer);
this.configurer = configurer;
}

@Issue(id = "42", uri = "https://github.com/microbean/microbean-helm/issues/42")
@Override public ManagedChannel create(final LocalPortForward portForward) {
Objects.requireNonNull(portForward);
@Issue(id = "43", uri = "https://github.com/microbean/microbean-helm/issues/43")
final InetAddress localAddress = portForward.getLocalAddress();
if (localAddress == null) {
throw new IllegalArgumentException("portForward",
new IllegalStateException("portForward.getLocalAddress() == null"));
}
final String hostAddress = localAddress.getHostAddress();
if (hostAddress == null) {
throw new IllegalArgumentException("portForward",
new IllegalStateException("portForward.getLocalAddress().getHostAddress() == null"));
}
final ManagedChannelBuilder builder =
ManagedChannelBuilder.forAddress(hostAddress, portForward.getLocalPort())
.idleTimeout(5L, TimeUnit.SECONDS)
.keepAliveTime(30L, TimeUnit.SECONDS)
.maxInboundMessageSize(Tiller.MAX_MESSAGE_SIZE)
.usePlaintext(true);
configurer.configure(builder);
return builder.build();
}
}
11 changes: 11 additions & 0 deletions src/main/java/org/microbean/helm/ManagedChannelConfigurer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.microbean.helm;

import io.grpc.ManagedChannelBuilder;

/**
* An interface whose implementations configure options on the supplied
* {@link ManagedChannelBuilder} to override defaults.
*/
public interface ManagedChannelConfigurer {
void configure(final ManagedChannelBuilder<?> managedChannelBuilder);
}
32 changes: 32 additions & 0 deletions src/main/java/org/microbean/helm/ManagedChannelFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.microbean.helm;

import io.fabric8.kubernetes.client.LocalPortForward;
import io.grpc.ManagedChannel;

/**
* An interface whose implementations create a {@link ManagedChannel} from a
* {@link LocalPortForward} to be used to communicate with Tiller.
*/
public interface ManagedChannelFactory {

/**
* Creates a {@link ManagedChannel} for communication with Tiller
* from the information contained in the supplied {@link
* LocalPortForward}.
*
* <p>This method never returns {@code null}.</p>
*
* <p>Overrides of this method must not return {@code null}.</p>
*
* @param portForward a {@link LocalPortForward}; must not be {@code
* null}
* @return a non-{@code null} {@link ManagedChannel}
* @throws NullPointerException if {@code portForward} is {@code
* null}
* @throws IllegalArgumentException if {@code portForward}'s
* {@link LocalPortForward#getLocalAddress()} method returns {@code
* null}
*/
ManagedChannel create(final LocalPortForward portForward);

}
133 changes: 68 additions & 65 deletions src/main/java/org/microbean/helm/Tiller.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,13 @@
import java.io.Closeable;
import java.io.IOException;

import java.net.InetAddress;
import java.net.MalformedURLException;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import java.util.concurrent.TimeUnit;

import hapi.services.tiller.ReleaseServiceGrpc;
import hapi.services.tiller.ReleaseServiceGrpc.ReleaseServiceBlockingStub;
import hapi.services.tiller.ReleaseServiceGrpc.ReleaseServiceFutureStub;
Expand All @@ -46,7 +43,6 @@
import io.fabric8.kubernetes.client.LocalPortForward;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;

import io.grpc.health.v1.HealthGrpc;
Expand All @@ -58,8 +54,6 @@

import okhttp3.OkHttpClient;

import org.microbean.development.annotation.Issue;

import org.microbean.kubernetes.Pods;

/**
Expand Down Expand Up @@ -113,7 +107,7 @@ public class Tiller implements ConfigAware<Config>, Closeable {
* be.
*/
public static final int MAX_MESSAGE_SIZE = 20 * 1024 * 1024;

/**
* A {@link Metadata} that ensures that certain Tiller-related
* headers are passed with every gRPC call.
Expand All @@ -122,11 +116,10 @@ public class Tiller implements ConfigAware<Config>, Closeable {
*/
private static final Metadata metadata = new Metadata();


/*
* Static initializer.
*/


/**
* Static initializer; initializes the {@link #DEFAULT_LABELS}
Expand Down Expand Up @@ -173,7 +166,6 @@ public class Tiller implements ConfigAware<Config>, Closeable {
*/
private final ManagedChannel channel;


/*
* Constructors.
*/
Expand Down Expand Up @@ -209,11 +201,30 @@ public Tiller(final ManagedChannel channel) {
* null}
*/
public Tiller(final LocalPortForward portForward) {
this(portForward, new DefaultManagedChannelFactory());
}


/**
* Creates a new {@link Tiller} that will use information from the
* supplied {@link LocalPortForward} to establish a communications
* channel with the Tiller server.
*
* @param portForward the {@link LocalPortForward} to use; must not
* be {@code null}
*
* @param managedChannelFactory the {@link ManagedChannelFactory} that will be used to create a
* {@link ManagedChannel} to communicate with Tiller
*
* @exception NullPointerException if {@code portForward} is {@code
* null}
*/
public Tiller(final LocalPortForward portForward, final ManagedChannelFactory managedChannelFactory) {
super();
Objects.requireNonNull(portForward);
this.config = null;
this.portForward = null; // yes, null
this.channel = this.buildChannel(portForward);
this.channel = managedChannelFactory.create(portForward);
}

/**
Expand Down Expand Up @@ -244,7 +255,41 @@ public Tiller(final LocalPortForward portForward) {
* @exception NullPointerException if {@code client} is {@code null}
*/
public <T extends HttpClientAware & KubernetesClient> Tiller(final T client) throws MalformedURLException {
this(client, DEFAULT_NAMESPACE, DEFAULT_PORT, DEFAULT_LABELS);
this(client, DEFAULT_NAMESPACE, DEFAULT_PORT, DEFAULT_LABELS, new DefaultManagedChannelFactory());
}

/**
* Creates a new {@link Tiller} that will forward a local port to
* port {@code 44134} on a Pod housing Tiller in the {@code
* kube-system} namespace running in the Kubernetes cluster with
* which the supplied {@link KubernetesClient} is capable of
* communicating.
*
* <p>The {@linkplain Pods#getFirstReadyPod(Listable) first ready
* Pod} with a {@code name} label whose value is {@code tiller} and
* with an {@code app} label whose value is {@code helm} is deemed
* to be the pod housing the Tiller instance to connect to. (This
* duplicates the default logic of the {@code helm} command line
* executable.)</p>
*
* @param <T> a {@link KubernetesClient} implementation that is also
* an {@link HttpClientAware} implementation, such as {@link
* DefaultKubernetesClient}
*
* @param client the {@link KubernetesClient}-and-{@link
* HttpClientAware} implementation that can communicate with a
* Kubernetes cluster; must not be {@code null}
*
* @param managedChannelFactory the {@link ManagedChannelFactory} that will be used to create a
* {@link ManagedChannel} to communicate with Tiller
*
* @exception MalformedURLException if there was a problem
* identifying a Pod within the cluster that houses a Tiller instance
*
* @exception NullPointerException if {@code client} is {@code null}
*/
public <T extends HttpClientAware & KubernetesClient> Tiller(final T client, final ManagedChannelFactory managedChannelFactory) throws MalformedURLException {
this(client, DEFAULT_NAMESPACE, DEFAULT_PORT, DEFAULT_LABELS, managedChannelFactory);
}

/**
Expand Down Expand Up @@ -285,7 +330,7 @@ public <T extends HttpClientAware & KubernetesClient> Tiller(final T client) thr
* found and consequently a connection could not be established
*/
public <T extends HttpClientAware & KubernetesClient> Tiller(final T client, final String namespaceHousingTiller) throws MalformedURLException {
this(client, namespaceHousingTiller, DEFAULT_PORT, DEFAULT_LABELS);
this(client, namespaceHousingTiller, DEFAULT_PORT, DEFAULT_LABELS, new DefaultManagedChannelFactory());
}

/**
Expand Down Expand Up @@ -320,6 +365,9 @@ public <T extends HttpClientAware & KubernetesClient> Tiller(final T client, fin
* instance; if {@code null} then the value of {@link
* #DEFAULT_LABELS} will be used instead
*
* @param managedChannelFactory the {@link ManagedChannelFactory} that will be used to create a
* {@link ManagedChannel} to communicate with Tiller
*
* @exception MalformedURLException if there was a problem
* identifying a Pod within the cluster that houses a Tiller instance
*
Expand All @@ -334,7 +382,8 @@ public <T extends HttpClientAware & KubernetesClient> Tiller(final T client, fin
public <T extends HttpClientAware & KubernetesClient> Tiller(final T client,
String namespaceHousingTiller,
int tillerPort,
Map<String, String> tillerLabels) throws MalformedURLException {
Map<String, String> tillerLabels,
ManagedChannelFactory managedChannelFactory) throws MalformedURLException {
super();
Objects.requireNonNull(client);
this.config = client.getConfiguration();
Expand All @@ -352,20 +401,19 @@ public <T extends HttpClientAware & KubernetesClient> Tiller(final T client,
throw new IllegalArgumentException("client", new IllegalStateException("client.getHttpClient() == null"));
}
LocalPortForward portForward = null;

this.portForward = Pods.forwardPort(httpClient, client.pods().inNamespace(namespaceHousingTiller).withLabels(tillerLabels), tillerPort);
if (this.portForward == null) {
throw new TillerException("Could not forward port to a Ready Tiller pod's port " + tillerPort + " in namespace " + namespaceHousingTiller + " with labels " + tillerLabels);
}
this.channel = this.buildChannel(this.portForward);
this.channel = managedChannelFactory.create(this.portForward);
}


/*
* Instance methods.
*/


/**
* Returns any {@link Config} available at construction time.
*
Expand All @@ -377,51 +425,6 @@ public <T extends HttpClientAware & KubernetesClient> Tiller(final T client,
public Config getConfiguration() {
return this.config;
}


/**
* Creates a {@link ManagedChannel} for communication with Tiller
* from the information contained in the supplied {@link
* LocalPortForward}.
*
* <p><strong>Note:</strong> This method is (deliberately) called
* from constructors so must have stateless semantics.</p>
*
* <p>This method never returns {@code null}.</p>
*
* <p>Overrides of this method must not return {@code null}.</p>
*
* @param portForward a {@link LocalPortForward}; must not be {@code
* null}
*
* @return a non-{@code null} {@link ManagedChannel}
*
* @exception NullPointerException if {@code portForward} is {@code
* null}
*
* @exception IllegalArgumentException if {@code portForward}'s
* {@link LocalPortForward#getLocalAddress()} method returns {@code
* null}
*/
@Issue(id = "42", uri = "https://github.com/microbean/microbean-helm/issues/42")
protected ManagedChannel buildChannel(final LocalPortForward portForward) {
Objects.requireNonNull(portForward);
@Issue(id = "43", uri = "https://github.com/microbean/microbean-helm/issues/43")
final InetAddress localAddress = portForward.getLocalAddress();
if (localAddress == null) {
throw new IllegalArgumentException("portForward", new IllegalStateException("portForward.getLocalAddress() == null"));
}
final String hostAddress = localAddress.getHostAddress();
if (hostAddress == null) {
throw new IllegalArgumentException("portForward", new IllegalStateException("portForward.getLocalAddress().getHostAddress() == null"));
}
return ManagedChannelBuilder.forAddress(hostAddress, portForward.getLocalPort())
.idleTimeout(5L, TimeUnit.SECONDS)
.keepAliveTime(30L, TimeUnit.SECONDS)
.maxInboundMessageSize(MAX_MESSAGE_SIZE)
.usePlaintext(true)
.build();
}

/**
* Closes this {@link Tiller} after use; any {@link
Expand Down Expand Up @@ -498,7 +501,7 @@ public ReleaseServiceFutureStub getReleaseServiceFutureStub() {
* @return a non-{@code null} {@link ReleaseServiceStub}
*
* @see ReleaseServiceStub
*/
*/
public ReleaseServiceStub getReleaseServiceStub() {
ReleaseServiceStub returnValue = null;
if (this.channel != null) {
Expand All @@ -522,7 +525,7 @@ public HealthFutureStub getHealthFutureStub() {
}
return returnValue;
}

public HealthStub getHealthStub() {
HealthStub returnValue = null;
if (this.channel != null) {
Expand All @@ -538,5 +541,5 @@ public VersionOrBuilder getVersion() throws IOException {
assert response != null;
return response.getVersion();
}

}
2 changes: 1 addition & 1 deletion src/main/java/org/microbean/helm/TillerInstaller.java
Original file line number Diff line number Diff line change
Expand Up @@ -999,7 +999,7 @@ protected final <T extends HttpClientAware & KubernetesClient> void ping(String
throw new TillerPollingDeadlineExceededException(String.valueOf(timeoutInMilliseconds));
}
@SuppressWarnings("unchecked")
final Tiller tiller = new Tiller((T)this.kubernetesClient, namespace, -1 /* use default */, labels);
final Tiller tiller = new Tiller((T)this.kubernetesClient, namespace, -1 /* use default */, labels, new DefaultManagedChannelFactory());
final HealthBlockingStub health = tiller.getHealthBlockingStub();
assert health != null;
final HealthCheckRequest.Builder builder = HealthCheckRequest.newBuilder();
Expand Down