Skip to content

Commit

Permalink
Add vstreamer
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Jan 22, 2025
1 parent 2008d1c commit d878c2d
Showing 1 changed file with 26 additions and 20 deletions.
46 changes: 26 additions & 20 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (vs *vstreamer) Stream() error {
if err != nil {
vs.vse.errorCounts.Add("StreamRows", 1)
vs.vse.vstreamersEndedWithErrors.Add(1)
return err
return vterrors.Wrapf(err, "failed to determine starting position")
}
vs.pos = pos
return vs.replicate(ctx)
Expand Down Expand Up @@ -289,7 +289,8 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
bufferedEvents = append(bufferedEvents, vevent)
default:
vs.vse.errorCounts.Add("BufferAndTransmit", 1)
return fmt.Errorf("unexpected event: %v", vevent)
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "usupported event type %s found for event: %+v",
vevent.Type.String(), vevent)
}
return nil
}
Expand Down Expand Up @@ -369,11 +370,13 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
case ev, ok := <-throttledEvents:
if !ok {
select {
case err := <-errs:
return err
case <-ctx.Done():
return nil
default:
}
return fmt.Errorf("unexpected server EOF")
return vterrors.Errorf(vtrpcpb.Code_ABORTED, "unexpected server EOF while parsing events")
}
vevents, err := vs.parseEvent(ev, bufferAndTransmit)
if err != nil {
Expand All @@ -386,16 +389,18 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
return nil
}
vs.vse.errorCounts.Add("BufferAndTransmit", 1)
return fmt.Errorf("error sending event: %v", err)
return vterrors.Wrapf(err, "error sending event: %+v", vevent)
}
}
case vs.vschema = <-vs.vevents:
select {
case err := <-errs:
return err
case <-ctx.Done():
return nil
default:
if err := vs.rebuildPlans(); err != nil {
return err
return vterrors.Wrap(err, "failed to rebuild replication plans")
}
}
case err := <-errs:
Expand All @@ -409,7 +414,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
return nil
}
vs.vse.errorCounts.Add("Send", 1)
return fmt.Errorf("error sending event: %v", err)
return vterrors.Wrapf(err, "failed to send heartbeat event")
}
}
}
Expand Down Expand Up @@ -460,7 +465,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vev
case ev.IsGTID():
gtid, hasBegin, err := ev.GTID(vs.format)
if err != nil {
return nil, fmt.Errorf("can't get GTID from binlog event: %v, event data: %#v", err, ev)
return nil, vterrors.Wrapf(err, "failed to get GTID from binlog event: %#v", ev)
}
if hasBegin {
vevents = append(vevents, &binlogdatapb.VEvent{
Expand All @@ -478,7 +483,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vev
case ev.IsQuery():
q, err := ev.Query(vs.format)
if err != nil {
return nil, fmt.Errorf("can't get query from binlog event: %v, event data: %#v", err, ev)
return nil, vterrors.Wrapf(err, "failed to get query from binlog event: %#v", ev)
}
// Insert/Delete/Update are supported only to be used in the context of external mysql streams where source databases
// could be using SBR. Vitess itself will never run into cases where it needs to consume non rbr statements.
Expand Down Expand Up @@ -579,7 +584,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vev

tm, err := ev.TableMap(vs.format)
if err != nil {
return nil, err
return nil, vterrors.Wrapf(err, "failed to parse table map from binlog event: %#v", ev)
}
if plan, ok := vs.plans[id]; ok {
// When the underlying mysql server restarts the table map can change.
Expand Down Expand Up @@ -618,7 +623,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vev
vevent, err := vs.buildTablePlan(id, tm)
if err != nil {
vs.vse.errorCounts.Add("TablePlan", 1)
return nil, err
return nil, vterrors.Wrapf(err, "failed to build table replication plan for table %s", tm.Name)
}
if vevent != nil {
vevents = append(vevents, vevent)
Expand Down Expand Up @@ -907,7 +912,7 @@ func getExtColInfos(ctx context.Context, cp dbconfigs.Connector, se *schema.Engi
extColInfos := make(map[string]*extColInfo)
conn, err := cp.Connect(ctx)
if err != nil {
return nil, err
return nil, vterrors.Wrapf(err, "failed to connect to database %s", database)
}
defer conn.Close()
queryTemplate := "select column_name, column_type, collation_name from information_schema.columns where table_schema=%s and table_name=%s;"
Expand Down Expand Up @@ -975,7 +980,7 @@ nextrow:
for _, row := range rows.Rows {
afterOK, afterValues, _, err := vs.extractRowAndFilter(plan, row.Data, rows.DataColumns, row.NullColumns, row.JSONPartialValues)
if err != nil {
return nil, err
return nil, vterrors.Wrap(err, "failed to extract journal from binlog event and apply filters")
}
if !afterOK {
// This can happen if someone manually deleted rows.
Expand All @@ -997,7 +1002,7 @@ nextrow:
return nil, err
}
if err := prototext.Unmarshal(avBytes, journal); err != nil {
return nil, err
return nil, vterrors.Wrap(err, "failed to unmarshal journal event")
}
vevents = append(vevents, &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_JOURNAL,
Expand All @@ -1014,13 +1019,13 @@ func (vs *vstreamer) processRowEvent(vevents []*binlogdatapb.VEvent, plan *strea
// The BEFORE image does not have partial JSON values so we pass an empty bitmap.
beforeOK, beforeValues, _, err := vs.extractRowAndFilter(plan, row.Identify, rows.IdentifyColumns, row.NullIdentifyColumns, mysql.Bitmap{})
if err != nil {
return nil, err
return nil, vterrors.Wrap(err, "failed to extract row's before values from binlog event and apply filters")
}
// The AFTER image is where we may have partial JSON values, as reflected in the
// row's JSONPartialValues bitmap.
afterOK, afterValues, partial, err := vs.extractRowAndFilter(plan, row.Data, rows.DataColumns, row.NullColumns, row.JSONPartialValues)
if err != nil {
return nil, err
return nil, vterrors.Wrap(err, "failed to extract row's after values from binlog event and apply filters")
}
if !beforeOK && !afterOK {
continue
Expand Down Expand Up @@ -1125,7 +1130,8 @@ func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataCo
if err != nil {
log.Errorf("extractRowAndFilter: %s, table: %s, colNum: %d, fields: %+v, current values: %+v",
err, plan.Table.Name, colNum, plan.Table.Fields, values)
return false, nil, false, err
return false, nil, false, vterrors.Wrapf(err, "failed to extract row's value for column %s from binlog event",
plan.Table.Fields[colNum].Name)
}
pos += l

Expand Down Expand Up @@ -1201,15 +1207,15 @@ func buildEnumStringValue(env *vtenv.Environment, plan *streamerPlan, colNum int
// table map event to initialize it.
if plan.EnumSetValuesMap == nil {
if err := addEnumAndSetMappingstoPlan(env, plan.Plan, plan.Table.Fields, plan.TableMap.Metadata); err != nil {
return sqltypes.Value{}, err
return sqltypes.Value{}, vterrors.Wrap(err, "failed to build ENUM column integer to string mappings")
}
}
// ENUM columns are stored as an unsigned 16-bit integer as they can contain a maximum
// of 65,535 elements (https://dev.mysql.com/doc/refman/en/enum.html) with the 0 element
// reserved for any integer value that has no string mapping.
iv, err := value.ToUint16()
if err != nil {
return sqltypes.Value{}, fmt.Errorf("no valid integer value found for column %s in table %s, bytes: %b",
return sqltypes.Value{}, vterrors.Wrapf(err, "no valid integer value found for column %s in table %s, bytes: %b",
plan.Table.Fields[colNum].Name, plan.Table.Name, iv)
}
var strVal string
Expand All @@ -1236,7 +1242,7 @@ func buildSetStringValue(env *vtenv.Environment, plan *streamerPlan, colNum int,
// table map event to initialize it.
if plan.EnumSetValuesMap == nil {
if err := addEnumAndSetMappingstoPlan(env, plan.Plan, plan.Table.Fields, plan.TableMap.Metadata); err != nil {
return sqltypes.Value{}, err
return sqltypes.Value{}, vterrors.Wrap(err, "failed to build SET column integer to string mappings")
}
}
// A SET column can have 64 unique values: https://dev.mysql.com/doc/refman/en/set.html
Expand All @@ -1245,7 +1251,7 @@ func buildSetStringValue(env *vtenv.Environment, plan *streamerPlan, colNum int,
val := bytes.Buffer{}
iv, err := value.ToUint64()
if err != nil {
return value, fmt.Errorf("no valid integer value found for column %s in table %s, bytes: %b",
return value, vterrors.Wrapf(err, "no valid integer value found for column %s in table %s, bytes: %b",
plan.Table.Fields[colNum].Name, plan.Table.Name, iv)
}
idx := 1
Expand Down

0 comments on commit d878c2d

Please sign in to comment.