From d878c2d7247e2eb12498782412bfc899d4543909 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 21 Jan 2025 21:07:31 -0500 Subject: [PATCH] Add vstreamer Signed-off-by: Matt Lord --- .../tabletserver/vstreamer/vstreamer.go | 46 +++++++++++-------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index fb4cb324047..10e9ffb1062 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -184,7 +184,7 @@ func (vs *vstreamer) Stream() error { if err != nil { vs.vse.errorCounts.Add("StreamRows", 1) vs.vse.vstreamersEndedWithErrors.Add(1) - return err + return vterrors.Wrapf(err, "failed to determine starting position") } vs.pos = pos return vs.replicate(ctx) @@ -289,7 +289,8 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog bufferedEvents = append(bufferedEvents, vevent) default: vs.vse.errorCounts.Add("BufferAndTransmit", 1) - return fmt.Errorf("unexpected event: %v", vevent) + return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "usupported event type %s found for event: %+v", + vevent.Type.String(), vevent) } return nil } @@ -369,11 +370,13 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog case ev, ok := <-throttledEvents: if !ok { select { + case err := <-errs: + return err case <-ctx.Done(): return nil default: } - return fmt.Errorf("unexpected server EOF") + return vterrors.Errorf(vtrpcpb.Code_ABORTED, "unexpected server EOF while parsing events") } vevents, err := vs.parseEvent(ev, bufferAndTransmit) if err != nil { @@ -386,16 +389,18 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog return nil } vs.vse.errorCounts.Add("BufferAndTransmit", 1) - return fmt.Errorf("error sending event: %v", err) + return vterrors.Wrapf(err, "error sending event: %+v", vevent) } } case vs.vschema = <-vs.vevents: select { + case err := <-errs: + return err case <-ctx.Done(): return nil default: if err := vs.rebuildPlans(); err != nil { - return err + return vterrors.Wrap(err, "failed to rebuild replication plans") } } case err := <-errs: @@ -409,7 +414,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog return nil } vs.vse.errorCounts.Add("Send", 1) - return fmt.Errorf("error sending event: %v", err) + return vterrors.Wrapf(err, "failed to send heartbeat event") } } } @@ -460,7 +465,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vev case ev.IsGTID(): gtid, hasBegin, err := ev.GTID(vs.format) if err != nil { - return nil, fmt.Errorf("can't get GTID from binlog event: %v, event data: %#v", err, ev) + return nil, vterrors.Wrapf(err, "failed to get GTID from binlog event: %#v", ev) } if hasBegin { vevents = append(vevents, &binlogdatapb.VEvent{ @@ -478,7 +483,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vev case ev.IsQuery(): q, err := ev.Query(vs.format) if err != nil { - return nil, fmt.Errorf("can't get query from binlog event: %v, event data: %#v", err, ev) + return nil, vterrors.Wrapf(err, "failed to get query from binlog event: %#v", ev) } // Insert/Delete/Update are supported only to be used in the context of external mysql streams where source databases // could be using SBR. Vitess itself will never run into cases where it needs to consume non rbr statements. @@ -579,7 +584,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vev tm, err := ev.TableMap(vs.format) if err != nil { - return nil, err + return nil, vterrors.Wrapf(err, "failed to parse table map from binlog event: %#v", ev) } if plan, ok := vs.plans[id]; ok { // When the underlying mysql server restarts the table map can change. @@ -618,7 +623,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vev vevent, err := vs.buildTablePlan(id, tm) if err != nil { vs.vse.errorCounts.Add("TablePlan", 1) - return nil, err + return nil, vterrors.Wrapf(err, "failed to build table replication plan for table %s", tm.Name) } if vevent != nil { vevents = append(vevents, vevent) @@ -907,7 +912,7 @@ func getExtColInfos(ctx context.Context, cp dbconfigs.Connector, se *schema.Engi extColInfos := make(map[string]*extColInfo) conn, err := cp.Connect(ctx) if err != nil { - return nil, err + return nil, vterrors.Wrapf(err, "failed to connect to database %s", database) } defer conn.Close() queryTemplate := "select column_name, column_type, collation_name from information_schema.columns where table_schema=%s and table_name=%s;" @@ -975,7 +980,7 @@ nextrow: for _, row := range rows.Rows { afterOK, afterValues, _, err := vs.extractRowAndFilter(plan, row.Data, rows.DataColumns, row.NullColumns, row.JSONPartialValues) if err != nil { - return nil, err + return nil, vterrors.Wrap(err, "failed to extract journal from binlog event and apply filters") } if !afterOK { // This can happen if someone manually deleted rows. @@ -997,7 +1002,7 @@ nextrow: return nil, err } if err := prototext.Unmarshal(avBytes, journal); err != nil { - return nil, err + return nil, vterrors.Wrap(err, "failed to unmarshal journal event") } vevents = append(vevents, &binlogdatapb.VEvent{ Type: binlogdatapb.VEventType_JOURNAL, @@ -1014,13 +1019,13 @@ func (vs *vstreamer) processRowEvent(vevents []*binlogdatapb.VEvent, plan *strea // The BEFORE image does not have partial JSON values so we pass an empty bitmap. beforeOK, beforeValues, _, err := vs.extractRowAndFilter(plan, row.Identify, rows.IdentifyColumns, row.NullIdentifyColumns, mysql.Bitmap{}) if err != nil { - return nil, err + return nil, vterrors.Wrap(err, "failed to extract row's before values from binlog event and apply filters") } // The AFTER image is where we may have partial JSON values, as reflected in the // row's JSONPartialValues bitmap. afterOK, afterValues, partial, err := vs.extractRowAndFilter(plan, row.Data, rows.DataColumns, row.NullColumns, row.JSONPartialValues) if err != nil { - return nil, err + return nil, vterrors.Wrap(err, "failed to extract row's after values from binlog event and apply filters") } if !beforeOK && !afterOK { continue @@ -1125,7 +1130,8 @@ func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataCo if err != nil { log.Errorf("extractRowAndFilter: %s, table: %s, colNum: %d, fields: %+v, current values: %+v", err, plan.Table.Name, colNum, plan.Table.Fields, values) - return false, nil, false, err + return false, nil, false, vterrors.Wrapf(err, "failed to extract row's value for column %s from binlog event", + plan.Table.Fields[colNum].Name) } pos += l @@ -1201,7 +1207,7 @@ func buildEnumStringValue(env *vtenv.Environment, plan *streamerPlan, colNum int // table map event to initialize it. if plan.EnumSetValuesMap == nil { if err := addEnumAndSetMappingstoPlan(env, plan.Plan, plan.Table.Fields, plan.TableMap.Metadata); err != nil { - return sqltypes.Value{}, err + return sqltypes.Value{}, vterrors.Wrap(err, "failed to build ENUM column integer to string mappings") } } // ENUM columns are stored as an unsigned 16-bit integer as they can contain a maximum @@ -1209,7 +1215,7 @@ func buildEnumStringValue(env *vtenv.Environment, plan *streamerPlan, colNum int // reserved for any integer value that has no string mapping. iv, err := value.ToUint16() if err != nil { - return sqltypes.Value{}, fmt.Errorf("no valid integer value found for column %s in table %s, bytes: %b", + return sqltypes.Value{}, vterrors.Wrapf(err, "no valid integer value found for column %s in table %s, bytes: %b", plan.Table.Fields[colNum].Name, plan.Table.Name, iv) } var strVal string @@ -1236,7 +1242,7 @@ func buildSetStringValue(env *vtenv.Environment, plan *streamerPlan, colNum int, // table map event to initialize it. if plan.EnumSetValuesMap == nil { if err := addEnumAndSetMappingstoPlan(env, plan.Plan, plan.Table.Fields, plan.TableMap.Metadata); err != nil { - return sqltypes.Value{}, err + return sqltypes.Value{}, vterrors.Wrap(err, "failed to build SET column integer to string mappings") } } // A SET column can have 64 unique values: https://dev.mysql.com/doc/refman/en/set.html @@ -1245,7 +1251,7 @@ func buildSetStringValue(env *vtenv.Environment, plan *streamerPlan, colNum int, val := bytes.Buffer{} iv, err := value.ToUint64() if err != nil { - return value, fmt.Errorf("no valid integer value found for column %s in table %s, bytes: %b", + return value, vterrors.Wrapf(err, "no valid integer value found for column %s in table %s, bytes: %b", plan.Table.Fields[colNum].Name, plan.Table.Name, iv) } idx := 1