From e4374be90f455b4a85e4e8d871386cbc177cf0c1 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 31 Jan 2025 21:42:17 -0500 Subject: [PATCH] Add e2e test Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/config_test.go | 2 +- go/test/endtoend/vreplication/vstream_test.go | 144 ++++++++++++++++++ 2 files changed, 145 insertions(+), 1 deletion(-) diff --git a/go/test/endtoend/vreplication/config_test.go b/go/test/endtoend/vreplication/config_test.go index b856572ab7b..1028dd36a85 100644 --- a/go/test/endtoend/vreplication/config_test.go +++ b/go/test/endtoend/vreplication/config_test.go @@ -46,7 +46,7 @@ import ( var ( // All standard user tables should have a primary key and at least one secondary key. customerTypes = []string{"'individual'", "'soho'", "'enterprise'"} - customerTableTemplate = `create table customer(cid int auto_increment, name varchar(128) collate utf8mb4_bin, meta json default null, + customerTableTemplate = `create table customer(cid int auto_increment, name varchar(128) collate utf8mb4_general_ci, meta json default null, industryCategory varchar(100) generated always as (json_extract(meta, _utf8mb4'$.industry')) virtual, typ enum(%s), sport set('football','cricket','baseball'), ts timestamp not null default current_timestamp, bits bit(2) default b'11', date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', dec80 decimal(8,0), blb blob, primary key(%s), key(name)) CHARSET=utf8mb4` diff --git a/go/test/endtoend/vreplication/vstream_test.go b/go/test/endtoend/vreplication/vstream_test.go index 7009bade562..35648356278 100644 --- a/go/test/endtoend/vreplication/vstream_test.go +++ b/go/test/endtoend/vreplication/vstream_test.go @@ -1090,3 +1090,147 @@ func TestVStreamHeartbeats(t *testing.T) { }) } } + +// TestVStreamPushdownFilters confirms that pushdown filters are applied correctly +// when they are specified in the VStream API via the rule.Filter. 
+// It also confirms that we use the proper collations for the vstream filter when
+// using varchar fields.
+func TestVStreamPushdownFilters(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
+	defer cancel()
+	// Enable continuous heartbeats.
+	extraVTTabletArgs = append(extraVTTabletArgs,
+		"--heartbeat_enable",
+		"--heartbeat_interval", "1s",
+		"--heartbeat_on_demand_duration", "0",
+	)
+	setSidecarDBName("_vt")
+	config := *mainClusterConfig
+	config.overrideHeartbeatOptions = true
+	vc = NewVitessCluster(t, &clusterOptions{
+		clusterConfig: &config,
+	})
+	defer vc.TearDown()
+	ks := "product"
+	shard := "0"
+
+	require.NotNil(t, vc)
+
+	defaultCell := vc.Cells[vc.CellNames[0]]
+	vc.AddKeyspace(t, []*Cell{defaultCell}, ks, shard, initialProductVSchema, initialProductSchema, 0, 0, 100, nil)
+	verifyClusterHealth(t, vc)
+	insertInitialData(t)
+
+	vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
+	defer vtgateConn.Close()
+	verifyClusterHealth(t, vc)
+
+	// Make sure that we get at least one paul event in the copy phase.
+	_, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('PAUĹ')", ks), 1, false)
+	require.NoError(t, err)
+
+	res, err := vtgateConn.ExecuteFetch(fmt.Sprintf("select count(*) from %s.customer where name = 'pauĺ'", ks), 1, false)
+	require.NoError(t, err)
+	require.Len(t, res.Rows, 1)
+	startingPauls, err := res.Rows[0][0].ToInt()
+	require.NoError(t, err)
+
+	// streamCtx bounds how long rows are inserted; done tells the reader when to stop.
+	streamCtx, streamCancel := context.WithTimeout(context.Background(), 1*time.Minute)
+	defer streamCancel()
+	done := make(chan struct{})
+
+	// Writer goroutine: inserts rows into the streamed table until streamCtx is done. It
+	// reports failures with t.Errorf as FailNow must not be called off the test goroutine.
+	createdPauls := startingPauls
+	createdNonPauls := 0
+	go func() {
+		defer close(done) // Always release the reader, even when an insert fails.
+		id := 1
+		for {
+			select {
+			case <-streamCtx.Done():
+				// Give the VStream catch-up time before the deferred close(done) stops it.
+				time.Sleep(10 * time.Second)
+				return
+			default:
+				if id%10 == 0 {
+					_, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('paÜl')", ks), 1, false)
+					if err != nil {
+						t.Errorf("failed to insert paul row: %v", err)
+						return
+					}
+					createdPauls++
+				} else {
+					insertRow(ks, "customer", id)
+					createdNonPauls++
+				}
+				time.Sleep(10 * time.Millisecond)
+				id++
+			}
+		}
+	}()
+
+	vgtid := &binlogdatapb.VGtid{
+		ShardGtids: []*binlogdatapb.ShardGtid{{
+			Keyspace: ks,
+			Shard:    shard,
+			Gtid:     "",
+		}}}
+
+	filter := &binlogdatapb.Filter{
+		Rules: []*binlogdatapb.Rule{{
+			// Only stream customer rows matching the (collation-aware) name filter.
+			Match:  "customer",
+			Filter: "select * from customer where name = 'påul'",
+		}},
+	}
+	flags := &vtgatepb.VStreamFlags{}
+	vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
+	require.NoError(t, err)
+	defer vstreamConn.Close()
+
+	// So we should have at least one paul event in the copy phase.
+	copyPhaseRowEvents := 0
+	// And we should have many paul events in the running phase.
+	runningPhaseRowEvents := 0
+	copyPhase := true
+
+	func() {
+		reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
+		require.NoError(t, err)
+		for {
+			evs, err := reader.Recv()
+			if err != nil {
+				require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err))
+			}
+			for _, ev := range evs {
+				switch ev.Type {
+				case binlogdatapb.VEventType_COPY_COMPLETED:
+					copyPhase = false
+				case binlogdatapb.VEventType_ROW:
+					if copyPhase {
+						copyPhaseRowEvents++
+					} else {
+						runningPhaseRowEvents++
+					}
+				}
+			}
+			// Stop only once the writer has exited; this also orders the counter reads below.
+			select {
+			case <-done:
+				return
+			default:
+			}
+		}
+	}()
+
+	require.NotZero(t, createdPauls)
+	require.NotZero(t, createdNonPauls)
+	require.Greater(t, createdNonPauls, createdPauls)
+	require.NotZero(t, copyPhaseRowEvents)
+	require.NotZero(t, runningPhaseRowEvents)
+
+	t.Logf("Created pauls: %d, pauls copied: %d, pauls replicated: %d", createdPauls, copyPhaseRowEvents, runningPhaseRowEvents)
+	require.Equal(t, createdPauls, copyPhaseRowEvents+runningPhaseRowEvents)
+}