Skip to content

Commit

Permalink
Include tablet process details (hostname and port) in messages
Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Jan 16, 2025
1 parent 1f7870f commit a136b86
Showing 1 changed file with 20 additions and 15 deletions.
35 changes: 20 additions & 15 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,9 +547,14 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
if err != nil {
return tabletPickerErr(err)
}
tabletAliasString := topoproto.TabletAliasString(tablet.Alias)
var tabletPortStr string
if tablet.PortMap["grpc"] != 0 {
tabletPortStr = fmt.Sprintf(":%d", tablet.PortMap["grpc"])
}
tabletMsgDetails := fmt.Sprintf("%s (currently at %s%s)",
topoproto.TabletAliasString(tablet.Alias), tablet.Hostname, tabletPortStr)
log.Infof("Picked %s tablet %s for VStream in %s/%s within the %s cell(s)",
vs.tabletType.String(), tabletAliasString, sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ","))
vs.tabletType.String(), tabletMsgDetails, sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ","))

target := &querypb.Target{
Keyspace: sgtid.Keyspace,
Expand All @@ -560,7 +565,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
tabletConn, err := vs.vsm.resolver.GetGateway().QueryServiceByAlias(ctx, tablet.Alias, target)
if err != nil {
log.Errorf(err.Error())
return vterrors.Wrapf(err, "failed to get tablet connection to %s", tabletAliasString)
return vterrors.Wrapf(err, "failed to get tablet connection to %s", tabletMsgDetails)
}

errCh := make(chan error, 1)
Expand All @@ -569,9 +574,9 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
var err error
switch {
case ctx.Err() != nil:
err = vterrors.Wrapf(ctx.Err(), "context ended while streaming tablet health from %s", tabletAliasString)
err = vterrors.Wrapf(ctx.Err(), "context ended while streaming tablet health from %s", tabletMsgDetails)
case shr == nil || shr.RealtimeStats == nil || shr.Target == nil:
err = fmt.Errorf("health check failed on %s", tabletAliasString)
err = fmt.Errorf("health check failed on %s", tabletMsgDetails)
case vs.tabletType != shr.Target.TabletType:
err = fmt.Errorf("tablet %s type has changed from %s to %s, restarting vstream",
topoproto.TabletAliasString(tablet.Alias), vs.tabletType, shr.Target.TabletType)
Expand All @@ -584,7 +589,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}
if err != nil {
log.Warningf("Tablet state changed: %s, attempting to restart", err)
err = vterrors.Wrapf(err, "error streaming tablet health from %s", tabletAliasString)
err = vterrors.Wrapf(err, "error streaming tablet health from %s", tabletMsgDetails)
errCh <- err
return err
}
Expand All @@ -609,7 +614,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
Options: options,
}
var vstreamCreatedOnce sync.Once
log.Infof("Starting to vstream from %s, with req %+v", tabletAliasString, req)
log.Infof("Starting to vstream from %s, with req %+v", tabletMsgDetails, req)
err = tabletConn.VStream(ctx, req, func(events []*binlogdatapb.VEvent) error {
// We received a valid event. Reset error count.
errCount = 0
Expand All @@ -623,10 +628,10 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
select {
case <-ctx.Done():
return vterrors.Wrapf(ctx.Err(), "context ended while streaming from tablet %s in %s/%s",
tabletAliasString, sgtid.Keyspace, sgtid.Shard)
tabletMsgDetails, sgtid.Keyspace, sgtid.Shard)
case streamErr := <-errCh:
return vterrors.Wrapf(streamErr, "error streaming from tablet %s in %s/%s",
tabletAliasString, sgtid.Keyspace, sgtid.Shard)
tabletMsgDetails, sgtid.Keyspace, sgtid.Shard)
case <-journalDone:
// Unreachable.
// This can happen if a server misbehaves and does not end
Expand All @@ -636,7 +641,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}

aligningStreamsErr := fmt.Sprintf("error aligning streams across %s/%s", sgtid.Keyspace, sgtid.Shard)
sendingEventsErr := fmt.Sprintf("error sending event batch from tablet %s", tabletAliasString)
sendingEventsErr := fmt.Sprintf("error sending event batch from tablet %s", tabletMsgDetails)

sendevents := make([]*binlogdatapb.VEvent, 0, len(events))
for i, event := range events {
Expand Down Expand Up @@ -716,7 +721,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
je, err := vs.getJournalEvent(ctx, sgtid, journal)
if err != nil {
return vterrors.Wrapf(err, "error getting journal event for shard GTID %+v on tablet %s",
sgtid, tabletAliasString)
sgtid, tabletMsgDetails)
}
if je != nil {
var endTimer *time.Timer
Expand All @@ -737,7 +742,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
select {
case <-ctx.Done():
return vterrors.Wrapf(ctx.Err(), "context ended while waiting for journal event for shard GTID %+v on tablet %s",
sgtid, tabletAliasString)
sgtid, tabletMsgDetails)
case <-journalDone:
if endTimer != nil {
<-endTimer.C
Expand Down Expand Up @@ -765,14 +770,14 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
if err == nil {
// Unreachable.
err = vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "vstream ended unexpectedly on tablet %s in %s/%s",
tabletAliasString, sgtid.Keyspace, sgtid.Shard)
tabletMsgDetails, sgtid.Keyspace, sgtid.Shard)
}

retry, ignoreTablet := vs.shouldRetry(err)
if !retry {
log.Errorf("vstream for %s/%s error: %v", sgtid.Keyspace, sgtid.Shard, err)
return vterrors.Wrapf(err, "error in vstream for %s/%s on tablet %s",
sgtid.Keyspace, sgtid.Shard, tabletAliasString)
sgtid.Keyspace, sgtid.Shard, tabletMsgDetails)
}
if ignoreTablet {
ignoreTablets = append(ignoreTablets, tablet.GetAlias())
Expand All @@ -783,7 +788,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
if errCount >= 3 {
log.Errorf("vstream for %s/%s had three consecutive failures: %v", sgtid.Keyspace, sgtid.Shard, err)
return vterrors.Wrapf(err, "persistent error in vstream for %s/%s on tablet %s; giving up",
sgtid.Keyspace, sgtid.Shard, tabletAliasString)
sgtid.Keyspace, sgtid.Shard, tabletMsgDetails)
}
log.Infof("vstream for %s/%s error, retrying: %v", sgtid.Keyspace, sgtid.Shard, err)
}
Expand Down

0 comments on commit a136b86

Please sign in to comment.