Skip to content

Commit

Permalink
fix worker&fuse options & worker tiredstore (#3344)
Browse files Browse the repository at this point in the history
* fix worker&fuse options & worker tiredstore

Signed-off-by: zwwhdls <[email protected]>

* fix staticcheck

Signed-off-by: zwwhdls <[email protected]>

* fix dataset capacity & send deprecated event

Signed-off-by: zwwhdls <[email protected]>

* fix typo & add unit test

Signed-off-by: zwwhdls <[email protected]>

* fix worker metric port

Signed-off-by: zwwhdls <[email protected]>

* fix static check

Signed-off-by: zwwhdls <[email protected]>

* fix ready worker num in cache capacity

Signed-off-by: zwwhdls <[email protected]>

---------

Signed-off-by: zwwhdls <[email protected]>
  • Loading branch information
zwwhdls authored Jul 24, 2023
1 parent 5413868 commit 9f2926f
Show file tree
Hide file tree
Showing 12 changed files with 479 additions and 130 deletions.
4 changes: 4 additions & 0 deletions api/v1alpha1/juicefsruntime_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ type JuiceFSFuseSpec struct {
// Resources that will be requested by JuiceFS Fuse.
Resources corev1.ResourceRequirements `json:"resources,omitempty"`

// Options mount options that fuse pod will use
// +optional
Options map[string]string `json:"options,omitempty"`

// If the fuse client should be deployed in global mode,
// otherwise the affinity should be considered
// +optional
Expand Down
16 changes: 16 additions & 0 deletions api/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions charts/fluid/fluid/crds/data.fluid.io_juicefsruntimes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,11 @@ spec:
the fuse client to fit on a node, this option only effect when
global is enabled
type: object
options:
additionalProperties:
type: string
description: Options mount options that fuse pod will use
type: object
podMetadata:
description: PodMetadata defines labels and annotations that will
be propagated to JuiceFs's pods.
Expand Down
5 changes: 5 additions & 0 deletions config/crd/bases/data.fluid.io_juicefsruntimes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,11 @@ spec:
the fuse client to fit on a node, this option only effect when
global is enabled
type: object
options:
additionalProperties:
type: string
description: Options mount options that fuse pod will use
type: object
podMetadata:
description: PodMetadata defines labels and annotations that will
be propagated to JuiceFs's pods.
Expand Down
28 changes: 21 additions & 7 deletions pkg/ddc/juicefs/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,39 @@ import (
"fmt"
"strconv"

corev1 "k8s.io/api/core/v1"

"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/utils"
v1 "k8s.io/api/core/v1"
)

// queryCacheStatus checks the cache status
func (j *JuiceFSEngine) queryCacheStatus() (states cacheStates, err error) {
edition := j.GetEdition()

var cachesize uint64
if len(j.runtime.Spec.TieredStore.Levels) != 0 {
cachesize, e := strconv.ParseUint(strconv.FormatInt(j.runtime.Spec.TieredStore.Levels[0].Quota.Value(), 10), 10, 64)
if e != nil {
err = e
cachesize, err = strconv.ParseUint(strconv.FormatInt(j.runtime.Spec.TieredStore.Levels[0].Quota.Value(), 10), 10, 64)
if err != nil {
return
}
}
// if cacheSize is overwritten in worker options, deprecated
if cacheSizeStr := j.runtime.Spec.Worker.Options["cache-size"]; cacheSizeStr != "" {
var cacheSizeMB uint64
cacheSizeMB, err = strconv.ParseUint(cacheSizeStr, 10, 64)
if err != nil {
return
}
states.cacheCapacity = utils.BytesSize(float64(cachesize * uint64(j.runtime.Spec.Replicas)))

// cacheSize is in MiB
cachesize = cacheSizeMB * 1024 * 1024
}
if cachesize != 0 {
states.cacheCapacity = utils.BytesSize(float64(cachesize * uint64(j.runtime.Status.WorkerNumberReady)))
}

var pods []v1.Pod
var pods []corev1.Pod
// enterprise edition use cache of workers which form a cache group, while community edition use cache of fuse pod whose cache if no-sharing
containerName := common.JuiceFSWorkerContainer
stsName := j.getWorkerName()
Expand Down Expand Up @@ -93,7 +107,7 @@ func (j *JuiceFSEngine) queryCacheStatus() (states cacheStates, err error) {
// get cacheHitRatio & cacheThroughputRatio from fuse pod
func (j *JuiceFSEngine) getCacheRatio(edition string, states *cacheStates) (err error) {
var containerName string
var pods []v1.Pod
var pods []corev1.Pod
if edition == EnterpriseEdition {
containerName = common.JuiceFSWorkerContainer
stsName := j.getWorkerName()
Expand Down
11 changes: 10 additions & 1 deletion pkg/ddc/juicefs/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,19 @@ func TestJuiceFSEngine_queryCacheStatus(t *testing.T) {
Name: "test",
Namespace: "fluid",
},
Spec: datav1alpha1.JuiceFSRuntimeSpec{
Replicas: 1,
Worker: datav1alpha1.JuiceFSCompTemplateSpec{Options: map[string]string{
"cache-size": "102400",
}},
},
Status: datav1alpha1.RuntimeStatus{
WorkerNumberReady: 1,
},
},
}
want := cacheStates{
cacheCapacity: "",
cacheCapacity: "100.00GiB",
cached: "37.96GiB",
cachedPercentage: "0.0%",
cacheHitRatio: "100.0%",
Expand Down
3 changes: 3 additions & 0 deletions pkg/ddc/juicefs/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"

"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"

"github.com/fluid-cloudnative/fluid/pkg/ctrl"
"github.com/fluid-cloudnative/fluid/pkg/utils"
Expand All @@ -40,6 +41,7 @@ type JuiceFSEngine struct {
runtimeType string
Log logr.Logger
client.Client
Recorder record.EventRecorder
//When reaching this gracefulShutdownLimits, the system is forced to clean up.
gracefulShutdownLimits int32
MetadataSyncDoneCh chan base.MetadataSyncResult
Expand All @@ -54,6 +56,7 @@ func Build(id string, ctx cruntime.ReconcileRequestContext) (base.Engine, error)
name: ctx.Name,
namespace: ctx.Namespace,
Client: ctx.Client,
Recorder: ctx.Recorder,
Log: ctx.Log,
runtimeType: ctx.RuntimeType,
gracefulShutdownLimits: 5,
Expand Down
87 changes: 81 additions & 6 deletions pkg/ddc/juicefs/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package juicefs

import (
"fmt"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -74,7 +75,7 @@ func (j *JuiceFSEngine) transform(runtime *datav1alpha1.JuiceFSRuntime) (value *
}

// transform the workers
err = j.transformWorkers(runtime, value)
err = j.transformWorkers(runtime, dataset, value)
if err != nil {
return
}
Expand All @@ -90,7 +91,7 @@ func (j *JuiceFSEngine) transform(runtime *datav1alpha1.JuiceFSRuntime) (value *
return
}

func (j *JuiceFSEngine) transformWorkers(runtime *datav1alpha1.JuiceFSRuntime, value *JuiceFS) (err error) {
func (j *JuiceFSEngine) transformWorkers(runtime *datav1alpha1.JuiceFSRuntime, dataset *datav1alpha1.Dataset, value *JuiceFS) (err error) {

image := runtime.Spec.JuiceFSVersion.Image
imageTag := runtime.Spec.JuiceFSVersion.ImageTag
Expand All @@ -106,6 +107,31 @@ func (j *JuiceFSEngine) transformWorkers(runtime *datav1alpha1.JuiceFSRuntime, v
value.Worker.NodeSelector = runtime.Spec.Worker.NodeSelector
}

// options
mount := dataset.Spec.Mounts[0]
var tieredStoreLevel *datav1alpha1.Level
if len(runtime.Spec.TieredStore.Levels) != 0 {
tieredStoreLevel = &runtime.Spec.TieredStore.Levels[0]
}
option, err := j.genMountOptions(mount, tieredStoreLevel)
if err != nil {
return err
}
for k, v := range runtime.Spec.Worker.Options {
option[k] = v
}
if runtime.Spec.Worker.Options["cache-size"] != "" || runtime.Spec.Worker.Options["cache-dir"] != "" {
// cache-size & cache-dir in worker.options will be deprecated in the future
// send an event in runtime
msg := "cache-size & cache-dir in worker.options will be deprecated in the future, please use tieredStore.levels instead"
j.Log.Info(msg)
j.Recorder.Eventf(runtime, corev1.EventTypeWarning, common.RuntimeDeprecated, msg)
}

// transform mount cmd & stat cmd
j.genWorkerMount(value, option)

// transform resources for worker
err = j.transformResourcesForWorker(runtime, value)
if err != nil {
j.Log.Error(err, "failed to transform resource for worker")
Expand All @@ -129,6 +155,55 @@ func (j *JuiceFSEngine) transformWorkers(runtime *datav1alpha1.JuiceFSRuntime, v
return
}

// genMount: generate mount args
func (j *JuiceFSEngine) genWorkerMount(value *JuiceFS, workerOptionMap map[string]string) {
var mountArgsWorker []string
if workerOptionMap == nil {
workerOptionMap = map[string]string{}
}
runtimeInfo := j.runtimeInfo
if runtimeInfo != nil {
accessModes, err := utils.GetAccessModesOfDataset(j.Client, runtimeInfo.GetName(), runtimeInfo.GetNamespace())
if err != nil {
j.Log.Info("Error:", "err", err)
}
if len(accessModes) > 0 {
for _, mode := range accessModes {
if mode == corev1.ReadOnlyMany {
workerOptionMap["ro"] = ""
break
}
}
}
}
if value.Edition == CommunityEdition {
if _, ok := workerOptionMap["metrics"]; !ok {
metricsPort := DefaultMetricsPort
if value.Worker.MetricsPort != nil {
metricsPort = *value.Worker.MetricsPort
}
workerOptionMap["metrics"] = fmt.Sprintf("0.0.0.0:%d", metricsPort)
}
mountArgsWorker = []string{common.JuiceFSCeMountPath, value.Source, value.Worker.MountPath, "-o", strings.Join(genArgs(workerOptionMap), ",")}
} else {
workerOptionMap["foreground"] = ""

// start independent cache cluster, refer to [juicefs cache sharing](https://juicefs.com/docs/cloud/cache/#client_cache_sharing)
// fuse and worker use the same cache-group, fuse use no-sharing
cacheGroup := fmt.Sprintf("%s-%s", j.namespace, value.FullnameOverride)
if _, ok := workerOptionMap["cache-group"]; ok {
cacheGroup = workerOptionMap["cache-group"]
}
workerOptionMap["cache-group"] = cacheGroup
delete(workerOptionMap, "no-sharing")

mountArgsWorker = []string{common.JuiceFSMountPath, value.Source, value.Worker.MountPath, "-o", strings.Join(genArgs(workerOptionMap), ",")}
}

value.Worker.Command = strings.Join(mountArgsWorker, " ")
value.Worker.StatCmd = "stat -c %i " + value.Worker.MountPath
}

func (j *JuiceFSEngine) transformPlacementMode(dataset *datav1alpha1.Dataset, value *JuiceFS) {
value.PlacementMode = string(dataset.Spec.PlacementMode)
if len(value.PlacementMode) == 0 {
Expand Down Expand Up @@ -164,13 +239,13 @@ func (j *JuiceFSEngine) allocatePorts(dataset *datav1alpha1.Dataset, runtime *da
// enterprise edition do not need metrics port
return nil
}
fuseMetricsPort, err := GetMetricsPort(dataset.Spec.Mounts[0].Options)
fuseMetricsPort, err := GetMetricsPort(runtime.Spec.Fuse.Options)
if err != nil {
return err
}
workerMetricsPort := DefaultMetricsPort
if runtime.Spec.Worker.Options == nil {
workerMetricsPort = fuseMetricsPort
workerMetricsPort, err := GetMetricsPort(runtime.Spec.Worker.Options)
if err != nil {
return err
}

// if not use hostnetwork then use default port
Expand Down
Loading

0 comments on commit 9f2926f

Please sign in to comment.