Skip to content

Commit

Permalink
Add e2e test
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Feb 1, 2025
1 parent 3d308b3 commit e4374be
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 1 deletion.
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (
var (
// All standard user tables should have a primary key and at least one secondary key.
customerTypes = []string{"'individual'", "'soho'", "'enterprise'"}
customerTableTemplate = `create table customer(cid int auto_increment, name varchar(128) collate utf8mb4_bin, meta json default null,
customerTableTemplate = `create table customer(cid int auto_increment, name varchar(128) collate utf8mb4_general_ci, meta json default null,
industryCategory varchar(100) generated always as (json_extract(meta, _utf8mb4'$.industry')) virtual, typ enum(%s),
sport set('football','cricket','baseball'), ts timestamp not null default current_timestamp, bits bit(2) default b'11', date1 datetime not null default '0000-00-00 00:00:00',
date2 datetime not null default '2021-00-01 00:00:00', dec80 decimal(8,0), blb blob, primary key(%s), key(name)) CHARSET=utf8mb4`
Expand Down
144 changes: 144 additions & 0 deletions go/test/endtoend/vreplication/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1090,3 +1090,147 @@ func TestVStreamHeartbeats(t *testing.T) {
})
}
}

// TestVStreamPushdownFilters confirms that pushdown filters are applied correctly
// when they are specified in the VStream API via the rule.Filter.
// It also confirms that we use the proper collations for the vstream filter when
// using varchar fields.
func TestVStreamPushdownFilters(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// Enable continuous heartbeats.
extraVTTabletArgs = append(extraVTTabletArgs,
"--heartbeat_enable",
"--heartbeat_interval", "1s",
"--heartbeat_on_demand_duration", "0",
)
setSidecarDBName("_vt")
config := *mainClusterConfig
config.overrideHeartbeatOptions = true
vc = NewVitessCluster(t, &clusterOptions{
clusterConfig: &config,
})
defer vc.TearDown()
ks := "product"
shard := "0"

require.NotNil(t, vc)

defaultCell := vc.Cells[vc.CellNames[0]]
vc.AddKeyspace(t, []*Cell{defaultCell}, ks, shard, initialProductVSchema, initialProductSchema, 0, 0, 100, nil)
verifyClusterHealth(t, vc)
insertInitialData(t)

vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
verifyClusterHealth(t, vc)

// Make sure that we get at least one paul event in the copy phase.
_, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('PAUĹ')", ks), 1, false)
require.NoError(t, err)

res, err := vtgateConn.ExecuteFetch(fmt.Sprintf("select count(*) from %s.customer where name = 'pauĺ'", ks), 1, false)
require.NoError(t, err)
require.Len(t, res.Rows, 1)
startingPauls, err := res.Rows[0][0].ToInt()
require.NoError(t, err)

// Coordinate go-routines.
streamCtx, streamCancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer streamCancel()
done := make(chan struct{})

// First goroutine that keeps inserting rows into the table being streamed until the
// stream context is cancelled.
createdPauls := startingPauls
createdNonPauls := 0
go func() {
id := 1
for {
select {
case <-streamCtx.Done():
// Give the VStream a little catch-up time before telling it to stop
// via the done channel.
time.Sleep(10 * time.Second)
close(done)
return
default:
if id%10 == 0 {
_, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('paÜl')", ks), 1, false)
require.NoError(t, err)
createdPauls++
} else {
insertRow(ks, "customer", id)
createdNonPauls++
}
time.Sleep(10 * time.Millisecond)
id++
}
}
}()

vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: ks,
Shard: shard,
Gtid: "",
}}}

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
// Stream all tables.
Match: "customer",
Filter: "select * from customer where name = 'påul'",
}},
}
flags := &vtgatepb.VStreamFlags{}
vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
require.NoError(t, err)
defer vstreamConn.Close()

// So we should have at least one paul event in the copy phase.
copyPhaseRowEvents := 0
// And we should have many paul events in the running phase.
runningPhaseRowEvents := 0
copyPhase := true

func() {
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
require.NoError(t, err)
for {
evs, err := reader.Recv()

switch err {
case nil:
for _, ev := range evs {
switch ev.Type {
case binlogdatapb.VEventType_COPY_COMPLETED:
copyPhase = false
case binlogdatapb.VEventType_ROW:
if copyPhase {
copyPhaseRowEvents++
} else {
runningPhaseRowEvents++
}
}
}
default:
require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err))
}
select {
case <-streamCtx.Done():
return
default:
}
}
}()

require.NotZero(t, createdPauls)
require.NotZero(t, createdNonPauls)
require.Greater(t, createdNonPauls, createdPauls)
require.NotZero(t, copyPhaseRowEvents)
require.NotZero(t, runningPhaseRowEvents)

t.Logf("Created pauls: %d, pauls copied: %d, pauls replicated: %d", createdPauls, copyPhaseRowEvents, runningPhaseRowEvents)
require.Equal(t, createdPauls, copyPhaseRowEvents+runningPhaseRowEvents)
}

0 comments on commit e4374be

Please sign in to comment.