
Commit

Merge pull request #245 from 0xff-dev/main
fix: refactor dataset and versioneddataset
bjwswang authored Nov 21, 2023
2 parents 11cf14a + e13a2ca commit 8d40185
Showing 11 changed files with 375 additions and 165 deletions.
137 changes: 101 additions & 36 deletions api/v1alpha1/versioneddataset.go
@@ -20,36 +20,52 @@ import (
"context"
"fmt"
"sort"
"strings"

"github.com/minio/minio-go/v7"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"

"github.com/kubeagi/arcadia/pkg/utils/minioutils"
)

var (
LabelVersionedDatasetVersion = Group + "/version"
LabelVersionedDatasetVersionOwner = Group + "/owner"
)

// CopyedFileGroup2Status returns whether any new files were added and the list of files that were deleted.
func CopyedFileGroup2Status(instance *VersionedDataset) (bool, []DatasourceFileStatus) {
if instance.DeletionTimestamp != nil {
source := instance.Status.DatasourceFiles
instance.Status.DatasourceFiles = nil
return true, source
const InheritedFromVersionName = "inheritfrom-"

func generateInheriedFileStatus(minioClient *minio.Client, instance *VersionedDataset) []FileStatus {
if instance.Spec.InheritedFrom == "" {
return nil
}

// 1. First store the information about the status of the file that has been saved in the current status.
oldDatasourceFiles := make(map[string]map[string]FileDetails)
for _, fileStatus := range instance.Status.DatasourceFiles {
key := fmt.Sprintf("%s %s", fileStatus.DatasourceNamespace, fileStatus.DatasourceName)
if _, ok := oldDatasourceFiles[key]; !ok {
oldDatasourceFiles[key] = make(map[string]FileDetails)
}
for _, item := range fileStatus.Status {
oldDatasourceFiles[key][item.Path] = item
srcBucket := instance.Spec.Dataset.Namespace
prefix := fmt.Sprintf("dataset/%s/%s/", instance.Spec.Dataset.Name, instance.Spec.InheritedFrom)
filePaths := minioutils.ListObjects(context.TODO(), *srcBucket, prefix, minioClient, -1)
status := make([]FileDetails, len(filePaths))
sort.Strings(filePaths)

for idx, fp := range filePaths {
status[idx] = FileDetails{
Path: strings.TrimPrefix(fp, prefix),
Phase: FileProcessPhaseProcessing,
}
}
return []FileStatus{
{
TypedObjectReference: TypedObjectReference{
Name: InheritedFromVersionName + instance.Spec.InheritedFrom,
Namespace: &instance.Namespace,
Kind: "VersionedDataset",
},
Status: status,
}}
}

func generateDatasourceFileStatus(instance *VersionedDataset) []FileStatus {
// 2. Organize the contents of the fileGroup into this format: {"datasourceNamespace datasourceName": ["file1", "file2"]}
fileGroup := make(map[string][]string)
for _, fg := range instance.Spec.FileGroups {
@@ -65,14 +81,17 @@ func CopyedFileGroup2Status(instance *VersionedDataset) (bool, []DatasourceFileS
}

// 3. Convert fileGroup to []DatasourceFileStatus format
targetDatasourceFileStatus := make([]DatasourceFileStatus, 0)
targetDatasourceFileStatus := make([]FileStatus, 0)
var namespace, name string
for datasource, filePaths := range fileGroup {
_, _ = fmt.Sscanf(datasource, "%s %s", &namespace, &name)
item := DatasourceFileStatus{
DatasourceName: name,
DatasourceNamespace: namespace,
Status: []FileDetails{},
item := FileStatus{
TypedObjectReference: TypedObjectReference{
Name: name,
Namespace: &namespace,
Kind: "Datasource",
},
Status: []FileDetails{},
}
for _, fp := range filePaths {
item.Status = append(item.Status, FileDetails{
@@ -86,15 +105,40 @@ func CopyedFileGroup2Status(instance *VersionedDataset) (bool, []DatasourceFileS

targetDatasourceFileStatus = append(targetDatasourceFileStatus, item)
}
return targetDatasourceFileStatus
}

// CopyedFileGroup2Status returns whether any new files were added and the list of files that were deleted.
func CopyedFileGroup2Status(minioClient *minio.Client, instance *VersionedDataset) (bool, []FileStatus) {
if instance.DeletionTimestamp != nil {
source := instance.Status.Files
instance.Status.Files = nil
return true, source
}

// 1. First store the information about the status of the file that has been saved in the current status.
oldDatasourceFiles := make(map[string]map[string]FileDetails)
for _, fileStatus := range instance.Status.Files {
key := fmt.Sprintf("%s %s", *fileStatus.Namespace, fileStatus.Name)
if _, ok := oldDatasourceFiles[key]; !ok {
oldDatasourceFiles[key] = make(map[string]FileDetails)
}
for _, item := range fileStatus.Status {
oldDatasourceFiles[key][item.Path] = item
}
}

targetDatasourceFileStatus := generateDatasourceFileStatus(instance)
targetDatasourceFileStatus = append(targetDatasourceFileStatus, generateInheriedFileStatus(minioClient, instance)...)

// 4. If a file from a data source is found to exist in oldDatasourceFiles,
// replace it with the entry stored in oldDatasourceFiles.
// Otherwise set the file as being processed.
update := false
deletedFiles := make([]DatasourceFileStatus, 0)
deletedFiles := make([]FileStatus, 0)
for idx := range targetDatasourceFileStatus {
item := targetDatasourceFileStatus[idx]
key := fmt.Sprintf("%s %s", item.DatasourceNamespace, item.DatasourceName)
key := fmt.Sprintf("%s %s", *item.Namespace, item.Name)

// if the datasource itself is not in status, then it is a new series of files added.
datasourceFiles, ok := oldDatasourceFiles[key]
@@ -116,21 +160,40 @@ func CopyedFileGroup2Status(instance *VersionedDataset) (bool, []DatasourceFileS
delete(datasourceFiles, status.Path)
}
if len(datasourceFiles) > 0 {
ds := DatasourceFileStatus{
DatasourceName: item.DatasourceName,
DatasourceNamespace: item.DatasourceNamespace,
Status: make([]FileDetails, 0),
ds := FileStatus{
TypedObjectReference: TypedObjectReference{
Name: item.Name,
Namespace: item.Namespace,
},
Status: make([]FileDetails, 0),
}
for _, r := range datasourceFiles {
ds.Status = append(ds.Status, r)
}
deletedFiles = append(deletedFiles, ds)
}
targetDatasourceFileStatus[idx] = item
delete(oldDatasourceFiles, key)
}

for key, item := range oldDatasourceFiles {
var namespace, name string
fmt.Sscanf(key, "%s %s", &namespace, &name)
ds := FileStatus{
TypedObjectReference: TypedObjectReference{
Name: name,
Namespace: &namespace,
},
Status: make([]FileDetails, 0),
}
for _, r := range item {
ds.Status = append(ds.Status, r)
}
deletedFiles = append(deletedFiles, ds)
}

sort.Slice(targetDatasourceFileStatus, func(i, j int) bool {
return targetDatasourceFileStatus[i].DatasourceName < targetDatasourceFileStatus[j].DatasourceName
return targetDatasourceFileStatus[i].Name < targetDatasourceFileStatus[j].Name
})

index := -1
@@ -156,37 +219,39 @@ func CopyedFileGroup2Status(instance *VersionedDataset) (bool, []DatasourceFileS
}
instance.Status.ConditionedStatus.SetConditions(cond)
}
instance.Status.DatasourceFiles = targetDatasourceFileStatus
klog.V(4).Infof("[Debug] delete filestatus %+v\n", deletedFiles)

instance.Status.Files = targetDatasourceFileStatus
// update condition to sync
return update, deletedFiles
}

func UpdateFileStatus(ctx context.Context, instance *VersionedDataset, datasource, srcPath string, syncStatus FileProcessPhase, errMsg string) error {
datasourceFileLen := len(instance.Status.DatasourceFiles)
datasourceFileLen := len(instance.Status.Files)
datasourceIndex := sort.Search(datasourceFileLen, func(i int) bool {
return instance.Status.DatasourceFiles[i].DatasourceName >= datasource
return instance.Status.Files[i].Name >= datasource
})
if datasourceIndex == datasourceFileLen {
return fmt.Errorf("not found datasource %s in %s/%s.status", datasource, instance.Namespace, instance.Name)
}

filePathLen := len(instance.Status.DatasourceFiles[datasourceIndex].Status)
filePathLen := len(instance.Status.Files[datasourceIndex].Status)
fileIndex := sort.Search(filePathLen, func(i int) bool {
return instance.Status.DatasourceFiles[datasourceIndex].Status[i].Path >= srcPath
return instance.Status.Files[datasourceIndex].Status[i].Path >= srcPath
})
if fileIndex == filePathLen {
return fmt.Errorf("not found srcPath %s in datasource %s", srcPath, datasource)
}

// Only this state transition is allowed
curPhase := instance.Status.DatasourceFiles[datasourceIndex].Status[fileIndex].Phase
curPhase := instance.Status.Files[datasourceIndex].Status[fileIndex].Phase
if curPhase == FileProcessPhaseProcessing && (syncStatus == FileProcessPhaseSucceeded || syncStatus == FileProcessPhaseFailed) {
instance.Status.DatasourceFiles[datasourceIndex].Status[fileIndex].Phase = syncStatus
instance.Status.Files[datasourceIndex].Status[fileIndex].Phase = syncStatus
if syncStatus == FileProcessPhaseFailed {
instance.Status.DatasourceFiles[datasourceIndex].Status[fileIndex].ErrMessage = errMsg
instance.Status.Files[datasourceIndex].Status[fileIndex].ErrMessage = errMsg
}
if syncStatus == FileProcessPhaseSucceeded {
instance.Status.DatasourceFiles[datasourceIndex].Status[fileIndex].LastUpdateTime = v1.Now()
instance.Status.Files[datasourceIndex].Status[fileIndex].LastUpdateTime = v1.Now()
}
return nil
}
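Below is a minimal usage sketch, not part of this commit, showing one way a reconciler might call the two refactored helpers above. The function name syncFileStatus, the controller-runtime wiring, and the datasource/path literals are assumptions for illustration; only CopyedFileGroup2Status, UpdateFileStatus, and the phase constants come from the file shown above, and the import path is inferred from the file location api/v1alpha1.

package example

import (
	"context"

	"github.com/minio/minio-go/v7"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	arcadiav1alpha1 "github.com/kubeagi/arcadia/api/v1alpha1"
)

// syncFileStatus is a hypothetical helper showing one way the two exported
// functions above could be combined inside a reconciler.
func syncFileStatus(ctx context.Context, c client.Client, minioClient *minio.Client, instance *arcadiav1alpha1.VersionedDataset) (ctrl.Result, error) {
	// Recompute status.files from spec.fileGroups plus any inherited version.
	update, deletedFiles := arcadiav1alpha1.CopyedFileGroup2Status(minioClient, instance)
	if update {
		if err := c.Status().Update(ctx, instance); err != nil {
			return ctrl.Result{}, err
		}
	}
	// deletedFiles lists entries that disappeared from the spec; a controller
	// would typically remove the matching objects from object storage here.
	_ = deletedFiles

	// After a single file finishes syncing, record the outcome for that
	// datasource/path pair (the names here are placeholders).
	if err := arcadiav1alpha1.UpdateFileStatus(ctx, instance, "my-datasource", "docs/readme.md",
		arcadiav1alpha1.FileProcessPhaseSucceeded, ""); err != nil {
		return ctrl.Result{}, err
	}
	return ctrl.Result{}, c.Status().Update(ctx, instance)
}
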
15 changes: 9 additions & 6 deletions api/v1alpha1/versioneddataset_types.go
@@ -45,21 +45,24 @@ type VersionedDatasetSpec struct {
// +kubebuilder:validation:Enum=0;1
// +kubebuilder:default=0
Released uint8 `json:"released"`

// Which version of the dataset it inherits from
InheritedFrom string `json:"inheritedFrom,omitempty"`
}

type DatasourceFileStatus struct {
DatasourceName string `json:"datasourceName"`
DatasourceNamespace string `json:"datasourceNamespace"`
Status []FileDetails `json:"status,omitempty"`
type FileStatus struct {
TypedObjectReference `json:",inline"`

Status []FileDetails `json:"status,omitempty"`
}

// VersionedDatasetStatus defines the observed state of VersionedDataset
type VersionedDatasetStatus struct {
// ConditionedStatus is the current status
ConditionedStatus `json:",inline"`

// DatasourceFiles record the process and results of file processing for each data source
DatasourceFiles []DatasourceFileStatus `json:"datasourceFiles,omitempty"`
// Files record the process and results of file processing for each data source
Files []FileStatus `json:"files,omitempty"`
}

//+kubebuilder:object:root=true
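For reference, a small sketch of constructing the new FileStatus shape: the old datasourceName/datasourceNamespace pair is replaced by the embedded TypedObjectReference, so a status entry now identifies its source by Kind/Name/Namespace. All field values below are placeholders, and the import path is assumed from the file location.

package example

import arcadiav1alpha1 "github.com/kubeagi/arcadia/api/v1alpha1"

// exampleFileStatus builds a FileStatus the way the refactored code does:
// the referenced datasource is identified by a TypedObjectReference.
func exampleFileStatus() arcadiav1alpha1.FileStatus {
	ns := "default" // placeholder namespace
	return arcadiav1alpha1.FileStatus{
		TypedObjectReference: arcadiav1alpha1.TypedObjectReference{
			Kind:      "Datasource",
			Name:      "sample-datasource",
			Namespace: &ns,
		},
		Status: []arcadiav1alpha1.FileDetails{
			{Path: "docs/readme.md", Phase: arcadiav1alpha1.FileProcessPhaseProcessing},
		},
	}
}
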
51 changes: 26 additions & 25 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

28 changes: 21 additions & 7 deletions config/crd/bases/arcadia.kubeagi.k8s.com.cn_versioneddatasets.yaml
@@ -100,6 +100,9 @@ spec:
type: object
type: object
type: array
inheritedFrom:
description: Which version of the dataset it inherits from
type: string
released:
default: 0
enum:
@@ -155,14 +158,25 @@
- type
type: object
type: array
datasourceFiles:
description: DatasourceFiles record the process and results of file
processing for each data source
files:
description: Files record the process and results of file processing
for each data source
items:
properties:
datasourceName:
apiGroup:
description: APIGroup is the group for the resource being referenced.
If APIGroup is not specified, the specified Kind must be in
the core API group. For any other third-party types, APIGroup
is required.
type: string
kind:
description: Kind is the type of resource being referenced
type: string
name:
description: Name is the name of resource being referenced
type: string
datasourceNamespace:
namespace:
description: Namespace is the namespace of resource being referenced
type: string
status:
items:
@@ -187,8 +201,8 @@ spec:
type: object
type: array
required:
- datasourceName
- datasourceNamespace
- kind
- name
type: object
type: array
type: object
5 changes: 0 additions & 5 deletions controllers/dataset_controller.go
@@ -62,11 +62,6 @@ func (r *DatasetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return reconcile.Result{}, err
}
if instance.DeletionTimestamp != nil {
if err := r.Client.DeleteAllOf(ctx, &arcadiav1alpha1.VersionedDataset{}, client.InNamespace(instance.Namespace), client.MatchingLabels{
arcadiav1alpha1.LabelVersionedDatasetVersionOwner: instance.Name,
}); err != nil {
return reconcile.Result{}, err
}
instance.Finalizers = utils.RemoveString(instance.Finalizers, arcadiav1alpha1.Finalizer)
err = r.Client.Update(ctx, instance)
return reconcile.Result{}, err
