Skip to content

Commit

Permalink
feat: update api protoc version to v2.0.118
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi committed Jun 17, 2024
1 parent 1c9484a commit 40528ff
Show file tree
Hide file tree
Showing 13 changed files with 221 additions and 126 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.21

require (
d7y.io/api/v2 v2.0.115
d7y.io/api/v2 v2.0.120
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.8
github.com/Showmax/go-fqdn v1.0.0
Expand Down Expand Up @@ -89,7 +89,7 @@ require (
golang.org/x/time v0.5.0
google.golang.org/api v0.183.0
google.golang.org/grpc v1.64.0
google.golang.org/protobuf v1.34.1
google.golang.org/protobuf v1.34.2
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/yaml.v3 v3.0.1
gorm.io/driver/mysql v1.5.4
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
d7y.io/api/v2 v2.0.115 h1:Z+We2FOkc2tNpf/EZFEF3C88QTKQqv2ap2OMTNF/l70=
d7y.io/api/v2 v2.0.115/go.mod h1:lgjG+icVnSX8Yevlt80cxrxL8pvBg7LNI3FnEaeAH/s=
d7y.io/api/v2 v2.0.120 h1:Vt656+s1xpov2z4YYme442I/Ffa/rbyCOz0PIJdYsPg=
d7y.io/api/v2 v2.0.120/go.mod h1:5n5c+0oceb9/Ih4xL6UNRwQEZhBztiHMf4ghb+wGx4U=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down Expand Up @@ -2115,8 +2115,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
32 changes: 32 additions & 0 deletions pkg/rpc/dfdaemon/client/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"

commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2"

logger "d7y.io/dragonfly/v2/internal/dflog"
Expand Down Expand Up @@ -88,6 +89,15 @@ type V2 interface {
// DownloadTask downloads task from p2p network.
DownloadTask(context.Context, string, *dfdaemonv2.DownloadTaskRequest, ...grpc.CallOption) (dfdaemonv2.DfdaemonUpload_DownloadTaskClient, error)

// DownloadCacheTask downloads cache task from p2p network.
DownloadCacheTask(context.Context, *dfdaemonv2.DownloadCacheTaskRequest, ...grpc.CallOption) (dfdaemonv2.DfdaemonUpload_DownloadCacheTaskClient, error)

// StatCacheTask stats cache task information.
StatCacheTask(context.Context, *dfdaemonv2.StatCacheTaskRequest, ...grpc.CallOption) (*commonv2.CacheTask, error)

// DeleteCacheTask deletes cache task from p2p network.
DeleteCacheTask(context.Context, *dfdaemonv2.DeleteCacheTaskRequest, ...grpc.CallOption) error

// Close tears down the ClientConn and all underlying connections.
Close() error
}
Expand Down Expand Up @@ -131,3 +141,25 @@ func (v *v2) DownloadTask(ctx context.Context, taskID string, req *dfdaemonv2.Do
opts...,
)
}

// DownloadCacheTask downloads cache task from p2p network.
func (v *v2) DownloadCacheTask(ctx context.Context, req *dfdaemonv2.DownloadCacheTaskRequest, opts ...grpc.CallOption) (dfdaemonv2.DfdaemonUpload_DownloadCacheTaskClient, error) {
return v.DfdaemonUploadClient.DownloadCacheTask(ctx, req, opts...)
}

// StatCacheTask stats cache task information.
func (v *v2) StatCacheTask(ctx context.Context, req *dfdaemonv2.StatCacheTaskRequest, opts ...grpc.CallOption) (*commonv2.CacheTask, error) {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()

return v.DfdaemonUploadClient.StatCacheTask(ctx, req, opts...)
}

// DeleteCacheTask deletes cache task from p2p network.
func (v *v2) DeleteCacheTask(ctx context.Context, req *dfdaemonv2.DeleteCacheTaskRequest, opts ...grpc.CallOption) error {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()

_, err := v.DfdaemonUploadClient.DeleteCacheTask(ctx, req, opts...)
return err
}
60 changes: 60 additions & 0 deletions pkg/rpc/dfdaemon/client/mocks/client_v2_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 16 additions & 33 deletions pkg/rpc/scheduler/client/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,24 +124,20 @@ type V2 interface {
// Checks information of peer.
StatPeer(context.Context, *schedulerv2.StatPeerRequest, ...grpc.CallOption) (*commonv2.Peer, error)

// LeavePeer releases peer in scheduler.
LeavePeer(context.Context, *schedulerv2.LeavePeerRequest, ...grpc.CallOption) error

// TODO exchange peer api definition.
// ExchangePeer exchanges peer information.
ExchangePeer(context.Context, *schedulerv2.ExchangePeerRequest, ...grpc.CallOption) (*schedulerv2.ExchangePeerResponse, error)
// DeletePeer releases peer in scheduler.
DeletePeer(context.Context, *schedulerv2.DeletePeerRequest, ...grpc.CallOption) error

// Checks information of task.
StatTask(context.Context, *schedulerv2.StatTaskRequest, ...grpc.CallOption) (*commonv2.Task, error)

// LeaveTask releases task in scheduler.
LeaveTask(context.Context, *schedulerv2.LeaveTaskRequest, ...grpc.CallOption) error
// DeleteTask releases task in scheduler.
DeleteTask(context.Context, *schedulerv2.DeleteTaskRequest, ...grpc.CallOption) error

// AnnounceHost announces host to scheduler.
AnnounceHost(context.Context, *schedulerv2.AnnounceHostRequest, ...grpc.CallOption) error

// LeaveHost releases host in scheduler.
LeaveHost(context.Context, *schedulerv2.LeaveHostRequest, ...grpc.CallOption) error
// DeleteHost releases host in scheduler.
DeleteHost(context.Context, *schedulerv2.DeleteHostRequest, ...grpc.CallOption) error

// SyncProbes sync probes of the host.
SyncProbes(context.Context, *schedulerv2.SyncProbesRequest, ...grpc.CallOption) (schedulerv2.Scheduler_SyncProbesClient, error)
Expand Down Expand Up @@ -179,12 +175,12 @@ func (v *v2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest, opt
)
}

// LeavePeer releases peer in scheduler.
func (v *v2) LeavePeer(ctx context.Context, req *schedulerv2.LeavePeerRequest, opts ...grpc.CallOption) error {
// DeletePeer releases peer in scheduler.
func (v *v2) DeletePeer(ctx context.Context, req *schedulerv2.DeletePeerRequest, opts ...grpc.CallOption) error {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()

_, err := v.SchedulerClient.LeavePeer(
_, err := v.SchedulerClient.DeletePeer(
context.WithValue(ctx, pkgbalancer.ContextKey, req.TaskId),
req,
opts...,
Expand All @@ -193,19 +189,6 @@ func (v *v2) LeavePeer(ctx context.Context, req *schedulerv2.LeavePeerRequest, o
return err
}

// TODO exchange peer api definition.
// ExchangePeer exchanges peer information.
func (v *v2) ExchangePeer(ctx context.Context, req *schedulerv2.ExchangePeerRequest, opts ...grpc.CallOption) (*schedulerv2.ExchangePeerResponse, error) {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()

return v.SchedulerClient.ExchangePeer(
context.WithValue(ctx, pkgbalancer.ContextKey, req.TaskId),
req,
opts...,
)
}

// Checks information of task.
func (v *v2) StatTask(ctx context.Context, req *schedulerv2.StatTaskRequest, opts ...grpc.CallOption) (*commonv2.Task, error) {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
Expand All @@ -218,12 +201,12 @@ func (v *v2) StatTask(ctx context.Context, req *schedulerv2.StatTaskRequest, opt
)
}

// LeaveTask releases task in scheduler.
func (v *v2) LeaveTask(ctx context.Context, req *schedulerv2.LeaveTaskRequest, opts ...grpc.CallOption) error {
// DeleteTask releases task in scheduler.
func (v *v2) DeleteTask(ctx context.Context, req *schedulerv2.DeleteTaskRequest, opts ...grpc.CallOption) error {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()

_, err := v.SchedulerClient.LeaveTask(
_, err := v.SchedulerClient.DeleteTask(
context.WithValue(ctx, pkgbalancer.ContextKey, req.TaskId),
req,
opts...,
Expand Down Expand Up @@ -261,22 +244,22 @@ func (v *v2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
return eg.Wait()
}

// LeaveHost releases host in all schedulers.
func (v *v2) LeaveHost(ctx context.Context, req *schedulerv2.LeaveHostRequest, opts ...grpc.CallOption) error {
// DeleteHost releases host in all schedulers.
func (v *v2) DeleteHost(ctx context.Context, req *schedulerv2.DeleteHostRequest, opts ...grpc.CallOption) error {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()

circle, err := v.GetCircle()
if err != nil {
return err
}
logger.Infof("leave host circle is %#v", circle)
logger.Infof("delete host circle is %#v", circle)

eg, _ := errgroup.WithContext(ctx)
for _, virtualTaskID := range circle {
virtualTaskID := virtualTaskID
eg.Go(func() error {
if _, err := v.SchedulerClient.LeaveHost(
if _, err := v.SchedulerClient.DeleteHost(
context.WithValue(ctx, pkgbalancer.ContextKey, virtualTaskID),
req,
opts...,
Expand Down
56 changes: 18 additions & 38 deletions pkg/rpc/scheduler/client/mocks/client_v2_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion scheduler/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (j *job) preheatV2(ctx context.Context, req *internaljob.PreheatRequest) er
Priority: commonv2.Priority(req.Priority),
FilteredQueryParams: filteredQueryParams,
RequestHeader: req.Headers,
PieceLength: uint32(req.PieceLength),
PieceLength: uint64(req.PieceLength),
}})
if err != nil {
logger.Errorf("preheat(v2) %s failed: %s", req.URL, err.Error())
Expand Down
Loading

0 comments on commit 40528ff

Please sign in to comment.