diff --git a/go.mod b/go.mod index d9816fd39..d2d617036 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/onsi/ginkgo v1.16.5 github.com/onsi/gomega v1.18.1 github.com/r3labs/sse/v2 v2.10.0 + github.com/satori/go.uuid v1.2.0 github.com/spf13/cobra v1.4.0 github.com/stretchr/testify v1.8.4 github.com/tmc/langchaingo v0.0.0-20231017212009-949349d5ef9c diff --git a/go.sum b/go.sum index 150815a89..a865162bf 100644 --- a/go.sum +++ b/go.sum @@ -512,6 +512,8 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= +github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= +github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8= github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ= diff --git a/graphql-server/go-server/main.go b/graphql-server/go-server/main.go index 0ba43993c..79e1de554 100644 --- a/graphql-server/go-server/main.go +++ b/graphql-server/go-server/main.go @@ -32,6 +32,7 @@ import ( "github.com/kubeagi/arcadia/graphql-server/go-server/graph/generated" "github.com/kubeagi/arcadia/graphql-server/go-server/graph/impl" "github.com/kubeagi/arcadia/graphql-server/go-server/pkg/auth" + "github.com/kubeagi/arcadia/graphql-server/go-server/pkg/minio" "github.com/kubeagi/arcadia/graphql-server/go-server/pkg/oidc" _ "k8s.io/client-go/plugin/pkg/client/auth/oidc" @@ -85,6 +86,12 @@ func main() { http.Handle("/bff", srv) } + http.HandleFunc("/minio/get_chunks", minio.GetSuccessChunks) + http.HandleFunc("/minio/new_multipart", minio.NewMultipart) + http.HandleFunc("/minio/get_multipart_url", minio.GetMultipartUploadURL) + http.HandleFunc("/minio/complete_multipart", minio.CompleteMultipart) + http.HandleFunc("/minio/update_chunk", minio.UpdateMultipart) + klog.Infof("listening server on port: %d", *port) log.Fatal(http.ListenAndServe(fmt.Sprintf("%s:%d", *host, *port), nil)) } diff --git a/graphql-server/go-server/pkg/minio/client.go b/graphql-server/go-server/pkg/minio/client.go new file mode 100644 index 000000000..58dd4871f --- /dev/null +++ b/graphql-server/go-server/pkg/minio/client.go @@ -0,0 +1,120 @@ +/* +Copyright 2023 KubeAGI. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package minio + +import ( + "context" + "os" + "sync" + + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" + "k8s.io/klog/v2" + + client3 "github.com/kubeagi/arcadia/graphql-server/go-server/pkg/client" + "github.com/kubeagi/arcadia/pkg/config" + "github.com/kubeagi/arcadia/pkg/utils" +) + +var minioClient *minio.Client = nil +var coreClient *minio.Core = nil + +var mutex *sync.Mutex + +var ( + minioAddress string + minioAccessKeyID string + minioSecretAccessKey string + minioSecure bool + minioBucket string + minioBasePath string +) + +func init() { + mutex = new(sync.Mutex) + + if err := utils.SetSelfNamespace(); err != nil { + klog.Errorf("unable to get self namespace: %s", err) + os.Exit(1) + } + + c, err := client3.GetClient(nil) + if err != nil { + panic(err) + } + minioConfig, err := config.GetMinIO(context.TODO(), c) + if err != nil { + panic(err) + } + minioAddress = minioConfig.MinioAddress + minioAccessKeyID = minioConfig.MinioAccessKeyID + minioSecretAccessKey = minioConfig.MinioSecretAccessKey + minioBucket = minioConfig.MinioBucket + minioBasePath = minioConfig.MinioBasePath + minioSecure = minioConfig.MinioSecure +} + +func getClients() (*minio.Client, *minio.Core, error) { + var client1 *minio.Client + var client2 *minio.Core + var err error + + mutex.Lock() + + if minioClient != nil && coreClient != nil { + client1 = minioClient + client2 = coreClient + mutex.Unlock() + return client1, client2, nil + } + + aliasedURL := minioAddress + accessKeyID := minioAccessKeyID + secretAccessKey := minioSecretAccessKey + secure := minioSecure + + if minioClient == nil { + minioClient, err = minio.New(aliasedURL, &minio.Options{ + Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""), + Secure: secure, + }) + } + if err != nil { + mutex.Unlock() + return nil, nil, err + } + + client1 = minioClient + + if coreClient == nil { + coreClient, err = minio.NewCore(aliasedURL, &minio.Options{ + Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""), + Secure: secure, + }) + } + + if err != nil { + mutex.Unlock() + return nil, nil, err + } + + client2 = coreClient + + mutex.Unlock() + + return client1, client2, nil +} diff --git a/graphql-server/go-server/pkg/minio/model/file_chunk.go b/graphql-server/go-server/pkg/minio/model/file_chunk.go new file mode 100644 index 000000000..f96aae0d3 --- /dev/null +++ b/graphql-server/go-server/pkg/minio/model/file_chunk.go @@ -0,0 +1,101 @@ +/* +Copyright 2023 KubeAGI. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package models + +import ( + "errors" +) + +const ( + FileNotUploaded int = iota + FileUploaded +) + +// maxPartsCount - maximum number of parts for a single multipart session. +const MaxPartsCount = 10000 + +// maxMultipartPutObjectSize - maximum size 5TiB of object for +// Multipart operation. +const MaxMultipartPutObjectSize = 1024 * 1024 * 1024 * 1024 * 5 + +// minPartSize - minimum part size 128MiB per object after which +// putObject behaves internally as multipart. +const MinPartSize = 1024 * 1024 * 64 + +type FileChunk struct { + UUID string + Md5 string + IsUploaded int + UploadID string + TotalChunks int + Size int64 + FileName string + CompletedParts string +} + +var fileChunks = make([]*FileChunk, 0, 250) + +func GetFileChunkByMD5(md5 string) (*FileChunk, error) { + fileChunk := new(FileChunk) + matched := false + for _, chunk := range fileChunks { + if chunk.Md5 == md5 { + fileChunk = chunk + matched = true + } + } + if !matched { + return nil, errors.New("GetFileChunksByUUID failed") + } + return fileChunk, nil +} + +func GetFileChunkByUUID(uuid string) (*FileChunk, error) { + fileChunk := new(FileChunk) + matched := false + for _, chunk := range fileChunks { + if chunk.UUID == uuid { + fileChunk = chunk + matched = true + break + } + } + if !matched { + return nil, errors.New("GetFileChunksByUUID failed") + } + return fileChunk, nil +} + +func UpdateFileChunk(fileChunk *FileChunk) error { + updated := false + for _, chunk := range fileChunks { + if chunk.UUID == fileChunk.UUID && chunk.Md5 == fileChunk.Md5 { + chunk.IsUploaded = fileChunk.IsUploaded + chunk.CompletedParts = fileChunk.CompletedParts + updated = true + } + } + if !updated { + return errors.New("UpdateFileChunk failed") + } + return nil +} + +func InsetFileChunk(fileChunk *FileChunk) (_ *FileChunk, err error) { + fileChunks = append(fileChunks, fileChunk) + return fileChunk, nil +} diff --git a/graphql-server/go-server/pkg/minio/service.go b/graphql-server/go-server/pkg/minio/service.go new file mode 100644 index 000000000..be03a44c4 --- /dev/null +++ b/graphql-server/go-server/pkg/minio/service.go @@ -0,0 +1,551 @@ +/* +Copyright 2023 KubeAGI. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package minio + +import ( + "context" + "encoding/json" + "encoding/xml" + "log" + "net/http" + "net/url" + "path" + "sort" + "strconv" + "strings" + "time" + + "github.com/minio/minio-go/v7" + gouuid "github.com/satori/go.uuid" + "k8s.io/klog/v2" + + "github.com/kubeagi/arcadia/graphql-server/go-server/pkg/minio/model" +) + +type SuccessChunksResult struct { + ResultCode int `json:"resultCode"` + UUID string `json:"uuid"` + Uploaded string `json:"uploaded"` + UploadID string `json:"uploadID"` + Chunks string `json:"chunks"` +} + +type NewMultipartResult struct { + UUID string `json:"uuid"` + UploadID string `json:"uploadID"` +} + +type MultipartUploadURLResult struct { + URL string `json:"url"` +} + +// completeMultipartUpload container for completing multipart upload. +type CompleteMultipartUpload struct { + XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CompleteMultipartUpload" json:"-"` + Parts []minio.CompletePart `xml:"Part"` +} + +// completedParts is a collection of parts sortable by their part numbers. +// used for sorting the uploaded parts before completing the multipart request. +type completedParts []minio.CompletePart + +func (a completedParts) Len() int { return len(a) } +func (a completedParts) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a completedParts) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber } + +const ( + PresignedUploadPartURLExpireTime = time.Hour * 24 * 7 +) + +func GetSuccessChunks(w http.ResponseWriter, req *http.Request) { + ctx := req.Context() + var res = -1 + var uuid, uploaded, uploadID, chunks, fileName string + var partNumberMarker, maxParts int + + query := req.URL.Query() + fileMD5 := query.Get("md5") + if fileMD5 == "" { + klog.Error("GetFileChunkByMD5 failed: md5 is required") + _, err := w.Write([]byte("md5 is required")) + if err != nil { + klog.Errorf("w.Write failed: %s", err) + } + return + } + + for { + fileChunk, err := models.GetFileChunkByMD5(fileMD5) + if err != nil { + klog.Infof("GetFileChunkByMD5 failed: %s", err) + break + } + uuid = fileChunk.UUID + uploaded = strconv.Itoa(fileChunk.IsUploaded) + uploadID = fileChunk.UploadID + fileName = fileChunk.FileName + + bucketName := minioBucket + objectName := strings.TrimPrefix(path.Join(minioBasePath, path.Join(uuid[0:5], fileName)), "/") + + isExist, err := isObjectExist(ctx, bucketName, objectName) + if err != nil { + klog.Errorf("isObjectExist failed: %s", err) + break + } + + if isExist { + uploaded = "1" + if fileChunk.IsUploaded != models.FileUploaded { + klog.Info("the file has been uploaded but not recorded") + fileChunk.IsUploaded = 1 + if err = models.UpdateFileChunk(fileChunk); err != nil { + klog.Errorf("UpdateFileChunk failed: %s", err) + } + } + res = 0 + break + } else { + uploaded = "0" + if fileChunk.IsUploaded == models.FileUploaded { + klog.Info("the file has been recorded but not uploaded") + fileChunk.IsUploaded = 0 + if err = models.UpdateFileChunk(fileChunk); err != nil { + klog.Errorf("UpdateFileChunk failed: %s", err) + } + } + } + _, client, err := getClients() + if err != nil { + klog.Errorf("getClients failed: %s", err) + break + } + + // TODO partNumberMarker, maxParts ? + listObjectPartsResult, err := client.ListObjectParts(ctx, bucketName, objectName, uploadID, partNumberMarker, maxParts) + if err != nil { + klog.Errorf("ListObjectParts failed: %s", err) + break + } + for _, objectPart := range listObjectPartsResult.ObjectParts { + chunks += strconv.Itoa(objectPart.PartNumber) + "-" + objectPart.ETag + "," + } + // nolint + break + } + result := SuccessChunksResult{ + ResultCode: res, + Uploaded: uploaded, + UUID: uuid, + UploadID: uploadID, + Chunks: chunks, + } + message, _ := json.Marshal(result) + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Access-Control-Allow-Origin", "*") + _, err := w.Write(message) + if err != nil { + klog.Errorf("w.Write failed: %s", err) + } +} + +func NewMultipart(w http.ResponseWriter, req *http.Request) { + ctx := req.Context() + var uuid, uploadID string + query := req.URL.Query() + queryTotalChunkCounts := query.Get("totalChunkCounts") + if queryTotalChunkCounts == "" { + klog.Error("NewMultipart failed: totalChunkCounts is required") + _, err := w.Write([]byte("totalChunkCounts is required")) + if err != nil { + klog.Errorf("w.Write failed: %s", err) + } + return + } + totalChunkCounts, err := strconv.Atoi(queryTotalChunkCounts) + if err != nil { + klog.Errorf("NewMultipart failed: %s", err) + _, err := w.Write([]byte("totalChunkCounts is illegal.")) + if err != nil { + klog.Errorf("w.Write failed: %s", err) + } + return + } + if totalChunkCounts > models.MaxPartsCount || totalChunkCounts <= 0 { + klog.Error("totalChunkCounts is illegal.") + _, err := w.Write([]byte("totalChunkCounts is illegal.")) + if err != nil { + klog.Errorf("w.Write failed: %s", err) + } + return + } + querySize := query.Get("size") + if querySize == "" { + klog.Error("size is illegal.") + _, err := w.Write([]byte("size is required.")) + if err != nil { + klog.Errorf("w.Write failed: %s", err) + } + return + } + fileSize, err := strconv.ParseInt(querySize, 10, 64) + if err != nil { + klog.Error("size is illegal.") + _, err := w.Write([]byte("size is illegal.")) + if err != nil { + klog.Errorf("w.Write failed: %s", err) + } + return + } + if fileSize > models.MaxMultipartPutObjectSize || fileSize < 0 { + klog.Error("size is illegal.") + _, err := w.Write([]byte("size is illegal.")) + if err != nil { + klog.Errorf("w.Write failed: %s", err) + } + return + } + md5 := query.Get("md5") + if md5 == "" { + klog.Error("md5 is illegal.") + _, err := w.Write([]byte("md5 is required.")) + if err != nil { + klog.Errorf("w.Write failed: %s", err) + } + return + } + + fileName := query.Get("fileName") + if fileName == "" { + klog.Error("fileName is illegal.") + _, err := w.Write([]byte("fileName is required.")) + if err != nil { + klog.Errorf("w.Write failed: %s", err) + } + return + } + + uuid = gouuid.NewV4().String() + uploadID, err = newMultiPartUpload(ctx, uuid, fileName) + if err != nil { + klog.Errorf("newMultiPartUpload failed: %s", err) + _, err := w.Write([]byte("newMultiPartUpload failed.")) + if err != nil { + klog.Errorf("w.Write failed: %s", err) + } + return + } + _, err = models.InsetFileChunk(&models.FileChunk{ + UUID: uuid, + UploadID: uploadID, + Md5: md5, + Size: fileSize, + FileName: fileName, + TotalChunks: totalChunkCounts, + }) + if err != nil { + klog.Errorf("InsetFileChunk failed: %s", err) + _, err := w.Write([]byte("InsetFileChunk failed.")) + if err != nil { + klog.Errorf("w.Write failed: %s", err) + } + return + } + + result := NewMultipartResult{ + UUID: uuid, + UploadID: uploadID, + } + + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Access-Control-Allow-Origin", "*") + message, _ := json.Marshal(result) + _, err = w.Write(message) + if err != nil { + klog.Errorf("w.Write failed: %s", err) + } +} + +func GetMultipartUploadURL(w http.ResponseWriter, req *http.Request) { + ctx := req.Context() + var url string + query := req.URL.Query() + uuid := query.Get("uuid") + if uuid == "" { + klog.Error("uuid is required.") + _, err := w.Write([]byte("uuid is required.")) + if err != nil { + klog.Errorf("w.Write failed: %s", err) + } + return + } + uploadID := query.Get("uploadID") + if uploadID == "" { + klog.Error("uploadID is required.") + _, err := w.Write([]byte("uploadID is required.")) + if err != nil { + klog.Errorf("w.Write failed: %s", err) + } + return + } + queryChunkNumber := query.Get("chunkNumber") + if queryChunkNumber == "" { + klog.Error("chunkNumber is required.") + _, err := w.Write([]byte("chunkNumber is required.")) + if err != nil { + klog.Errorf("w.Write failed: %s", err) + } + return + } + partNumber, err := strconv.Atoi(queryChunkNumber) + if err != nil { + klog.Errorf("chunkNumber is illegal: %s", err) + _, err := w.Write([]byte("chunkNumber is illegal.")) + if err != nil { + klog.Errorf("w.Write failed: %s", err) + } + return + } + querySize := query.Get("size") + if querySize == "" { + klog.Error("size is required.") + _, err := w.Write([]byte("size is required.")) + if err != nil { + klog.Errorf("w.Write failed: %s", err) + } + return + } + size, err := strconv.ParseInt(querySize, 10, 64) + if err != nil { + klog.Errorf("size is illegal: %s", err) + _, err := w.Write([]byte("size is illegal.")) + if err != nil { + klog.Errorf("w.Write failed: %s", err) + } + return + } + if size > models.MinPartSize { + klog.Error("size is illegal.") + _, err := w.Write([]byte("size is illegal.")) + if err != nil { + klog.Errorf("w.Write failed: %s", err) + } + return + } + + fileChunk, err := models.GetFileChunkByUUID(uuid) + if err != nil { + klog.Errorf("GetFileChunkByUUID failed: %s", err) + _, err := w.Write([]byte("GetFileChunkByUUID failed.")) + if err != nil { + klog.Errorf("w.Write failed: %s", err) + } + return + } + + url, err = genMultiPartSignedURL(ctx, uuid, uploadID, partNumber, fileChunk.FileName, size) + if err != nil { + klog.Errorf("genMultiPartSignedURL failed: %s", err) + _, err := w.Write([]byte("genMultiPartSignedURL failed.")) + if err != nil { + klog.Errorf("w.Write failed: %s", err) + } + return + } + + result := MultipartUploadURLResult{ + URL: url, + } + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Access-Control-Allow-Origin", "*") + message, _ := json.Marshal(result) + _, err = w.Write(message) + if err != nil { + klog.Errorf("w.Write failed: %s", err) + } +} + +func CompleteMultipart(w http.ResponseWriter, req *http.Request) { + ctx := req.Context() + err := req.ParseForm() + if err != nil { + klog.Errorf("req.ParseForm failed: %s", err) + } + uuid := req.Form.Get("uuid") + uploadID := req.Form.Get("uploadID") + fileChunk, err := models.GetFileChunkByUUID(uuid) + if err != nil { + klog.Errorf("GetFileChunkByUUID failed: %s", err) + _, err := w.Write([]byte("GetFileChunkByUUID failed.")) + if err != nil { + klog.Errorf("w.Write failed: %s", err) + } + return + } + _, err = completeMultiPartUpload(ctx, uuid, uploadID, fileChunk.FileName) + if err != nil { + klog.Errorf("completeMultiPartUpload failed: %s", err) + _, err := w.Write([]byte("completeMultiPartUpload failed.")) + if err != nil { + klog.Errorf("w.Write failed: %s", err) + } + return + } + + fileChunk.IsUploaded = models.FileUploaded + + err = models.UpdateFileChunk(fileChunk) + if err != nil { + klog.Errorf("UpdateFileChunk failed: %s", err) + _, err := w.Write([]byte("completeMultiPartUpload failed.")) + if err != nil { + klog.Errorf("w.Write failed: %s", err) + } + return + } + + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Access-Control-Allow-Origin", "*") + _, err = w.Write([]byte("success")) + if err != nil { + klog.Errorf("w.Write failed: %s", err) + } +} + +func UpdateMultipart(w http.ResponseWriter, req *http.Request) { + err := req.ParseForm() + if err != nil { + klog.Errorf("req.ParseForm failed: %s", err) + } + uuid := req.Form.Get("uuid") + etag := req.Form.Get("etag") + chunkNumber := req.Form.Get("chunkNumber") + fileChunk, err := models.GetFileChunkByUUID(uuid) + if err != nil { + log.Println("GetFileChunkByUUID failed") + _, err := w.Write([]byte("GetFileChunkByUUID failed.")) + if err != nil { + klog.Errorf("w.Write failed: %s", err) + } + return + } + + fileChunk.CompletedParts += chunkNumber + "-" + strings.ReplaceAll(etag, "\"", "") + "," + + err = models.UpdateFileChunk(fileChunk) + if err != nil { + klog.Errorf("UpdateFileChunk failed: %s", err) + _, err := w.Write([]byte("UpdateFileChunk failed")) + if err != nil { + klog.Errorf("w.Write failed: %s", err) + } + return + } + _, err = w.Write([]byte("success")) + if err != nil { + klog.Errorf("w.Write failed: %s", err) + } +} + +func isObjectExist(ctx context.Context, bucketName string, objectName string) (bool, error) { + isExist := false + // TODO doneCh? + doneCh := make(chan struct{}) + defer close(doneCh) + + client, _, err := getClients() + if err != nil { + klog.Errorf("getClients failed: %s", err) + return isExist, err + } + + objectCh := client.ListObjects(ctx, bucketName, minio.ListObjectsOptions{Prefix: objectName, Recursive: false}) + object, ok := <-objectCh + if !ok || object.Err != nil { + klog.Errorf("ListObjects failed: %s", object.Err) + return isExist, object.Err + } + isExist = true + return isExist, nil +} + +func newMultiPartUpload(ctx context.Context, uuid string, fileName string) (string, error) { + _, minioClient, err := getClients() + if err != nil { + klog.Errorf("getClient failed: %s", err) + return "", err + } + + bucketName := minioBucket + objectName := strings.TrimPrefix(path.Join(minioBasePath, path.Join(uuid[0:5], fileName)), "/") + + return minioClient.NewMultipartUpload(ctx, bucketName, objectName, minio.PutObjectOptions{}) +} + +func genMultiPartSignedURL(ctx context.Context, uuid string, uploadID string, partNumber int, fileName string, partSize int64) (string, error) { + _, client, err := getClients() + if err != nil { + klog.Errorf("getClient failed: %s", err) + return "", err + } + + bucketName := minioBucket + objectName := strings.TrimPrefix(path.Join(minioBasePath, path.Join(uuid[0:5], fileName)), "/") + u, err := client.Presign(ctx, http.MethodPut, bucketName, objectName, PresignedUploadPartURLExpireTime, url.Values{ + "uploadId": []string{uploadID}, + "partNumber": []string{strconv.Itoa(partNumber)}, + }) + if err != nil { + klog.Errorf("Presign failed: %s", err) + return "", err + } + return u.String(), nil +} + +func completeMultiPartUpload(ctx context.Context, uuid string, uploadID string, fileName string) (string, error) { + var partNumberMarker, maxParts int + _, core, err := getClients() + if err != nil { + klog.Errorf("getClient failed: %s", err) + return "", err + } + + bucketName := minioBucket + objectName := strings.TrimPrefix(path.Join(minioBasePath, path.Join(uuid[0:5], fileName)), "/") + + // TODO ? partNumberMarker, maxParts + listObjectPartsResult, err := core.ListObjectParts(ctx, bucketName, objectName, uploadID, partNumberMarker, maxParts) + if err != nil { + klog.Errorf("ListObjectParts failed: %s", err) + return "", err + } + var completeMultipartUpload CompleteMultipartUpload + for _, objectPart := range listObjectPartsResult.ObjectParts { + completeMultipartUpload.Parts = append(completeMultipartUpload.Parts, minio.CompletePart{ + PartNumber: objectPart.PartNumber, + ETag: objectPart.ETag, + }) + } + sort.Sort(completedParts(completeMultipartUpload.Parts)) + + uploadInfo, err := core.CompleteMultipartUpload(ctx, bucketName, objectName, uploadID, completeMultipartUpload.Parts, minio.PutObjectOptions{}) + if err != nil { + klog.Errorf("CompleteMultipartUpload failed: %s", err) + return "", err + } + return uploadInfo.ETag, nil +} diff --git a/pkg/config/config.go b/pkg/config/config.go index 4c06baaeb..921f624ca 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -21,7 +21,11 @@ import ( "fmt" corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/yaml" + "k8s.io/client-go/dynamic" "k8s.io/utils/env" "sigs.k8s.io/controller-runtime/pkg/client" @@ -38,6 +42,7 @@ var ( ErrNoConfigEnv = fmt.Errorf("env:%s is not found", EnvConfigKey) ErrNoConfig = fmt.Errorf("config in configmap is empty") ErrNoConfigGateway = fmt.Errorf("config Gateway in configmap is not found") + ErrNoConfigMinIO = fmt.Errorf("config MinIO in comfigmap is not found") ) func GetSystemDatasource(ctx context.Context, c client.Client) (*arcadiav1alpha1.Datasource, error) { @@ -70,6 +75,41 @@ func GetGateway(ctx context.Context, c client.Client) (*Gateway, error) { return config.Gateway, nil } +func GetMinIO(ctx context.Context, c dynamic.Interface) (*MinIO, error) { + config, err := GetConfigDynamic(ctx, c) + if err != nil { + return nil, err + } + if config.MinIO == nil { + return nil, ErrNoConfigMinIO + } + return config.MinIO, nil +} + +func GetConfigDynamic(ctx context.Context, c dynamic.Interface) (config *Config, err error) { + cmName := env.GetString(EnvConfigKey, EnvConfigDefaultValue) + if cmName == "" { + return nil, ErrNoConfigEnv + } + cmNamespace := utils.GetSelfNamespace() + u, err := c.Resource(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "configmaps"}).Namespace(cmNamespace).Get(ctx, cmName, v1.GetOptions{}) + if err != nil { + return nil, ErrNoConfig + } + data, found, err := unstructured.NestedStringMap(u.Object, "data") + if err != nil || !found { + return nil, ErrNoConfig + } + value, ok := data["config"] + if !ok || len(value) == 0 { + return nil, ErrNoConfig + } + if err = yaml.Unmarshal([]byte(value), &config); err != nil { + return nil, err + } + return config, nil +} + func GetConfig(ctx context.Context, c client.Client) (config *Config, err error) { cmName := env.GetString(EnvConfigKey, EnvConfigDefaultValue) if cmName == "" { diff --git a/pkg/config/config_type.go b/pkg/config/config_type.go index e2181261b..af4a8cba4 100644 --- a/pkg/config/config_type.go +++ b/pkg/config/config_type.go @@ -27,6 +27,9 @@ type Config struct { // Gateway to access LLM api services Gateway *Gateway `json:"gateway,omitempty"` + + // MinIO to access MinIO api services + MinIO *MinIO `json:"minIO,omitempty"` } // Gateway defines the way to access llm apis host by Arcadia @@ -34,3 +37,13 @@ type Gateway struct { APIServer string `json:"apiServer,omitempty"` Controller string `json:"controller,omitempty"` } + +// MinIO defines the way to access minio +type MinIO struct { + MinioAddress string `json:"minioAddress"` + MinioAccessKeyID string `json:"minioAccessKeyId"` + MinioSecretAccessKey string `json:"minioSecretAccessKey"` + MinioSecure bool `json:"minioSecure"` + MinioBucket string `json:"minioBucket"` + MinioBasePath string `json:"minioBasePath"` +}