Skip to content

Commit

Permalink
Merge pull request #235 from dayuy/minio
Browse files Browse the repository at this point in the history
feat: breakpoint upload via MinIO
  • Loading branch information
bjwswang authored Nov 21, 2023
2 parents bfa39c9 + 589f7eb commit 85f0df0
Show file tree
Hide file tree
Showing 8 changed files with 835 additions and 0 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.18.1
github.com/r3labs/sse/v2 v2.10.0
github.com/satori/go.uuid v1.2.0
github.com/spf13/cobra v1.4.0
github.com/stretchr/testify v1.8.4
github.com/tmc/langchaingo v0.0.0-20231017212009-949349d5ef9c
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,8 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8=
github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ=
Expand Down
7 changes: 7 additions & 0 deletions graphql-server/go-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/kubeagi/arcadia/graphql-server/go-server/graph/generated"
"github.com/kubeagi/arcadia/graphql-server/go-server/graph/impl"
"github.com/kubeagi/arcadia/graphql-server/go-server/pkg/auth"
"github.com/kubeagi/arcadia/graphql-server/go-server/pkg/minio"
"github.com/kubeagi/arcadia/graphql-server/go-server/pkg/oidc"

_ "k8s.io/client-go/plugin/pkg/client/auth/oidc"
Expand Down Expand Up @@ -85,6 +86,12 @@ func main() {
http.Handle("/bff", srv)
}

http.HandleFunc("/minio/get_chunks", minio.GetSuccessChunks)
http.HandleFunc("/minio/new_multipart", minio.NewMultipart)
http.HandleFunc("/minio/get_multipart_url", minio.GetMultipartUploadURL)
http.HandleFunc("/minio/complete_multipart", minio.CompleteMultipart)
http.HandleFunc("/minio/update_chunk", minio.UpdateMultipart)

klog.Infof("listening server on port: %d", *port)
log.Fatal(http.ListenAndServe(fmt.Sprintf("%s:%d", *host, *port), nil))
}
120 changes: 120 additions & 0 deletions graphql-server/go-server/pkg/minio/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
Copyright 2023 KubeAGI.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package minio

import (
"context"
"os"
"sync"

"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"k8s.io/klog/v2"

client3 "github.com/kubeagi/arcadia/graphql-server/go-server/pkg/client"
"github.com/kubeagi/arcadia/pkg/config"
"github.com/kubeagi/arcadia/pkg/utils"
)

var minioClient *minio.Client = nil
var coreClient *minio.Core = nil

var mutex *sync.Mutex

var (
minioAddress string
minioAccessKeyID string
minioSecretAccessKey string
minioSecure bool
minioBucket string
minioBasePath string
)

func init() {
mutex = new(sync.Mutex)

if err := utils.SetSelfNamespace(); err != nil {
klog.Errorf("unable to get self namespace: %s", err)
os.Exit(1)
}

c, err := client3.GetClient(nil)
if err != nil {
panic(err)
}
minioConfig, err := config.GetMinIO(context.TODO(), c)
if err != nil {
panic(err)
}
minioAddress = minioConfig.MinioAddress
minioAccessKeyID = minioConfig.MinioAccessKeyID
minioSecretAccessKey = minioConfig.MinioSecretAccessKey
minioBucket = minioConfig.MinioBucket
minioBasePath = minioConfig.MinioBasePath
minioSecure = minioConfig.MinioSecure
}

func getClients() (*minio.Client, *minio.Core, error) {
var client1 *minio.Client
var client2 *minio.Core
var err error

mutex.Lock()

if minioClient != nil && coreClient != nil {
client1 = minioClient
client2 = coreClient
mutex.Unlock()
return client1, client2, nil
}

aliasedURL := minioAddress
accessKeyID := minioAccessKeyID
secretAccessKey := minioSecretAccessKey
secure := minioSecure

if minioClient == nil {
minioClient, err = minio.New(aliasedURL, &minio.Options{
Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""),
Secure: secure,
})
}
if err != nil {
mutex.Unlock()
return nil, nil, err
}

client1 = minioClient

if coreClient == nil {
coreClient, err = minio.NewCore(aliasedURL, &minio.Options{
Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""),
Secure: secure,
})
}

if err != nil {
mutex.Unlock()
return nil, nil, err
}

client2 = coreClient

mutex.Unlock()

return client1, client2, nil
}
101 changes: 101 additions & 0 deletions graphql-server/go-server/pkg/minio/model/file_chunk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
Copyright 2023 KubeAGI.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package models

import (
"errors"
)

const (
FileNotUploaded int = iota
FileUploaded
)

// maxPartsCount - maximum number of parts for a single multipart session.
const MaxPartsCount = 10000

// maxMultipartPutObjectSize - maximum size 5TiB of object for
// Multipart operation.
const MaxMultipartPutObjectSize = 1024 * 1024 * 1024 * 1024 * 5

// minPartSize - minimum part size 128MiB per object after which
// putObject behaves internally as multipart.
const MinPartSize = 1024 * 1024 * 64

type FileChunk struct {
UUID string
Md5 string
IsUploaded int
UploadID string
TotalChunks int
Size int64
FileName string
CompletedParts string
}

var fileChunks = make([]*FileChunk, 0, 250)

func GetFileChunkByMD5(md5 string) (*FileChunk, error) {
fileChunk := new(FileChunk)
matched := false
for _, chunk := range fileChunks {
if chunk.Md5 == md5 {
fileChunk = chunk
matched = true
}
}
if !matched {
return nil, errors.New("GetFileChunksByUUID failed")
}
return fileChunk, nil
}

func GetFileChunkByUUID(uuid string) (*FileChunk, error) {
fileChunk := new(FileChunk)
matched := false
for _, chunk := range fileChunks {
if chunk.UUID == uuid {
fileChunk = chunk
matched = true
break
}
}
if !matched {
return nil, errors.New("GetFileChunksByUUID failed")
}
return fileChunk, nil
}

func UpdateFileChunk(fileChunk *FileChunk) error {
updated := false
for _, chunk := range fileChunks {
if chunk.UUID == fileChunk.UUID && chunk.Md5 == fileChunk.Md5 {
chunk.IsUploaded = fileChunk.IsUploaded
chunk.CompletedParts = fileChunk.CompletedParts
updated = true
}
}
if !updated {
return errors.New("UpdateFileChunk failed")
}
return nil
}

func InsetFileChunk(fileChunk *FileChunk) (_ *FileChunk, err error) {
fileChunks = append(fileChunks, fileChunk)
return fileChunk, nil
}
Loading

0 comments on commit 85f0df0

Please sign in to comment.