Skip to content

Commit

Permalink
expose callback function
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Feb 5, 2025
1 parent 5a5c07e commit 99cf51e
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 14 deletions.
11 changes: 6 additions & 5 deletions client/servicediscovery/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,24 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
)

type leaderSwitchedCallbackFunc func(string) error
// LeaderSwitchedCallbackFunc is the callback function for leader switched event
type LeaderSwitchedCallbackFunc func(string) error

// serviceCallbacks contains all the callback functions for service discovery events
type serviceCallbacks struct {
sync.RWMutex
// serviceModeUpdateCb will be called when the service mode gets updated
serviceModeUpdateCb func(pdpb.ServiceMode)
// leaderSwitchedCbs will be called after the leader switched
leaderSwitchedCbs []leaderSwitchedCallbackFunc
leaderSwitchedCbs []LeaderSwitchedCallbackFunc
// membersChangedCbs will be called after there is any membership change in the
// leader and followers
membersChangedCbs []func()
}

func newServiceCallbacks() *serviceCallbacks {
return &serviceCallbacks{
leaderSwitchedCbs: make([]leaderSwitchedCallbackFunc, 0),
leaderSwitchedCbs: make([]LeaderSwitchedCallbackFunc, 0),
membersChangedCbs: make([]func(), 0),
}
}
Expand All @@ -47,7 +48,7 @@ func (c *serviceCallbacks) setServiceModeUpdateCallback(cb func(pdpb.ServiceMode
c.serviceModeUpdateCb = cb
}

func (c *serviceCallbacks) addLeaderSwitchedCallback(cb leaderSwitchedCallbackFunc) {
func (c *serviceCallbacks) addLeaderSwitchedCallback(cb LeaderSwitchedCallbackFunc) {
c.Lock()
defer c.Unlock()
c.leaderSwitchedCbs = append(c.leaderSwitchedCbs, cb)
Expand All @@ -72,7 +73,7 @@ func (c *serviceCallbacks) onServiceModeUpdate(mode pdpb.ServiceMode) {

func (c *serviceCallbacks) onLeaderSwitched(leader string) error {
c.RLock()
cbs := make([]leaderSwitchedCallbackFunc, len(c.leaderSwitchedCbs))
cbs := make([]LeaderSwitchedCallbackFunc, len(c.leaderSwitchedCbs))
copy(cbs, c.leaderSwitchedCbs)
c.RUnlock()

Expand Down
4 changes: 2 additions & 2 deletions client/servicediscovery/mock_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@ func (*mockServiceDiscovery) ScheduleCheckMemberChanged() {}
func (*mockServiceDiscovery) CheckMemberChanged() error { return nil }

// ExecAndAddLeaderSwitchedCallback implements the ServiceDiscovery interface.
func (*mockServiceDiscovery) ExecAndAddLeaderSwitchedCallback(leaderSwitchedCallbackFunc) {}
func (*mockServiceDiscovery) ExecAndAddLeaderSwitchedCallback(LeaderSwitchedCallbackFunc) {}

// AddLeaderSwitchedCallback implements the ServiceDiscovery interface.
func (*mockServiceDiscovery) AddLeaderSwitchedCallback(leaderSwitchedCallbackFunc) {}
func (*mockServiceDiscovery) AddLeaderSwitchedCallback(LeaderSwitchedCallbackFunc) {}

// AddMembersChangedCallback implements the ServiceDiscovery interface.
func (*mockServiceDiscovery) AddMembersChangedCallback(func()) {}
8 changes: 4 additions & 4 deletions client/servicediscovery/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,11 @@ type ServiceDiscovery interface {
// in a quorum-based cluster or among the primary/secondaries in a primary/secondary configured cluster.
CheckMemberChanged() error
// ExecAndAddLeaderSwitchedCallback executes the callback once and adds it to the callback list then.
ExecAndAddLeaderSwitchedCallback(cb leaderSwitchedCallbackFunc)
ExecAndAddLeaderSwitchedCallback(cb LeaderSwitchedCallbackFunc)
// AddLeaderSwitchedCallback adds callbacks which will be called when the leader
// in a quorum-based cluster or the primary in a primary/secondary configured cluster
// is switched.
AddLeaderSwitchedCallback(cb leaderSwitchedCallbackFunc)
AddLeaderSwitchedCallback(cb LeaderSwitchedCallbackFunc)
// AddMembersChangedCallback adds callbacks which will be called when any leader/follower
// in a quorum-based cluster or any primary/secondary in a primary/secondary configured cluster
// is changed.
Expand Down Expand Up @@ -780,7 +780,7 @@ func (c *serviceDiscovery) CheckMemberChanged() error {
}

// ExecAndAddLeaderSwitchedCallback executes the callback once and adds it to the callback list then.
func (c *serviceDiscovery) ExecAndAddLeaderSwitchedCallback(callback leaderSwitchedCallbackFunc) {
func (c *serviceDiscovery) ExecAndAddLeaderSwitchedCallback(callback LeaderSwitchedCallbackFunc) {
url := c.getLeaderURL()
if len(url) > 0 {
if err := callback(url); err != nil {
Expand All @@ -794,7 +794,7 @@ func (c *serviceDiscovery) ExecAndAddLeaderSwitchedCallback(callback leaderSwitc
// AddLeaderSwitchedCallback adds callbacks which will be called when the leader
// in a quorum-based cluster or the primary in a primary/secondary configured cluster
// is switched.
func (c *serviceDiscovery) AddLeaderSwitchedCallback(callback leaderSwitchedCallbackFunc) {
func (c *serviceDiscovery) AddLeaderSwitchedCallback(callback LeaderSwitchedCallbackFunc) {
c.callbacks.addLeaderSwitchedCallback(callback)
}

Expand Down
6 changes: 3 additions & 3 deletions client/servicediscovery/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ type tsoServiceDiscovery struct {
clientConns sync.Map // Store as map[string]*grpc.ClientConn

// tsoLeaderUpdatedCb will be called when the TSO leader is updated.
tsoLeaderUpdatedCb leaderSwitchedCallbackFunc
tsoLeaderUpdatedCb LeaderSwitchedCallbackFunc

checkMembershipCh chan struct{}

Expand Down Expand Up @@ -359,7 +359,7 @@ func (c *tsoServiceDiscovery) CheckMemberChanged() error {
}

// ExecAndAddLeaderSwitchedCallback executes the callback once and adds it to the callback list then.
func (c *tsoServiceDiscovery) ExecAndAddLeaderSwitchedCallback(callback leaderSwitchedCallbackFunc) {
func (c *tsoServiceDiscovery) ExecAndAddLeaderSwitchedCallback(callback LeaderSwitchedCallbackFunc) {
url := c.getPrimaryURL()
if len(url) > 0 {
if err := callback(url); err != nil {
Expand All @@ -371,7 +371,7 @@ func (c *tsoServiceDiscovery) ExecAndAddLeaderSwitchedCallback(callback leaderSw

// AddLeaderSwitchedCallback adds callbacks which will be called when the primary in
// a primary/secondary configured cluster is switched.
func (*tsoServiceDiscovery) AddLeaderSwitchedCallback(leaderSwitchedCallbackFunc) {}
func (*tsoServiceDiscovery) AddLeaderSwitchedCallback(LeaderSwitchedCallbackFunc) {}

// AddMembersChangedCallback adds callbacks which will be called when any primary/secondary
// in a primary/secondary configured cluster is changed.
Expand Down

0 comments on commit 99cf51e

Please sign in to comment.