Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor lease renew #31

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 67 additions & 90 deletions Documentation/dev-guide/apispec/swagger/rpc.swagger.json

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions Documentation/dev-guide/apispec/swagger/v3election.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
}
},
"default": {
"description": "An unexpected error response",
"description": "An unexpected error response.",
"schema": {
"$ref": "#/definitions/runtimeError"
}
Expand Down Expand Up @@ -56,7 +56,7 @@
}
},
"default": {
"description": "An unexpected error response",
"description": "An unexpected error response.",
"schema": {
"$ref": "#/definitions/runtimeError"
}
Expand Down Expand Up @@ -98,7 +98,7 @@
}
},
"default": {
"description": "An unexpected error response",
"description": "An unexpected error response.",
"schema": {
"$ref": "#/definitions/runtimeError"
}
Expand Down Expand Up @@ -131,7 +131,7 @@
}
},
"default": {
"description": "An unexpected error response",
"description": "An unexpected error response.",
"schema": {
"$ref": "#/definitions/runtimeError"
}
Expand Down Expand Up @@ -164,7 +164,7 @@
}
},
"default": {
"description": "An unexpected error response",
"description": "An unexpected error response.",
"schema": {
"$ref": "#/definitions/runtimeError"
}
Expand Down Expand Up @@ -203,7 +203,7 @@
"revision": {
"type": "string",
"format": "int64",
"description": "revision is the key-value store revision when the request was applied.\nFor watch progress responses, the header.revision indicates progress. All future events\nreceived in this stream are guaranteed to have a higher revision number than the\nheader.revision number."
"description": "revision is the key-value store revision when the request was applied, and it's\nunset (so 0) in case of calls not interacting with key-value store.\nFor watch progress responses, the header.revision indicates progress. All future events\nreceived in this stream are guaranteed to have a higher revision number than the\nheader.revision number."
},
"raft_term": {
"type": "string",
Expand Down
6 changes: 3 additions & 3 deletions Documentation/dev-guide/apispec/swagger/v3lock.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
}
},
"default": {
"description": "An unexpected error response",
"description": "An unexpected error response.",
"schema": {
"$ref": "#/definitions/runtimeError"
}
Expand Down Expand Up @@ -56,7 +56,7 @@
}
},
"default": {
"description": "An unexpected error response",
"description": "An unexpected error response.",
"schema": {
"$ref": "#/definitions/runtimeError"
}
Expand Down Expand Up @@ -95,7 +95,7 @@
"revision": {
"type": "string",
"format": "int64",
"description": "revision is the key-value store revision when the request was applied.\nFor watch progress responses, the header.revision indicates progress. All future events\nreceived in this stream are guaranteed to have a higher revision number than the\nheader.revision number."
"description": "revision is the key-value store revision when the request was applied, and it's\nunset (so 0) in case of calls not interacting with key-value store.\nFor watch progress responses, the header.revision indicates progress. All future events\nreceived in this stream are guaranteed to have a higher revision number than the\nheader.revision number."
},
"raft_term": {
"type": "string",
Expand Down
131 changes: 131 additions & 0 deletions api/etcdserverpb/gw/rpc.pb.gw.go

Large diffs are not rendered by default.

189 changes: 122 additions & 67 deletions api/etcdserverpb/raft_internal.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions api/etcdserverpb/raft_internal.proto
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ message InternalRaftRequest {
AlarmRequest alarm = 10;

LeaseCheckpointRequest lease_checkpoint = 11 [(versionpb.etcd_version_field) = "3.4"];
LeaseKeepAliveRequest lease_renew = 12 [(versionpb.etcd_version_field) = "3.6"];

AuthEnableRequest auth_enable = 1000;
AuthDisableRequest auth_disable = 1011;
Expand Down
1 change: 1 addition & 0 deletions scripts/etcd_version_annotations.txt
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ etcdserverpb.InternalRaftRequest.downgrade_info_set: "3.5"
etcdserverpb.InternalRaftRequest.header: ""
etcdserverpb.InternalRaftRequest.lease_checkpoint: "3.4"
etcdserverpb.InternalRaftRequest.lease_grant: ""
etcdserverpb.InternalRaftRequest.lease_renew: "3.6"
etcdserverpb.InternalRaftRequest.lease_revoke: ""
etcdserverpb.InternalRaftRequest.put: ""
etcdserverpb.InternalRaftRequest.range: ""
Expand Down
1 change: 0 additions & 1 deletion server/etcdserver/api/etcdhttp/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ func newPeerHandler(
mux.Handle(peerMembersPath, peerMembersHandler)
mux.Handle(peerMemberPromotePrefix, peerMemberPromoteHandler)
if leaseHandler != nil {
mux.Handle(leasehttp.LeasePrefix, leaseHandler)
mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler)
}
if downgradeEnabledHandler != nil {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions server/etcdserver/api/v3lock/v3lockpb/gw/v3lock.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions server/etcdserver/api/v3rpc/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,20 +129,21 @@ func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
// or remote leader.
// Without this, a lease might be revoked at rev 3 but client can see the keepalive succeeded
// at rev 4.
resp := &pb.LeaseKeepAliveResponse{ID: req.ID, Header: &pb.ResponseHeader{}}
ls.hdr.fill(resp.Header)
// todo(ahrtr): remove respForLeaseNotFound, we don't need to ErrLeaseNotFound separately.
respForLeaseNotFound := &pb.LeaseKeepAliveResponse{ID: req.ID, Header: &pb.ResponseHeader{}}
ls.hdr.fill(respForLeaseNotFound.Header)

ttl, err := ls.le.LeaseRenew(stream.Context(), lease.LeaseID(req.ID))
resp, err := ls.le.LeaseRenew(stream.Context(), req)
if err == lease.ErrLeaseNotFound {
err = nil
ttl = 0
respForLeaseNotFound.TTL = 0
resp = respForLeaseNotFound
}

if err != nil {
return togRPCError(err)
}

resp.TTL = ttl
err = stream.Send(resp)
if err != nil {
if isClientCtxErr(stream.Context().Err(), err) {
Expand Down
6 changes: 6 additions & 0 deletions server/etcdserver/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type applierV3 interface {

LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)
LeaseRenew(lc *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error)

LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error)

Expand Down Expand Up @@ -206,6 +207,11 @@ func (a *applierV3backend) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevo
return &pb.LeaseRevokeResponse{Header: a.newHeader()}, err
}

func (a *applierV3backend) LeaseRenew(lc *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error) {
ttl, err := a.lessor.Renew(lease.LeaseID(lc.ID))
return &pb.LeaseKeepAliveResponse{Header: a.newHeader(), ID: lc.ID, TTL: ttl}, err
}

func (a *applierV3backend) LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error) {
for _, c := range lc.Checkpoints {
err := a.lessor.Checkpoint(lease.LeaseID(c.ID), c.Remaining_TTL)
Expand Down
4 changes: 4 additions & 0 deletions server/etcdserver/apply/corrupt.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,7 @@ func (a *applierV3Corrupt) LeaseGrant(_ *pb.LeaseGrantRequest) (*pb.LeaseGrantRe
func (a *applierV3Corrupt) LeaseRevoke(_ *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
return nil, errors.ErrCorrupt
}

func (a *applierV3Corrupt) LeaseRenew(lc *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error) {
return nil, errors.ErrCorrupt
}
3 changes: 3 additions & 0 deletions server/etcdserver/apply/uber_applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ func (a *uberApplier) dispatch(ctx context.Context, r *pb.InternalRaftRequest, s
case r.LeaseCheckpoint != nil:
op = "LeaseCheckpoint"
ar.Resp, ar.Err = a.applyV3.LeaseCheckpoint(r.LeaseCheckpoint)
case r.LeaseRenew != nil:
op = "LeaseRenew"
ar.Resp, ar.Err = a.applyV3.LeaseRenew(r.LeaseRenew)
case r.Alarm != nil:
op = "Alarm"
ar.Resp, ar.Err = a.Alarm(r.Alarm)
Expand Down
48 changes: 7 additions & 41 deletions server/etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,8 @@ type Lessor interface {
// LeaseRevoke sends LeaseRevoke request to raft and toApply it after committed.
LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)

// LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error
// is returned.
LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error)
// LeaseRenew renews the lease.
LeaseRenew(ctx context.Context, r *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error)

// LeaseTimeToLive retrieves lease information.
LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error)
Expand Down Expand Up @@ -276,45 +275,12 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)
return resp.(*pb.LeaseRevokeResponse), nil
}

func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
if s.isLeader() {
if err := s.waitAppliedIndex(); err != nil {
return 0, err
}

ttl, err := s.lessor.Renew(id)
if err == nil { // already requested to primary lessor(leader)
return ttl, nil
}
if err != lease.ErrNotPrimary {
return -1, err
}
}

cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
defer cancel()

// renewals don't go through raft; forward to leader manually
for cctx.Err() == nil {
leader, lerr := s.waitLeader(cctx)
if lerr != nil {
return -1, lerr
}
for _, url := range leader.PeerURLs {
lurl := url + leasehttp.LeasePrefix
ttl, err := leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
if err == nil || err == lease.ErrLeaseNotFound {
return ttl, err
}
}
// Throttle in case of e.g. connection problems.
time.Sleep(50 * time.Millisecond)
}

if cctx.Err() == context.DeadlineExceeded {
return -1, errors.ErrTimeout
func (s *EtcdServer) LeaseRenew(ctx context.Context, r *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error) {
resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRenew: r})
if err != nil {
return nil, err
}
return -1, errors.ErrCanceled
return resp.(*pb.LeaseKeepAliveResponse), err
}

func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
Expand Down
Loading