Skip to content

Commit

Permalink
move validation of storage.scheme and storage.baseDir must be set whe…
Browse files Browse the repository at this point in the history
…n upgradeMode is not stateless to helpers

Signed-off-by: Casper Thygesen <[email protected]>
  • Loading branch information
cthtrifork committed Apr 30, 2024
1 parent 0d01fad commit f08d1f4
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 133 deletions.
36 changes: 36 additions & 0 deletions charts/flink-job/ci/bad-upgrademode.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# example values from docs.cheetah.trifork.dev/
image:
repository: flink
tag: 1.16
ingress:
enabled: true
hostname: flink.cheetah.trifork.dev
annotations:
cert-manager.io/cluster-issuer: letsencrypt
tlsSecret: letsencrypt


version: v1_16
job:
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
parallelism: 2
upgradeMode: savepoint
topics:
- name: test
type: input
postfix: postfix
taskManager:
replicas: 2
resource:
memory: 2Gb
cpu: 0.5
jobManager:
replicas: 2

flinkConfiguration:
state.backend: "rocksdb"
restart-strategy.failure-rate.failure-rate-interval: "6 min"

metrics:
enabled: true
3 changes: 3 additions & 0 deletions charts/flink-job/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,9 @@ Validate the configuration
*/}}
{{- define "flink-job.storageConfiguration" -}}
{{- $configs := .configs -}}
{{- if and (not (eq .global.job.upgradeMode "stateless")) (not (and .global.storage.scheme .global.storage.baseDir)) -}}
{{- fail "storage.scheme and storage.baseDir must be set when upgradeMode is not stateless" -}}
{{- end -}}
{{- if and .global.storage.scheme .global.storage.baseDir (has .global.job.upgradeMode (list "last-state" "savepoint")) -}}
{{- $checkpointsDir := printf "%s://%s/%s/checkpoints" (trimSuffix "://" .global.storage.scheme) .global.storage.baseDir .fullname -}}
{{- $configs = fromJson (include "flink-job._dictSet" (list $configs "state.checkpoints.dir" $checkpointsDir)) -}}
Expand Down
133 changes: 0 additions & 133 deletions charts/flink-job/values.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,139 +3,6 @@
"title": "JSON schema for flink-job/values.yaml",
"type": "object",
"additionalProperties": false,
"allOf": [
{
"if": {
"properties": {
"job": {
"properties": {
"upgradeMode": {
"const": "savepoint"
}
}
}
}
},
"then": {
"anyOf": [
{
"properties": {
"flinkConfiguration": {
"required": [
"state.savepoints.dir"
]
}
}
},
{
"properties": {
"storage": {
"type": "object",
"properties": {
"scheme": {
"type": "string",
"minLength": 1
},
"baseDir": {
"type": "string",
"minLength": 1
}
}
}
}
}
]
}
},
{
"if": {
"properties": {
"job": {
"properties": {
"upgradeMode": {
"const": "last-state"
}
}
}
}
},
"then": {
"anyOf": [
{
"properties": {
"flinkConfiguration": {
"required": [
"state.checkpoints.dir",
"high-availability.storageDir"
]
}
}
},
{
"properties": {
"storage": {
"type": "object",
"properties": {
"scheme": {
"type": "string",
"minLength": 1
},
"baseDir": {
"type": "string",
"minLength": 1
}
}
}
}
}
]
}
},
{
"if":
{
"properties": {
"jobManager": {
"properties": {
"replicas": {
"exclusiveMinimum": 1
}
}
}
}
},
"then": {
"anyOf": [
{
"properties": {
"flinkConfiguration": {
"required": [
"high-availability.storageDir"
]
}
}
},
{
"properties": {
"storage": {
"type": "object",
"properties": {
"scheme": {
"type": "string",
"minLength": 1
},
"baseDir": {
"type": "string",
"minLength": 1
}
}
}
}
}
]
}
}
],
"properties": {
"nameOverride": {
"type": "string"
Expand Down

0 comments on commit f08d1f4

Please sign in to comment.