Skip to content

Commit

Permalink
Merge pull request #227 from 0xff-dev/main
Browse files Browse the repository at this point in the history
fix: rename bucket name, and add missing permission
  • Loading branch information
bjwswang authored Nov 19, 2023
2 parents 43a07b2 + 7c67494 commit 149be08
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 13 deletions.
6 changes: 4 additions & 2 deletions api/v1alpha1/condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ type ConditionReason string
const (
ReasonAvailable ConditionReason = "Available"
ReasonUnavailable ConditionReason = "Unavailable"
ReasonCreating ConditionReason = "Creating"
ReasonDeleting ConditionReason = "Deleting"
ReasonReconcileSuccess ConditionReason = "ReconcileSuccess"
ReasonReconcileError ConditionReason = "ReconcileError"
ReasonReconcilePaused ConditionReason = "ReconcilePaused"

ReasonFileSyncing ConditionReason = "FileSyncing"
ReasonFileSyncFailed ConditionReason = "FileSyncFailed"
ReasonFileSuncSuccess ConditionReason = "FileSyncSuccess"
)

// Some Data related Condition Types
Expand Down
35 changes: 34 additions & 1 deletion api/v1alpha1/versioneddataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"context"
"fmt"
"sort"

corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var (
Expand Down Expand Up @@ -50,7 +53,10 @@ func CopyedFileGroup2Status(instance *VersionedDataset) (bool, []DatasourceFileS
// 2. Organize the contents of the fileGroup into this format: {"datasourceNamespace datasourceName": ["file1", "file2"]}
fileGroup := make(map[string][]string)
for _, fg := range instance.Spec.FileGroups {
namespace := fg.Datasource.GetNamespace()
namespace := instance.Namespace
if fg.Datasource.Namespace != nil {
namespace = *fg.Datasource.Namespace
}
key := fmt.Sprintf("%s %s", namespace, fg.Datasource.Name)
if _, ok := fileGroup[key]; !ok {
fileGroup[key] = make([]string, 0)
Expand Down Expand Up @@ -127,7 +133,31 @@ func CopyedFileGroup2Status(instance *VersionedDataset) (bool, []DatasourceFileS
return targetDatasourceFileStatus[i].DatasourceName < targetDatasourceFileStatus[j].DatasourceName
})

index := -1
for idx, item := range instance.Status.Conditions {
if item.Type == TypeReady {
if item.Status != corev1.ConditionTrue {
index = idx
}
break
}
}
if len(instance.Status.Conditions) == 0 || index != -1 {
message := "sync files."
if index != -1 {
message = "file synchronization failed, try again"
}
cond := Condition{
Type: TypeReady,
Status: corev1.ConditionFalse,
Reason: ReasonFileSyncing,
Message: message,
LastTransitionTime: v1.Now(),
}
instance.Status.ConditionedStatus.SetConditions(cond)
}
instance.Status.DatasourceFiles = targetDatasourceFileStatus
// update condition to sync
return update, deletedFiles
}

Expand Down Expand Up @@ -155,6 +185,9 @@ func UpdateFileStatus(ctx context.Context, instance *VersionedDataset, datasourc
if syncStatus == FileProcessPhaseFailed {
instance.Status.DatasourceFiles[datasourceIndex].Status[fileIndex].ErrMessage = errMsg
}
if syncStatus == FileProcessPhaseSucceeded {
instance.Status.DatasourceFiles[datasourceIndex].Status[fileIndex].LastUpdateTime = v1.Now()
}
return nil
}

Expand Down
1 change: 1 addition & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ rules:
verbs:
- create
- delete
- deletecollection
- get
- list
- patch
Expand Down
1 change: 1 addition & 0 deletions controllers/dataset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type DatasetReconciler struct {
}

//+kubebuilder:rbac:groups=arcadia.kubeagi.k8s.com.cn,resources=datasets,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=arcadia.kubeagi.k8s.com.cn,resources=versioneddatasets,verbs=deletecollection
//+kubebuilder:rbac:groups=arcadia.kubeagi.k8s.com.cn,resources=datasets/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=arcadia.kubeagi.k8s.com.cn,resources=datasets/finalizers,verbs=update

Expand Down
8 changes: 6 additions & 2 deletions controllers/versioneddataset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,14 @@ func (r *VersionedDatasetReconciler) preUpdate(ctx context.Context, instance *v1
}

dataset := &v1alpha1.Dataset{}
namespace := instance.Namespace
if instance.Spec.Dataset.Namespace != nil {
namespace = *instance.Spec.Dataset.Namespace
}
if err = r.Client.Get(ctx, types.NamespacedName{
Namespace: instance.Spec.Dataset.GetNamespace(),
Namespace: namespace,
Name: instance.Spec.Dataset.Name}, dataset); err != nil {
klog.Errorf("preUpdate: failed to get dataset %s/%s, error %s", instance.Spec.Dataset.GetNamespace(), instance.Spec.Dataset.Name, err)
klog.Errorf("preUpdate: failed to get dataset %s/%s, error %s", namespace, instance.Spec.Dataset.Name, err)
return false, err
}

Expand Down
2 changes: 1 addition & 1 deletion deploy/charts/arcadia/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ apiVersion: v2
name: arcadia
description: A Helm chart(KubeBB Component) for KubeAGI Arcadia
type: application
version: 0.1.25
version: 0.1.26
appVersion: "0.0.1"

keywords:
Expand Down
1 change: 1 addition & 0 deletions deploy/charts/arcadia/templates/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ rules:
verbs:
- create
- delete
- deletecollection
- get
- list
- patch
Expand Down
20 changes: 13 additions & 7 deletions pkg/scheduler/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,23 @@ func (e *executor) generateJob(ctx context.Context, jobCh chan<- JobPayload, dat

ds := &v1alpha1.Datasource{}
datasourceNamespace := fs.DatasourceNamespace
datasetNamespace := e.instance.Spec.Dataset.GetNamespace()
srcBucket := "bucket-" + datasourceNamespace
dstBucket := "bucket-" + datasetNamespace
klog.V(4).Infof("[Debug] datasourceNamespace: %s, datasetNamespace: %s, srcBucket: %s, dstBucket: %s",
datasourceNamespace, datasetNamespace, srcBucket, dstBucket)

datasetNamespace := e.instance.Namespace
if e.instance.Spec.Dataset.Namespace != nil {
datasetNamespace = *e.instance.Spec.Dataset.Namespace
}
if err := e.client.Get(ctx, types.NamespacedName{Namespace: fs.DatasourceNamespace, Name: fs.DatasourceName}, ds); err != nil {
klog.Errorf("generateJob: failed to get datasource %s", err)
return err
}

srcBucket := datasourceNamespace
if ds.Spec.OSS != nil {
srcBucket = ds.Spec.OSS.Bucket
}
dstBucket := datasetNamespace
klog.V(4).Infof("[Debug] datasourceNamespace: %s, datasetNamespace: %s, srcBucket: %s, dstBucket: %s",
datasourceNamespace, datasetNamespace, srcBucket, dstBucket)

klog.V(5).Infof("[Debug] get datasource %+v\n", *ds)

oss, err := datasource.NewOSS(ctx, e.client, ds.Spec.Enpoint)
Expand All @@ -92,7 +98,7 @@ func (e *executor) generateJob(ctx context.Context, jobCh chan<- JobPayload, dat
}
}

dst := fmt.Sprintf("%s/dataset/%s/%s", datasetNamespace, e.instance.Spec.Dataset.Name, e.instance.Spec.Version)
dst := fmt.Sprintf("dataset/%s/%s", e.instance.Spec.Dataset.Name, e.instance.Spec.Version)

for _, fp := range fs.Status {
select {
Expand Down
34 changes: 34 additions & 0 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ package scheduler

import (
"context"
"fmt"

"github.com/KawashiroNitori/butcher/v2"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -69,8 +72,39 @@ func (s *Scheduler) Start() error {
}

if ds.ResourceVersion == s.ds.ResourceVersion {
syncCond := true
for _, cond := range ds.Status.Conditions {
if cond.Type == v1alpha1.TypeReady && cond.Status == corev1.ConditionTrue && cond.Reason == v1alpha1.ReasonFileSuncSuccess {
syncCond = false
}
}
deepCopy := ds.DeepCopy()
deepCopy.Status.DatasourceFiles = s.ds.Status.DatasourceFiles
if syncCond {
condition := v1alpha1.Condition{
Type: v1alpha1.TypeReady,
Status: corev1.ConditionTrue,
LastTransitionTime: v1.Now(),
Reason: v1alpha1.ReasonFileSuncSuccess,
Message: "",
}
for _, checker := range s.ds.Status.DatasourceFiles {
shouldBreak := false
for _, f := range checker.Status {
if f.Phase != v1alpha1.FileProcessPhaseSucceeded {
condition.Status = corev1.ConditionFalse
condition.Reason = v1alpha1.ReasonFileSyncFailed
condition.Message = fmt.Sprintf("%s sync failed, %s", f.Path, f.ErrMessage)
shouldBreak = true
}
}
if shouldBreak {
break
}
}
deepCopy.Status.ConditionedStatus.SetConditions(condition)
}

return s.client.Status().Patch(s.ctx, deepCopy, client.MergeFrom(ds))
}

Expand Down

0 comments on commit 149be08

Please sign in to comment.