diff --git a/api/v1alpha1/versioneddataset.go b/api/v1alpha1/versioneddataset.go
index e8852bc7b..92192f2d2 100644
--- a/api/v1alpha1/versioneddataset.go
+++ b/api/v1alpha1/versioneddataset.go
@@ -20,9 +20,14 @@ import (
 	"context"
 	"fmt"
 	"sort"
+	"strings"
 
+	"github.com/minio/minio-go/v7"
 	corev1 "k8s.io/api/core/v1"
 	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/klog/v2"
+
+	"github.com/kubeagi/arcadia/pkg/utils/minioutils"
 )
 
 var (
@@ -30,26 +35,37 @@ var (
 	LabelVersionedDatasetVersionOwner = Group + "/owner"
 )
 
-// CopyedFileGroup2Status the function will eventually return, whether there are new files added. and a list of files that were deleted.
-func CopyedFileGroup2Status(instance *VersionedDataset) (bool, []DatasourceFileStatus) {
-	if instance.DeletionTimestamp != nil {
-		source := instance.Status.DatasourceFiles
-		instance.Status.DatasourceFiles = nil
-		return true, source
+const InheritedFromVersionName = "inheritfrom-"
+
+func generateInheriedFileStatus(minioClient *minio.Client, instance *VersionedDataset) []FileStatus {
+	if instance.Spec.InheritedFrom == "" {
+		return nil
 	}
-	// 1. First store the information about the status of the file that has been saved in the current status.
-	oldDatasourceFiles := make(map[string]map[string]FileDetails)
-	for _, fileStatus := range instance.Status.DatasourceFiles {
-		key := fmt.Sprintf("%s %s", fileStatus.DatasourceNamespace, fileStatus.DatasourceName)
-		if _, ok := oldDatasourceFiles[key]; !ok {
-			oldDatasourceFiles[key] = make(map[string]FileDetails)
-		}
-		for _, item := range fileStatus.Status {
-			oldDatasourceFiles[key][item.Path] = item
+	srcBucket := instance.Spec.Dataset.Namespace
+	prefix := fmt.Sprintf("dataset/%s/%s/", instance.Spec.Dataset.Name, instance.Spec.InheritedFrom)
+	filePaths := minioutils.ListObjects(context.TODO(), *srcBucket, prefix, minioClient, -1)
+	status := make([]FileDetails, len(filePaths))
+	sort.Strings(filePaths)
+
+	for idx, fp := range filePaths {
+		status[idx] = FileDetails{
+			Path:  strings.TrimPrefix(fp, prefix),
+			Phase: FileProcessPhaseProcessing,
 		}
 	}
+	return []FileStatus{
+		{
+			TypedObjectReference: TypedObjectReference{
+				Name:      InheritedFromVersionName + instance.Spec.InheritedFrom,
+				Namespace: &instance.Namespace,
+				Kind:      "VersionedDataset",
+			},
+			Status: status,
+		}}
+}
 
+func generateDatasourceFileStatus(instance *VersionedDataset) []FileStatus {
 	// 2. Organize the contents of the fileGroup into this format: {"datasourceNamespace datasourceName": ["file1", "file2"]}
 	fileGroup := make(map[string][]string)
 	for _, fg := range instance.Spec.FileGroups {
@@ -65,14 +81,17 @@ func CopyedFileGroup2Status(instance *VersionedDataset) (bool, []DatasourceFileS
 	}
 
 	// 3. Convert fileGroup to []DatasourceFileStatus format
-	targetDatasourceFileStatus := make([]DatasourceFileStatus, 0)
+	targetDatasourceFileStatus := make([]FileStatus, 0)
 	var namespace, name string
 	for datasource, filePaths := range fileGroup {
 		_, _ = fmt.Sscanf(datasource, "%s %s", &namespace, &name)
-		item := DatasourceFileStatus{
-			DatasourceName:      name,
-			DatasourceNamespace: namespace,
-			Status:              []FileDetails{},
+		item := FileStatus{
+			TypedObjectReference: TypedObjectReference{
+				Name:      name,
+				Namespace: &namespace,
+				Kind:      "Datasource",
+			},
+			Status: []FileDetails{},
 		}
 		for _, fp := range filePaths {
 			item.Status = append(item.Status, FileDetails{
@@ -86,15 +105,40 @@ func CopyedFileGroup2Status(instance *VersionedDataset) (bool, []DatasourceFileS
 		targetDatasourceFileStatus = append(targetDatasourceFileStatus, item)
 	}
+	return targetDatasourceFileStatus
+}
+
+// CopyedFileGroup2Status returns whether any new files were added and the list of files that were deleted.
+func CopyedFileGroup2Status(minioClient *minio.Client, instance *VersionedDataset) (bool, []FileStatus) {
+	if instance.DeletionTimestamp != nil {
+		source := instance.Status.Files
+		instance.Status.Files = nil
+		return true, source
+	}
+
+	// 1. First store the information about the status of the file that has been saved in the current status.
+	oldDatasourceFiles := make(map[string]map[string]FileDetails)
+	for _, fileStatus := range instance.Status.Files {
+		key := fmt.Sprintf("%s %s", *fileStatus.Namespace, fileStatus.Name)
+		if _, ok := oldDatasourceFiles[key]; !ok {
+			oldDatasourceFiles[key] = make(map[string]FileDetails)
+		}
+		for _, item := range fileStatus.Status {
+			oldDatasourceFiles[key][item.Path] = item
+		}
+	}
+
+	targetDatasourceFileStatus := generateDatasourceFileStatus(instance)
+	targetDatasourceFileStatus = append(targetDatasourceFileStatus, generateInheriedFileStatus(minioClient, instance)...)
 
 	// 4. If a file from a data source is found to exist in oldDatasourceFiles,
 	// replace it with the book inside oldDatasourceFiles.
 	// Otherwise set the file as being processed.
 	update := false
-	deletedFiles := make([]DatasourceFileStatus, 0)
+	deletedFiles := make([]FileStatus, 0)
 	for idx := range targetDatasourceFileStatus {
 		item := targetDatasourceFileStatus[idx]
-		key := fmt.Sprintf("%s %s", item.DatasourceNamespace, item.DatasourceName)
+		key := fmt.Sprintf("%s %s", *item.Namespace, item.Name)
 
 		// if the datasource itself is not in status, then it is a new series of files added.
 		datasourceFiles, ok := oldDatasourceFiles[key]
@@ -116,10 +160,12 @@ func CopyedFileGroup2Status(instance *VersionedDataset) (bool, []DatasourceFileS
 			delete(datasourceFiles, status.Path)
 		}
 		if len(datasourceFiles) > 0 {
-			ds := DatasourceFileStatus{
-				DatasourceName:      item.DatasourceName,
-				DatasourceNamespace: item.DatasourceNamespace,
-				Status:              make([]FileDetails, 0),
+			ds := FileStatus{
+				TypedObjectReference: TypedObjectReference{
+					Name:      item.Name,
+					Namespace: item.Namespace,
+				},
+				Status: make([]FileDetails, 0),
 			}
 			for _, r := range datasourceFiles {
 				ds.Status = append(ds.Status, r)
@@ -127,10 +173,27 @@ func CopyedFileGroup2Status(instance *VersionedDataset) (bool, []DatasourceFileS
 			deletedFiles = append(deletedFiles, ds)
 		}
 		targetDatasourceFileStatus[idx] = item
+		delete(oldDatasourceFiles, key)
+	}
+
+	for key, item := range oldDatasourceFiles {
+		var namespace, name string
+		fmt.Sscanf(key, "%s %s", &namespace, &name)
+		ds := FileStatus{
+			TypedObjectReference: TypedObjectReference{
+				Name:      name,
+				Namespace: &namespace,
+			},
+			Status: make([]FileDetails, 0),
+		}
+		for _, r := range item {
+			ds.Status = append(ds.Status, r)
+		}
+		deletedFiles = append(deletedFiles, ds)
 	}
 
 	sort.Slice(targetDatasourceFileStatus, func(i, j int) bool {
-		return targetDatasourceFileStatus[i].DatasourceName < targetDatasourceFileStatus[j].DatasourceName
+		return targetDatasourceFileStatus[i].Name < targetDatasourceFileStatus[j].Name
 	})
 
 	index := -1
@@ -156,37 +219,39 @@ func CopyedFileGroup2Status(instance *VersionedDataset) (bool, []DatasourceFileS
 		}
 		instance.Status.ConditionedStatus.SetConditions(cond)
 	}
-	instance.Status.DatasourceFiles = targetDatasourceFileStatus
+	klog.V(4).Infof("[Debug] delete filestatus %+v\n", deletedFiles)
+
+	instance.Status.Files = targetDatasourceFileStatus
 
 	// update condition to sync
 	return update, deletedFiles
 }
 
 func UpdateFileStatus(ctx context.Context, instance *VersionedDataset, datasource, srcPath string, syncStatus FileProcessPhase, errMsg string) error {
-	datasourceFileLen := len(instance.Status.DatasourceFiles)
+	datasourceFileLen := len(instance.Status.Files)
 	datasourceIndex := sort.Search(datasourceFileLen, func(i int) bool {
-		return instance.Status.DatasourceFiles[i].DatasourceName >= datasource
+		return instance.Status.Files[i].Name >= datasource
 	})
 	if datasourceIndex == datasourceFileLen {
 		return fmt.Errorf("not found datasource %s in %s/%s.status", datasource, instance.Namespace, instance.Name)
 	}
 
-	filePathLen := len(instance.Status.DatasourceFiles[datasourceIndex].Status)
+	filePathLen := len(instance.Status.Files[datasourceIndex].Status)
 	fileIndex := sort.Search(filePathLen, func(i int) bool {
-		return instance.Status.DatasourceFiles[datasourceIndex].Status[i].Path >= srcPath
+		return instance.Status.Files[datasourceIndex].Status[i].Path >= srcPath
 	})
 	if fileIndex == filePathLen {
 		return fmt.Errorf("not found srcPath %s in datasource %s", srcPath, datasource)
 	}
 
 	// Only this state transfer is allowed
-	curPhase := instance.Status.DatasourceFiles[datasourceIndex].Status[fileIndex].Phase
+	curPhase := instance.Status.Files[datasourceIndex].Status[fileIndex].Phase
 	if curPhase == FileProcessPhaseProcessing && (syncStatus == FileProcessPhaseSucceeded || syncStatus == FileProcessPhaseFailed) {
-		instance.Status.DatasourceFiles[datasourceIndex].Status[fileIndex].Phase = syncStatus
+		instance.Status.Files[datasourceIndex].Status[fileIndex].Phase = syncStatus
 		if syncStatus == FileProcessPhaseFailed {
-			instance.Status.DatasourceFiles[datasourceIndex].Status[fileIndex].ErrMessage = errMsg
+			instance.Status.Files[datasourceIndex].Status[fileIndex].ErrMessage = errMsg
 		}
 		if syncStatus == FileProcessPhaseSucceeded {
-			instance.Status.DatasourceFiles[datasourceIndex].Status[fileIndex].LastUpdateTime = v1.Now()
+			instance.Status.Files[datasourceIndex].Status[fileIndex].LastUpdateTime = v1.Now()
 		}
 		return nil
 	}
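Note on the pair of sort.Search calls in UpdateFileStatus above: they only work because CopyedFileGroup2Status sorts Status.Files by Name and each per-source Status list stays ordered by Path. A minimal, self-contained sketch of the same two-level binary-search lookup (types and data are hypothetical stand-ins, not the real API types):

```go
package main

import (
	"fmt"
	"sort"
)

type fileDetails struct{ Path, Phase string }

type fileStatus struct {
	Name   string
	Status []fileDetails
}

// find mirrors UpdateFileStatus' two sort.Search calls; both slices must be
// kept sorted (by Name, then by Path) or the search lands on the wrong entry.
func find(files []fileStatus, source, path string) (*fileDetails, error) {
	i := sort.Search(len(files), func(i int) bool { return files[i].Name >= source })
	if i == len(files) || files[i].Name != source {
		return nil, fmt.Errorf("not found datasource %s", source)
	}
	status := files[i].Status
	j := sort.Search(len(status), func(j int) bool { return status[j].Path >= path })
	if j == len(status) || status[j].Path != path {
		return nil, fmt.Errorf("not found srcPath %s", path)
	}
	return &status[j], nil
}

func main() {
	files := []fileStatus{
		{Name: "ds-a", Status: []fileDetails{{Path: "a.txt", Phase: "Processing"}}},
		{Name: "inheritfrom-v1", Status: []fileDetails{{Path: "b.txt", Phase: "Processing"}}},
	}
	fd, err := find(files, "ds-a", "a.txt")
	fmt.Println(fd, err) // &{a.txt Processing} <nil>
}
```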
diff --git a/api/v1alpha1/versioneddataset_types.go b/api/v1alpha1/versioneddataset_types.go
index 3d333fb1b..6f4cb6839 100644
--- a/api/v1alpha1/versioneddataset_types.go
+++ b/api/v1alpha1/versioneddataset_types.go
@@ -45,12 +45,15 @@ type VersionedDatasetSpec struct {
 	// +kubebuilder:validation:Enum=0;1
 	// +kubebuilder:default=0
 	Released uint8 `json:"released"`
+
+	// Which version of the dataset it inherits from
+	InheritedFrom string `json:"inheritedFrom,omitempty"`
 }
 
-type DatasourceFileStatus struct {
-	DatasourceName      string        `json:"datasourceName"`
-	DatasourceNamespace string        `json:"datasourceNamespace"`
-	Status              []FileDetails `json:"status,omitempty"`
+type FileStatus struct {
+	TypedObjectReference `json:",inline"`
+
+	Status []FileDetails `json:"status,omitempty"`
 }
 
 // VersionedDatasetStatus defines the observed state of VersionedDataset
@@ -58,8 +61,8 @@ type VersionedDatasetStatus struct {
 	// ConditionedStatus is the current status
 	ConditionedStatus `json:",inline"`
 
-	// DatasourceFiles record the process and results of file processing for each data source
-	DatasourceFiles []DatasourceFileStatus `json:"datasourceFiles,omitempty"`
+	// Files record the process and results of file processing for each data source
+	Files []FileStatus `json:"files,omitempty"`
 }
 
 //+kubebuilder:object:root=true
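With TypedObjectReference inlined, one status list now carries both kinds of sources. For orientation, the two entry shapes that generateDatasourceFileStatus and generateInheriedFileStatus produce look roughly like this (names and namespace below are hypothetical):

```go
package main

import "github.com/kubeagi/arcadia/api/v1alpha1"

func sampleStatuses() []v1alpha1.FileStatus {
	ns := "arcadia"
	return []v1alpha1.FileStatus{
		{
			// Shape produced by generateDatasourceFileStatus for a FileGroup entry.
			TypedObjectReference: v1alpha1.TypedObjectReference{
				Kind:      "Datasource",
				Name:      "my-oss",
				Namespace: &ns,
			},
			Status: []v1alpha1.FileDetails{{Path: "a/b.txt", Phase: v1alpha1.FileProcessPhaseProcessing}},
		},
		{
			// Shape produced by generateInheriedFileStatus when Spec.InheritedFrom is "v1";
			// the name prefix is how the executor later tells the two kinds apart.
			TypedObjectReference: v1alpha1.TypedObjectReference{
				Kind:      "VersionedDataset",
				Name:      v1alpha1.InheritedFromVersionName + "v1", // "inheritfrom-v1"
				Namespace: &ns,
			},
		},
	}
}
```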
diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go
index 8f105e698..2f7c9fb7b 100644
--- a/api/v1alpha1/zz_generated.deepcopy.go
+++ b/api/v1alpha1/zz_generated.deepcopy.go
@@ -214,28 +214,6 @@ func (in *Datasource) DeepCopyObject() runtime.Object {
 	return nil
 }
 
-// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
-func (in *DatasourceFileStatus) DeepCopyInto(out *DatasourceFileStatus) {
-	*out = *in
-	if in.Status != nil {
-		in, out := &in.Status, &out.Status
-		*out = make([]FileDetails, len(*in))
-		for i := range *in {
-			(*in)[i].DeepCopyInto(&(*out)[i])
-		}
-	}
-}
-
-// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DatasourceFileStatus.
-func (in *DatasourceFileStatus) DeepCopy() *DatasourceFileStatus {
-	if in == nil {
-		return nil
-	}
-	out := new(DatasourceFileStatus)
-	in.DeepCopyInto(out)
-	return out
-}
-
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *DatasourceList) DeepCopyInto(out *DatasourceList) {
 	*out = *in
@@ -494,6 +472,29 @@ func (in *FileGroupDetail) DeepCopy() *FileGroupDetail {
 	return out
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *FileStatus) DeepCopyInto(out *FileStatus) {
+	*out = *in
+	in.TypedObjectReference.DeepCopyInto(&out.TypedObjectReference)
+	if in.Status != nil {
+		in, out := &in.Status, &out.Status
+		*out = make([]FileDetails, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FileStatus.
+func (in *FileStatus) DeepCopy() *FileStatus {
+	if in == nil {
+		return nil
+	}
+	out := new(FileStatus)
+	in.DeepCopyInto(out)
+	return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *KnowledgeBase) DeepCopyInto(out *KnowledgeBase) {
 	*out = *in
@@ -1215,9 +1216,9 @@ func (in *VersionedDatasetSpec) DeepCopy() *VersionedDatasetSpec {
 func (in *VersionedDatasetStatus) DeepCopyInto(out *VersionedDatasetStatus) {
 	*out = *in
 	in.ConditionedStatus.DeepCopyInto(&out.ConditionedStatus)
-	if in.DatasourceFiles != nil {
-		in, out := &in.DatasourceFiles, &out.DatasourceFiles
-		*out = make([]DatasourceFileStatus, len(*in))
+	if in.Files != nil {
+		in, out := &in.Files, &out.Files
+		*out = make([]FileStatus, len(*in))
 		for i := range *in {
 			(*in)[i].DeepCopyInto(&(*out)[i])
 		}
diff --git a/config/crd/bases/arcadia.kubeagi.k8s.com.cn_versioneddatasets.yaml b/config/crd/bases/arcadia.kubeagi.k8s.com.cn_versioneddatasets.yaml
index 0386dcd02..cddb730b1 100644
--- a/config/crd/bases/arcadia.kubeagi.k8s.com.cn_versioneddatasets.yaml
+++ b/config/crd/bases/arcadia.kubeagi.k8s.com.cn_versioneddatasets.yaml
@@ -100,6 +100,9 @@ spec:
                   type: object
               type: object
             type: array
+          inheritedFrom:
+            description: Which version of the dataset it inherits from
+            type: string
           released:
             default: 0
             enum:
@@ -155,14 +158,25 @@ spec:
                 - type
              type: object
            type: array
-          datasourceFiles:
-            description: DatasourceFiles record the process and results of file
-              processing for each data source
+          files:
+            description: Files record the process and results of file processing
+              for each data source
            items:
              properties:
-                datasourceName:
+                apiGroup:
+                  description: APIGroup is the group for the resource being referenced.
+                    If APIGroup is not specified, the specified Kind must be in
+                    the core API group. For any other third-party types, APIGroup
+                    is required.
+                  type: string
+                kind:
+                  description: Kind is the type of resource being referenced
+                  type: string
+                name:
+                  description: Name is the name of resource being referenced
                  type: string
-                datasourceNamespace:
+                namespace:
+                  description: Namespace is the namespace of resource being referenced
                  type: string
                status:
                  items:
@@ -187,8 +201,8 @@ spec:
                    type: object
                  type: array
              required:
-              - datasourceName
-              - datasourceNamespace
+              - kind
+              - name
              type: object
            type: array
        type: object
diff --git a/controllers/dataset_controller.go b/controllers/dataset_controller.go
index d65976dd9..f1ab93b04 100644
--- a/controllers/dataset_controller.go
+++ b/controllers/dataset_controller.go
@@ -62,11 +62,6 @@ func (r *DatasetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
 		return reconcile.Result{}, err
 	}
 	if instance.DeletionTimestamp != nil {
-		if err := r.Client.DeleteAllOf(ctx, &arcadiav1alpha1.VersionedDataset{}, client.InNamespace(instance.Namespace), client.MatchingLabels{
-			arcadiav1alpha1.LabelVersionedDatasetVersionOwner: instance.Name,
-		}); err != nil {
-			return reconcile.Result{}, err
-		}
 		instance.Finalizers = utils.RemoveString(instance.Finalizers, arcadiav1alpha1.Finalizer)
 		err = r.Client.Update(ctx, instance)
 		return reconcile.Result{}, err
diff --git a/controllers/versioneddataset_controller.go b/controllers/versioneddataset_controller.go
index e4b8f8c20..4e2ebfa7b 100644
--- a/controllers/versioneddataset_controller.go
+++ b/controllers/versioneddataset_controller.go
@@ -32,6 +32,8 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 
 	"github.com/kubeagi/arcadia/api/v1alpha1"
+	"github.com/kubeagi/arcadia/pkg/config"
+	"github.com/kubeagi/arcadia/pkg/datasource"
 	"github.com/kubeagi/arcadia/pkg/scheduler"
 	"github.com/kubeagi/arcadia/pkg/utils"
 )
@@ -70,22 +72,56 @@ func (r *VersionedDatasetReconciler) Reconcile(ctx context.Context, req ctrl.Req
 		klog.Errorf("reconcile: failed to get versionDataset with req: %v", req.NamespacedName)
 		return reconcile.Result{}, err
 	}
-	updatedObj, err := r.preUpdate(ctx, instance)
+	if instance.DeletionTimestamp == nil {
+		updatedObj, err := r.preUpdate(ctx, instance)
+		if err != nil {
+			return reconcile.Result{}, err
+		}
+
+		if updatedObj {
+			return reconcile.Result{}, r.Client.Update(ctx, instance)
+		}
+	}
+
+	deepCopy := instance.DeepCopy()
+	update, deleteFilestatus, err := r.checkStatus(ctx, deepCopy)
 	if err != nil {
 		return reconcile.Result{}, err
 	}
-	if updatedObj {
-		return reconcile.Result{Requeue: true}, r.Client.Update(ctx, instance)
+	if update || len(deleteFilestatus) > 0 {
+		if len(deleteFilestatus) > 0 {
+			klog.V(4).Infof("[Debug] need to delete files %+v\n", deleteFilestatus)
+			s, err := scheduler.NewScheduler(ctx, r.Client, instance, deleteFilestatus, true)
+			if err != nil {
+				return reconcile.Result{}, err
+			}
+			klog.V(4).Infof("[Debug] start to delete %d group files for %s/%s", len(deleteFilestatus), instance.Namespace, instance.Name)
+			if err = s.Start(); err != nil {
+				klog.Errorf("try to delete files failed, error %s, need retry", err)
+				return reconcile.Result{}, err
+			}
+		}
+		if instance.DeletionTimestamp == nil {
+			err := r.Client.Status().Patch(ctx, deepCopy, client.MergeFrom(instance))
+			if err != nil {
+				klog.Errorf("patch %s/%s status error %s", instance.Namespace, instance.Name, err)
+			}
+			return reconcile.Result{}, err
+		}
+		return reconcile.Result{}, nil
 	}
 
+	klog.V(4).Infof("[Debug] start to add new files")
+
 	key := fmt.Sprintf("%s/%s", instance.Namespace, instance.Name)
 	v, ok := r.cache.Load(key)
 	if ok {
 		v.(*scheduler.Scheduler).Stop()
 	}
-	s, err := scheduler.NewScheduler(ctx, r.Client, instance)
+	s, err := scheduler.NewScheduler(ctx, r.Client, instance, nil, false)
 	if err != nil {
+		klog.V(4).Infof("generate scheduler error %s", err)
 		return reconcile.Result{}, err
 	}
 	r.cache.Store(key, s)
@@ -154,3 +190,25 @@ func (r *VersionedDatasetReconciler) preUpdate(ctx context.Context, instance *v1
 
 	return update, err
 }
+
+func (r *VersionedDatasetReconciler) checkStatus(ctx context.Context, instance *v1alpha1.VersionedDataset) (bool, []v1alpha1.FileStatus, error) {
+	// TODO: Currently, we think there is only one default minio environment,
+	// so we get the minio client directly through the configuration.
+	systemDatasource, err := config.GetSystemDatasource(ctx, r.Client)
+	if err != nil {
+		klog.Errorf("get system datasource error %s", err)
+		return false, nil, err
+	}
+	endpoint := systemDatasource.Spec.Enpoint.DeepCopy()
+	if endpoint.AuthSecret != nil && endpoint.AuthSecret.Namespace == nil {
+		endpoint.AuthSecret.WithNameSpace(systemDatasource.Namespace)
+	}
+	oss, err := datasource.NewOSS(ctx, r.Client, endpoint)
+	if err != nil {
+		klog.Errorf("generate new minio client error %s", err)
+		return false, nil, err
+	}
+
+	update, deleteFileStatus := v1alpha1.CopyedFileGroup2Status(oss.Client, instance)
+	return update, deleteFileStatus, nil
+}
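The reconcile change above relies on controller-runtime's copy-then-merge-patch idiom: checkStatus mutates a DeepCopy, and Status().Patch with client.MergeFrom sends only the delta to the API server. A sketch of the pattern in isolation (not the PR's exact code; the helper name and callback are hypothetical):

```go
package controllers

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/kubeagi/arcadia/api/v1alpha1"
)

// patchStatus sketches the idiom used in Reconcile above: compute the new
// status on a deep copy, then let MergeFrom diff it against the original so
// only the changed fields travel to the API server.
func patchStatus(ctx context.Context, c client.Client, instance *v1alpha1.VersionedDataset, mutate func(*v1alpha1.VersionedDataset)) error {
	deepCopy := instance.DeepCopy()
	mutate(deepCopy) // e.g. what checkStatus does to deepCopy.Status.Files
	return c.Status().Patch(ctx, deepCopy, client.MergeFrom(instance))
}
```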
diff --git a/deploy/charts/arcadia/crds/arcadia.kubeagi.k8s.com.cn_versioneddatasets.yaml b/deploy/charts/arcadia/crds/arcadia.kubeagi.k8s.com.cn_versioneddatasets.yaml
index 0386dcd02..cddb730b1 100644
--- a/deploy/charts/arcadia/crds/arcadia.kubeagi.k8s.com.cn_versioneddatasets.yaml
+++ b/deploy/charts/arcadia/crds/arcadia.kubeagi.k8s.com.cn_versioneddatasets.yaml
@@ -100,6 +100,9 @@ spec:
                   type: object
               type: object
             type: array
+          inheritedFrom:
+            description: Which version of the dataset it inherits from
+            type: string
           released:
             default: 0
             enum:
@@ -155,14 +158,25 @@ spec:
                 - type
              type: object
            type: array
-          datasourceFiles:
-            description: DatasourceFiles record the process and results of file
-              processing for each data source
+          files:
+            description: Files record the process and results of file processing
+              for each data source
            items:
              properties:
-                datasourceName:
+                apiGroup:
+                  description: APIGroup is the group for the resource being referenced.
+                    If APIGroup is not specified, the specified Kind must be in
+                    the core API group. For any other third-party types, APIGroup
+                    is required.
+                  type: string
+                kind:
+                  description: Kind is the type of resource being referenced
+                  type: string
+                name:
+                  description: Name is the name of resource being referenced
                  type: string
-                datasourceNamespace:
+                namespace:
+                  description: Namespace is the namespace of resource being referenced
                  type: string
                status:
                  items:
@@ -187,8 +201,8 @@ spec:
                    type: object
                  type: array
              required:
-              - datasourceName
-              - datasourceNamespace
+              - kind
+              - name
              type: object
            type: array
        type: object
diff --git a/pkg/scheduler/executor.go b/pkg/scheduler/executor.go
index 221901ec9..3b5976ebe 100644
--- a/pkg/scheduler/executor.go
+++ b/pkg/scheduler/executor.go
@@ -27,15 +27,15 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	"github.com/kubeagi/arcadia/api/v1alpha1"
-	"github.com/kubeagi/arcadia/pkg/config"
-	"github.com/kubeagi/arcadia/pkg/datasource"
 )
 
 type executor struct {
-	instance *v1alpha1.VersionedDataset
-	client   client.Client
+	instance    *v1alpha1.VersionedDataset
+	client      client.Client
+	minioClient *minio.Client
 
-	deleteFileGroups []v1alpha1.DatasourceFileStatus
+	fileStatus []v1alpha1.FileStatus
+	remove     bool
 }
 
 const (
@@ -43,75 +43,58 @@ const (
 	bufSize = 5
 )
 
-func newExecutor(ctx context.Context, c client.Client, instance *v1alpha1.VersionedDataset) butcher.Executor[JobPayload] {
-	_, deleteFileGroups := v1alpha1.CopyedFileGroup2Status(instance)
-	klog.V(4).Infof("[Debug] status is: %+v\ndelete filegroups: %+v\n", instance.Status.DatasourceFiles, deleteFileGroups)
+func newExecutor(ctx context.Context, c client.Client, minioClient *minio.Client, instance *v1alpha1.VersionedDataset, fileStatus []v1alpha1.FileStatus, remove bool) butcher.Executor[JobPayload] {
 	klog.V(4).Infof("[Debug] client is nil: %v\n", c == nil)
-	return &executor{instance: instance, deleteFileGroups: deleteFileGroups, client: c}
+	return &executor{instance: instance, fileStatus: fileStatus, client: c, minioClient: minioClient, remove: remove}
 }
 
-func (e *executor) generateJob(ctx context.Context, jobCh chan<- JobPayload, datasourceFiles []v1alpha1.DatasourceFileStatus, removeAction bool) error {
+func (e *executor) generateJob(ctx context.Context, jobCh chan<- JobPayload, datasourceFiles []v1alpha1.FileStatus, removeAction bool) error {
 	for _, fs := range datasourceFiles {
 		select {
 		case <-ctx.Done():
 			return nil
 		default:
 		}
-
-		ds := &v1alpha1.Datasource{}
-		datasourceNamespace := fs.DatasourceNamespace
-		datasetNamespace := e.instance.Namespace
-		if e.instance.Spec.Dataset.Namespace != nil {
-			datasetNamespace = *e.instance.Spec.Dataset.Namespace
-		}
-		if err := e.client.Get(ctx, types.NamespacedName{Namespace: fs.DatasourceNamespace, Name: fs.DatasourceName}, ds); err != nil {
-			klog.Errorf("generateJob: failed to get datasource %s", err)
-			return err
-		}
-
-		srcBucket := datasourceNamespace
-		if ds.Spec.OSS != nil {
-			srcBucket = ds.Spec.OSS.Bucket
-		}
-		dstBucket := datasetNamespace
-		klog.V(4).Infof("[Debug] datasourceNamespace: %s, datasetNamespace: %s, srcBucket: %s, dstBucket: %s",
-			datasourceNamespace, datasetNamespace, srcBucket, dstBucket)
-
-		klog.V(5).Infof("[Debug] get datasource %+v\n", *ds)
-
-		endpoint := ds.Spec.Enpoint
-		if ds.Spec.Type() == v1alpha1.DatasourceTypeLocal {
-			system, err := config.GetSystemDatasource(ctx, e.client)
-			if err != nil {
+		dstBucket := e.instance.Namespace
+		dstPrefix := fmt.Sprintf("dataset/%s/%s/", e.instance.Spec.Dataset.Name, e.instance.Spec.Version)
+
+		var srcBucket, srcPrefix string
+		switch fs.Kind {
+		case "Datasource":
+			// Since each data source can be configured with a different minio address,
+			// copying data between different minio instances could drive the
+			// operator's memory usage up until it OOMs, so currently we assume
+			// that all operations happen within the same minio.
+			ds := &v1alpha1.Datasource{}
+			if err := e.client.Get(ctx, types.NamespacedName{Namespace: *fs.Namespace, Name: fs.Name}, ds); err != nil {
+				klog.Errorf("generateJob: failed to get datasource %s", err)
 				return err
 			}
-			endpoint = system.Spec.Enpoint.DeepCopy()
-			if endpoint != nil && endpoint.AuthSecret != nil {
-				endpoint.AuthSecret.WithNameSpace(system.Namespace)
+			srcBucket = *fs.Namespace
+			if ds.Spec.OSS != nil {
+				srcBucket = ds.Spec.OSS.Bucket
 			}
+		case "VersionedDataset":
+			srcVersion := fs.Name[len(v1alpha1.InheritedFromVersionName):]
+			srcBucket = e.instance.Namespace
+			srcPrefix = fmt.Sprintf("dataset/%s/%s/", e.instance.Spec.Dataset.Name, srcVersion)
+		default:
+			klog.Errorf("currently, copying data from a data source of the type %s is not supported", fs.Kind)
+			continue
 		}
 
-		oss, err := datasource.NewOSS(ctx, e.client, endpoint)
-
-		if err != nil {
-			klog.Errorf("generateJob: get oss client error %s", err)
-			return err
-		}
-		klog.V(4).Infof("[Debug] oss client is nil: %v", oss.Client == nil)
-		bucketExists, err := oss.Client.BucketExists(ctx, dstBucket)
+		bucketExists, err := e.minioClient.BucketExists(ctx, dstBucket)
 		if err != nil {
 			klog.Errorf("generateJob: check for the presence of a bucket has failed %s.", err)
 			return err
 		}
 		if !bucketExists {
-			if err = oss.Client.MakeBucket(ctx, dstBucket, minio.MakeBucketOptions{}); err != nil {
+			if err = e.minioClient.MakeBucket(ctx, dstBucket, minio.MakeBucketOptions{}); err != nil {
 				klog.Errorf("generateJob: failed to create bucket %s.", dstBucket)
 				return err
 			}
 		}
 
-		dst := fmt.Sprintf("dataset/%s/%s", e.instance.Spec.Dataset.Name, e.instance.Spec.Version)
-
 		for _, fp := range fs.Status {
 			select {
 			case <-ctx.Done():
@@ -125,23 +108,20 @@
 			}
 
 			if strings.HasSuffix(fp.Path, "/") {
-				klog.Warningf("skip %s/%s, because it ends with /. this is not a legal object in oss.", fs.DatasourceName, fp.Path)
+				klog.Warningf("skip %s/%s, because it ends with /, which is not a legal object in oss.", fs.Name, fp.Path)
 				continue
 			}
 
-			dstPath := fp.Path
-			if !strings.HasPrefix(fp.Path, "/") {
-				dstPath = "/" + fp.Path
-			}
+			targetPath := strings.TrimPrefix(fp.Path, "/")
 
 			payload := JobPayload{
-				Src:            fp.Path,
-				Dst:            dst + dstPath,
-				DatasourceName: fs.DatasourceName,
-				SrcBucket:      srcBucket,
-				DstBucket:      dstBucket,
-				Client:         oss.Client,
-				Remove:         removeAction,
+				Src:        srcPrefix + targetPath,
+				Dst:        dstPrefix + targetPath,
+				SourceName: fs.Name,
+				SrcBucket:  srcBucket,
+				DstBucket:  dstBucket,
+				Client:     e.minioClient,
+				Remove:     removeAction,
 			}
 
 			klog.V(4).Infof("[Debug] send %+v to jobch", payload)
@@ -150,12 +130,12 @@
 	}
 	return nil
 }
+
 func (e *executor) GenerateJob(ctx context.Context, jobCh chan<- JobPayload) error {
-	if err := e.generateJob(ctx, jobCh, e.instance.Status.DatasourceFiles, false); err != nil {
-		klog.Errorf("GenerateJob: error %s", err)
-		return err
+	if e.remove {
+		return e.generateJob(ctx, jobCh, e.fileStatus, true)
 	}
-	return e.generateJob(ctx, jobCh, e.deleteFileGroups, true)
+	return e.generateJob(ctx, jobCh, e.instance.Status.Files, false)
 }
 
 func (e *executor) Task(ctx context.Context, job JobPayload) error {
@@ -190,8 +170,14 @@ func (e *executor) OnFinish(ctx context.Context, job JobPayload, err error) {
 		syncStatus = v1alpha1.FileProcessPhaseFailed
 		errMsg = err.Error()
 	}
-	klog.V(4).Infof("[Debug] change the status of file %s/%s to %s", job.DatasourceName, job.Src, syncStatus)
-	if err = v1alpha1.UpdateFileStatus(ctx, e.instance, job.DatasourceName, job.Src, syncStatus, errMsg); err != nil {
+	src := job.Src
+	if strings.HasPrefix(job.SourceName, v1alpha1.InheritedFromVersionName) {
+		p := fmt.Sprintf("dataset/%s/%s/", e.instance.Spec.Dataset.Name, e.instance.Spec.InheritedFrom)
+		src = strings.TrimPrefix(src, p)
+	}
+	klog.V(4).Infof("[Debug] change the status of file %s/%s to %s", job.SourceName, src, syncStatus)
+
+	if err = v1alpha1.UpdateFileStatus(ctx, e.instance, job.SourceName, src, syncStatus, errMsg); err != nil {
		klog.Errorf("the job with payload %v completes, but updating the cr status fails %s.", job, err)
 	}
 }
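To make the prefix arithmetic in generateJob concrete: assuming a hypothetical dataset "ds" in namespace "arcadia" whose version v2 inherits v1, a file a/b.txt is mapped between version prefixes as below. For Kind "Datasource", srcPrefix stays empty, so Src is just the object's own path within the source bucket.

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Hypothetical names: dataset "ds", copying version v1 -> v2.
	dstPrefix := fmt.Sprintf("dataset/%s/%s/", "ds", "v2")
	srcPrefix := fmt.Sprintf("dataset/%s/%s/", "ds", "v1") // set only for Kind "VersionedDataset"
	target := strings.TrimPrefix("/a/b.txt", "/")          // any leading "/" is normalized away

	fmt.Println(srcPrefix + target) // dataset/ds/v1/a/b.txt
	fmt.Println(dstPrefix + target) // dataset/ds/v2/a/b.txt
}
```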
diff --git a/pkg/scheduler/job.go b/pkg/scheduler/job.go
index 6844b9af0..17ff43e42 100644
--- a/pkg/scheduler/job.go
+++ b/pkg/scheduler/job.go
@@ -19,7 +19,7 @@ import "github.com/minio/minio-go/v7"
 
 type JobPayload struct {
 	Src, Dst             string
-	DatasourceName       string
+	SourceName           string
 	SrcBucket, DstBucket string
 
 	Client *minio.Client
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 928be7aec..c53b64621 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -27,6 +27,8 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	"github.com/kubeagi/arcadia/api/v1alpha1"
+	"github.com/kubeagi/arcadia/pkg/config"
+	"github.com/kubeagi/arcadia/pkg/datasource"
 )
 
 type Scheduler struct {
@@ -35,17 +37,39 @@ type Scheduler struct {
 	runner butcher.Butcher
 	client client.Client
 
-	ds *v1alpha1.VersionedDataset
+	ds     *v1alpha1.VersionedDataset
+	remove bool
 }
 
-func NewScheduler(ctx context.Context, c client.Client, instance *v1alpha1.VersionedDataset) (*Scheduler, error) {
+func NewScheduler(ctx context.Context, c client.Client, instance *v1alpha1.VersionedDataset, fileStatus []v1alpha1.FileStatus, remove bool) (*Scheduler, error) {
 	if ctx == nil {
 		ctx = context.Background()
 	}
 	ctx1, cancel := context.WithCancel(ctx)
-	s := &Scheduler{ctx: ctx1, cancel: cancel, ds: instance, client: c}
-	exectuor, err := butcher.NewButcher[JobPayload](newExecutor(ctx1, c, instance), butcher.BufferSize(bufSize), butcher.MaxWorker(maxWorkers))
+
+	// TODO: Currently, we think there is only one default minio environment,
+	// so we get the minio client directly through the configuration.
+	systemDatasource, err := config.GetSystemDatasource(ctx1, c)
+	if err != nil {
+		klog.Errorf("generate new scheduler error %s", err)
+		cancel()
+		return nil, err
+	}
+	endpoint := systemDatasource.Spec.Enpoint.DeepCopy()
+	if endpoint.AuthSecret != nil && endpoint.AuthSecret.Namespace == nil {
+		endpoint.AuthSecret.WithNameSpace(systemDatasource.Namespace)
+	}
+	oss, err := datasource.NewOSS(ctx1, c, endpoint)
 	if err != nil {
+		cancel()
+		klog.Errorf("generate new minio client error %s", err)
+		return nil, err
+	}
+
+	s := &Scheduler{ctx: ctx1, cancel: cancel, ds: instance, client: c, remove: remove}
+	exectuor, err := butcher.NewButcher[JobPayload](newExecutor(ctx1, c, oss.Client, instance, fileStatus, remove), butcher.BufferSize(bufSize), butcher.MaxWorker(maxWorkers))
+	if err != nil {
+		cancel()
 		return nil, err
 	}
 	s.runner = exectuor
@@ -70,6 +94,9 @@ func (s *Scheduler) Start() error {
 		klog.Infof("versionDataset %s/%s is being deleted, so we need to update his finalizers to allow the deletion to complete smoothly", ds.Namespace, ds.Name)
 		return s.client.Update(s.ctx, ds)
 	}
+	if s.remove {
+		return nil
+	}
 
 	if ds.ResourceVersion == s.ds.ResourceVersion {
 		syncCond := true
@@ -79,7 +106,7 @@ func (s *Scheduler) Start() error {
 			}
 		}
 		deepCopy := ds.DeepCopy()
-		deepCopy.Status.DatasourceFiles = s.ds.Status.DatasourceFiles
+		deepCopy.Status.Files = s.ds.Status.Files
 		if syncCond {
 			condition := v1alpha1.Condition{
 				Type:               v1alpha1.TypeReady,
@@ -88,7 +115,7 @@ func (s *Scheduler) Start() error {
 				Reason:             v1alpha1.ReasonFileSuncSuccess,
 				Message:            "",
 			}
-			for _, checker := range s.ds.Status.DatasourceFiles {
+			for _, checker := range s.ds.Status.Files {
 				shouldBreak := false
 				for _, f := range checker.Status {
 					if f.Phase != v1alpha1.FileProcessPhaseSucceeded {
@@ -109,6 +136,7 @@ func (s *Scheduler) Start() error {
 	}
 
 	klog.Infof("current resourceversion: %s, previous resourceversion: %s", ds.ResourceVersion, s.ds.ResourceVersion)
+	s.Stop()
 	return nil
 }
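With the new signature, the two NewScheduler call sites in versioneddataset_controller.go read as two modes of the same machine; removal mode also short-circuits the status sync in Start() via the s.remove check above. A sketch of the wiring (variable names as in the controller diff):

```go
// Removal mode: the executor only deletes the given FileStatus entries;
// Start() returns early instead of syncing conditions back to the cluster.
s, err := scheduler.NewScheduler(ctx, r.Client, instance, deleteFilestatus, true)

// Copy mode: the executor walks instance.Status.Files, copies objects into
// the dataset's bucket, and Start() then updates the CR's conditions.
s, err = scheduler.NewScheduler(ctx, r.Client, instance, nil, false)
```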
diff --git a/pkg/utils/minioutils/minioutils.go b/pkg/utils/minioutils/minioutils.go
new file mode 100644
index 000000000..0d1ce6d0f
--- /dev/null
+++ b/pkg/utils/minioutils/minioutils.go
@@ -0,0 +1,46 @@
+/*
+Copyright 2023 KubeAGI.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package minioutils
+
+import (
+	"context"
+	"strings"
+
+	"github.com/minio/minio-go/v7"
+)
+
+func ListObjects(ctx context.Context, bucket, prefix string, client *minio.Client, maxDep int) []string {
+	result := make([]string, 0)
+	q := []string{prefix}
+	depth := 0
+
+	for len(q) > 0 && (maxDep <= 0 || depth < maxDep) {
+		nq := make([]string, 0)
+		for _, p := range q {
+			for key := range client.ListObjects(ctx, bucket, minio.ListObjectsOptions{Prefix: p}) {
+				if strings.HasSuffix(key.Key, "/") {
+					nq = append(nq, key.Key)
+					continue
+				}
+				result = append(result, key.Key)
+			}
+		}
+		q = nq
+		depth++
+	}
+	return result
+}
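A usage sketch for the new helper (endpoint, credentials, bucket, and prefix below are all hypothetical): ListObjects walks the bucket level by level, queueing directory-style keys that end in "/" and collecting everything else, and a non-positive maxDep — such as the -1 passed by generateInheriedFileStatus — means no depth limit.

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"

	"github.com/kubeagi/arcadia/pkg/utils/minioutils"
)

func main() {
	// Hypothetical endpoint and credentials.
	client, err := minio.New("minio.example.com:9000", &minio.Options{
		Creds: credentials.NewStaticV4("ak", "sk", ""),
	})
	if err != nil {
		log.Fatal(err)
	}

	// All object keys under version v1 of dataset "ds", with no depth limit.
	for _, key := range minioutils.ListObjects(context.TODO(), "arcadia", "dataset/ds/v1/", client, -1) {
		fmt.Println(key) // e.g. dataset/ds/v1/a/b.txt
	}
}
```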