Skip to content

Commit

Permalink
Add ingress-class-name controller flag (#1482)
Browse files Browse the repository at this point in the history
* Add ingress-class-name controller flag

Add a flag to set ingressClassName field for ingress
objects created for Spark UI.
This will make ingress compliant with Kubernetes >v1.19 and
better utilizing multiple ingress controllers

* Update Helm chart to v1.1.20/appVersion 1.3.4

Include ingressClassName changes

Co-authored-by: Hristo Voyvodov <[email protected]>
  • Loading branch information
voyvodov and Hristo Voyvodov authored Apr 11, 2022
1 parent 55732a6 commit 3b58b26
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 21 deletions.
4 changes: 2 additions & 2 deletions charts/spark-operator-chart/Chart.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
apiVersion: v2
name: spark-operator
description: A Helm chart for Spark on Kubernetes operator
version: 1.1.19
appVersion: v1beta2-1.3.3-3.1.1
version: 1.1.20
appVersion: v1beta2-1.3.4-3.1.1
keywords:
- spark
home: https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ var (
metricsPort = flag.String("metrics-port", "10254", "Port for the metrics endpoint.")
metricsEndpoint = flag.String("metrics-endpoint", "/metrics", "Metrics endpoint.")
metricsPrefix = flag.String("metrics-prefix", "", "Prefix for the metrics.")
ingressClassName = flag.String("ingress-class-name", "", "Set ingressClassName for ingress resources created.")
metricsLabels util.ArrayFlags
metricsJobStartLatencyBuckets util.HistogramBuckets = util.DefaultJobStartLatencyBuckets
)
Expand Down Expand Up @@ -184,7 +185,7 @@ func main() {
}

applicationController := sparkapplication.NewController(
crClient, kubeClient, crInformerFactory, podInformerFactory, metricConfig, *namespace, *ingressURLFormat, batchSchedulerMgr, *enableUIService)
crClient, kubeClient, crInformerFactory, podInformerFactory, metricConfig, *namespace, *ingressURLFormat, *ingressClassName, batchSchedulerMgr, *enableUIService)
scheduledApplicationController := scheduledsparkapplication.NewController(
crClient, kubeClient, apiExtensionsClient, crInformerFactory, clock.RealClock{})

Expand Down
8 changes: 6 additions & 2 deletions pkg/controller/sparkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type Controller struct {
applicationLister crdlisters.SparkApplicationLister
podLister v1.PodLister
ingressURLFormat string
ingressClassName string
batchSchedulerMgr *batchscheduler.SchedulerManager
enableUIService bool
}
Expand All @@ -89,6 +90,7 @@ func NewController(
metricsConfig *util.MetricConfig,
namespace string,
ingressURLFormat string,
ingressClassName string,
batchSchedulerMgr *batchscheduler.SchedulerManager,
enableUIService bool) *Controller {
crdscheme.AddToScheme(scheme.Scheme)
Expand All @@ -100,7 +102,7 @@ func NewController(
})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{Component: "spark-operator"})

return newSparkApplicationController(crdClient, kubeClient, crdInformerFactory, podInformerFactory, recorder, metricsConfig, ingressURLFormat, batchSchedulerMgr, enableUIService)
return newSparkApplicationController(crdClient, kubeClient, crdInformerFactory, podInformerFactory, recorder, metricsConfig, ingressURLFormat, ingressClassName, batchSchedulerMgr, enableUIService)
}

func newSparkApplicationController(
Expand All @@ -111,6 +113,7 @@ func newSparkApplicationController(
eventRecorder record.EventRecorder,
metricsConfig *util.MetricConfig,
ingressURLFormat string,
ingressClassName string,
batchSchedulerMgr *batchscheduler.SchedulerManager,
enableUIService bool) *Controller {
queue := workqueue.NewNamedRateLimitingQueue(&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(queueTokenRefillRate), queueTokenBucketSize)},
Expand All @@ -122,6 +125,7 @@ func newSparkApplicationController(
recorder: eventRecorder,
queue: queue,
ingressURLFormat: ingressURLFormat,
ingressClassName: ingressClassName,
batchSchedulerMgr: batchSchedulerMgr,
enableUIService: enableUIService,
}
Expand Down Expand Up @@ -687,7 +691,7 @@ func (c *Controller) submitSparkApplication(app *v1beta2.SparkApplication) *v1be
app.Spec.SparkConf["spark.ui.proxyBase"] = ingressURL.Path
app.Spec.SparkConf["spark.ui.proxyRedirectUri"] = "/"
}
ingress, err := createSparkUIIngress(app, *service, ingressURL, c.kubeClient)
ingress, err := createSparkUIIngress(app, *service, ingressURL, c.ingressClassName, c.kubeClient)
if err != nil {
glog.Errorf("failed to create UI Ingress for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
} else {
Expand Down
48 changes: 47 additions & 1 deletion pkg/controller/sparkapplication/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func newFakeController(app *v1beta2.SparkApplication, pods ...*apiv1.Pod) (*Cont

podInformerFactory := informers.NewSharedInformerFactory(kubeClient, 0*time.Second)
controller := newSparkApplicationController(crdClient, kubeClient, informerFactory, podInformerFactory, recorder,
&util.MetricConfig{}, "", nil, true)
&util.MetricConfig{}, "", "", nil, true)

informer := informerFactory.Sparkoperator().V1beta2().SparkApplications().Informer()
if app != nil {
Expand Down Expand Up @@ -1619,6 +1619,52 @@ func TestIngressWithSubpathAffectsSparkConfiguration(t *testing.T) {
}
}

func TestIngressWithClassName(t *testing.T) {
os.Setenv(kubernetesServiceHostEnvVar, "localhost")
os.Setenv(kubernetesServicePortEnvVar, "443")

appName := "ingressaffectssparkconfig"

app := &v1beta2.SparkApplication{
ObjectMeta: metav1.ObjectMeta{
Name: appName,
Namespace: "test",
},
Spec: v1beta2.SparkApplicationSpec{
RestartPolicy: v1beta2.RestartPolicy{
Type: v1beta2.Never,
},
TimeToLiveSeconds: int64ptr(1),
},
Status: v1beta2.SparkApplicationStatus{},
}

ctrl, _ := newFakeController(app)
ctrl.ingressURLFormat = "{{$appNamespace}}.{{$appName}}.example.com"
ctrl.ingressClassName = "nginx"
ctrl.enableUIService = true
_, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Create(context.TODO(), app, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
assert.Nil(t, err)
_, err = ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}
ingresses, err := ctrl.kubeClient.NetworkingV1().Ingresses(app.Namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
t.Fatal(err)
}
if ingresses == nil || ingresses.Items == nil || len(ingresses.Items) != 1 {
t.Fatal("The ingress does not exist, has no items, or wrong amount of items")
}
if ingresses.Items[0].Spec.IngressClassName == nil || *ingresses.Items[0].Spec.IngressClassName != "nginx" {
t.Fatal("The ingressClassName does not exists, or wrong value is set")
}
}

func stringptr(s string) *string {
return &s
}
Expand Down
28 changes: 17 additions & 11 deletions pkg/controller/sparkapplication/sparkui.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,21 +75,22 @@ type SparkService struct {

// SparkIngress encapsulates information about the driver UI ingress.
type SparkIngress struct {
ingressName string
ingressURL *url.URL
annotations map[string]string
ingressTLS []networkingv1.IngressTLS
ingressName string
ingressURL *url.URL
ingressClassName string
annotations map[string]string
ingressTLS []networkingv1.IngressTLS
}

func createSparkUIIngress(app *v1beta2.SparkApplication, service SparkService, ingressURL *url.URL, kubeClient clientset.Interface) (*SparkIngress, error) {
func createSparkUIIngress(app *v1beta2.SparkApplication, service SparkService, ingressURL *url.URL, ingressClassName string, kubeClient clientset.Interface) (*SparkIngress, error) {
if util.IngressCapabilities.Has("networking.k8s.io/v1") {
return createSparkUIIngress_v1(app, service, ingressURL, kubeClient)
return createSparkUIIngress_v1(app, service, ingressURL, ingressClassName, kubeClient)
} else {
return createSparkUIIngress_legacy(app, service, ingressURL, kubeClient)
}
}

func createSparkUIIngress_v1(app *v1beta2.SparkApplication, service SparkService, ingressURL *url.URL, kubeClient clientset.Interface) (*SparkIngress, error) {
func createSparkUIIngress_v1(app *v1beta2.SparkApplication, service SparkService, ingressURL *url.URL, ingressClassName string, kubeClient clientset.Interface) (*SparkIngress, error) {
ingressResourceAnnotations := getIngressResourceAnnotations(app)
ingressTlsHosts := getIngressTlsHosts(app)

Expand Down Expand Up @@ -145,16 +146,21 @@ func createSparkUIIngress_v1(app *v1beta2.SparkApplication, service SparkService
if len(ingressTlsHosts) != 0 {
ingress.Spec.TLS = ingressTlsHosts
}
if len(ingressClassName) != 0 {
ingress.Spec.IngressClassName = &ingressClassName
}

glog.Infof("Creating an Ingress %s for the Spark UI for application %s", ingress.Name, app.Name)
_, err := kubeClient.NetworkingV1().Ingresses(ingress.Namespace).Create(context.TODO(), &ingress, metav1.CreateOptions{})
if err != nil {
return nil, err
}
return &SparkIngress{
ingressName: ingress.Name,
ingressURL: ingressURL,
annotations: ingress.Annotations,
ingressTLS: ingressTlsHosts,
ingressName: ingress.Name,
ingressURL: ingressURL,
ingressClassName: ingressClassName,
annotations: ingress.Annotations,
ingressTLS: ingressTlsHosts,
}, nil
}

Expand Down
25 changes: 21 additions & 4 deletions pkg/controller/sparkapplication/sparkui_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func TestCreateSparkUIIngress(t *testing.T) {
expectError bool
}

testFn := func(test testcase, t *testing.T, ingressURLFormat string) {
testFn := func(test testcase, t *testing.T, ingressURLFormat string, ingressClassName string) {
fakeClient := fake.NewSimpleClientset()
sparkService, err := createSparkUIService(test.app, fakeClient)
if err != nil {
Expand All @@ -340,7 +340,7 @@ func TestCreateSparkUIIngress(t *testing.T) {
if err != nil {
t.Fatal(err)
}
sparkIngress, err := createSparkUIIngress(test.app, *sparkService, ingressURL, fakeClient)
sparkIngress, err := createSparkUIIngress(test.app, *sparkService, ingressURL, ingressClassName, fakeClient)
if err != nil {
if test.expectError {
return
Expand Down Expand Up @@ -492,6 +492,7 @@ func TestCreateSparkUIIngress(t *testing.T) {
},
},
}

testcases := []testcase{
{
name: "simple ingress object",
Expand Down Expand Up @@ -550,7 +551,7 @@ func TestCreateSparkUIIngress(t *testing.T) {
}

for _, test := range testcases {
testFn(test, t, "{{$appName}}.ingress.clusterName.com")
testFn(test, t, "{{$appName}}.ingress.clusterName.com", "")
}

testcases = []testcase{
Expand All @@ -569,7 +570,23 @@ func TestCreateSparkUIIngress(t *testing.T) {
}

for _, test := range testcases {
testFn(test, t, "ingress.clusterName.com/{{$appNamespace}}/{{$appName}}")
testFn(test, t, "ingress.clusterName.com/{{$appNamespace}}/{{$appName}}", "")
}

testcases = []testcase{
{
name: "simple ingress object with ingressClassName set",
app: app1,
expectedIngress: SparkIngress{
ingressName: fmt.Sprintf("%s-ui-ingress", app1.GetName()),
ingressURL: parseURLAndAssertError(app1.GetName()+".ingress.clusterName.com", t),
ingressClassName: "nginx",
},
expectError: false,
},
}
for _, test := range testcases {
testFn(test, t, "{{$appName}}.ingress.clusterName.com", "nginx")
}
}

Expand Down

0 comments on commit 3b58b26

Please sign in to comment.