diff --git a/go/vt/vtgate/engine/delete_test.go b/go/vt/vtgate/engine/delete_test.go index 18dcef5cbe4..56d3467aac3 100644 --- a/go/vt/vtgate/engine/delete_test.go +++ b/go/vt/vtgate/engine/delete_test.go @@ -45,7 +45,7 @@ func TestDeleteUnsharded(t *testing.T) { }, } - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -80,7 +80,7 @@ func TestDeleteEqual(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -112,7 +112,7 @@ func TestDeleteEqualMultiCol(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -148,7 +148,7 @@ func TestDeleteEqualNoRoute(t *testing.T) { }, } - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -181,7 +181,7 @@ func TestDeleteEqualNoScatter(t *testing.T) { }, } - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.EqualError(t, err, "cannot map vindex to unique keyspace id: DestinationKeyRange(-)") } @@ -213,7 +213,7 @@ func TestDeleteOwnedVindex(t *testing.T) { "1|4|5|6", )} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.results = results _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -231,7 +231,7 @@ func TestDeleteOwnedVindex(t *testing.T) { }) // No rows changing - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") _, err = del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -252,7 +252,7 @@ func TestDeleteOwnedVindex(t *testing.T) { "1|4|5|6", "1|7|8|9", )} - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") vc.results = results _, err = del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -300,7 +300,7 @@ func TestDeleteOwnedVindexMultiCol(t *testing.T) { "1|2|4", )} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.results = results _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -371,7 +371,7 @@ func TestDeleteSharded(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -399,7 +399,7 @@ func TestDeleteShardedStreaming(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") err := del.TryStreamExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false, func(result *sqltypes.Result) error { return nil }) @@ -435,7 +435,7 @@ func TestDeleteScatterOwnedVindex(t *testing.T) { "1|4|5|6", )} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.results = results _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -453,7 +453,7 @@ func TestDeleteScatterOwnedVindex(t *testing.T) { }) // No rows changing - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") _, err = del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) @@ -475,7 +475,7 @@ func TestDeleteScatterOwnedVindex(t *testing.T) { "1|4|5|6", "1|7|8|9", )} - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") vc.results = results _, err = del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -528,7 +528,7 @@ func TestDeleteInChangedVindexMultiCol(t *testing.T) { "1|3|6", "2|3|7", )} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.results = results _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -565,7 +565,7 @@ func TestDeleteEqualSubshard(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"-20", "20-"} _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) @@ -602,7 +602,7 @@ func TestDeleteMultiEqual(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"-20", "20-"} _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) @@ -635,7 +635,7 @@ func TestDeleteInUnique(t *testing.T) { Type: querypb.Type_TUPLE, Values: append([]*querypb.Value{sqltypes.ValueToProto(sqltypes.NewInt64(1))}, sqltypes.ValueToProto(sqltypes.NewInt64(2)), sqltypes.ValueToProto(sqltypes.NewInt64(4))), } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"-20", "20-"} _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{"__vals": tupleBV}, false) require.NoError(t, err) diff --git a/go/vt/vtgate/engine/dml_with_input_test.go b/go/vt/vtgate/engine/dml_with_input_test.go index 6fcf2040dfc..38d9068b433 100644 --- a/go/vt/vtgate/engine/dml_with_input_test.go +++ b/go/vt/vtgate/engine/dml_with_input_test.go @@ -51,7 +51,7 @@ func TestDeleteWithInputSingleOffset(t *testing.T) { OutputCols: [][]int{{0}}, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -95,7 +95,7 @@ func TestDeleteWithInputMultiOffset(t *testing.T) { OutputCols: [][]int{{1, 0}}, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -160,7 +160,7 @@ func TestDeleteWithMultiTarget(t *testing.T) { OutputCols: [][]int{{0}, {1, 2}}, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -210,7 +210,7 @@ func TestUpdateWithInputNonLiteral(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.results = []*sqltypes.Result{ {RowsAffected: 1}, {RowsAffected: 1}, {RowsAffected: 1}, } diff --git a/go/vt/vtgate/engine/fk_cascade_test.go b/go/vt/vtgate/engine/fk_cascade_test.go index 942fe44a709..c93e487067b 100644 --- a/go/vt/vtgate/engine/fk_cascade_test.go +++ b/go/vt/vtgate/engine/fk_cascade_test.go @@ -62,7 +62,7 @@ func TestDeleteCascade(t *testing.T) { Parent: parentP, } - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") vc.results = []*sqltypes.Result{fakeRes} _, err := fkc.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true) require.NoError(t, err) @@ -123,7 +123,7 @@ func TestUpdateCascade(t *testing.T) { Parent: parentP, } - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") vc.results = []*sqltypes.Result{fakeRes} _, err := fkc.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true) require.NoError(t, err) @@ -195,7 +195,7 @@ func TestNonLiteralUpdateCascade(t *testing.T) { Parent: parentP, } - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") vc.results = []*sqltypes.Result{fakeRes} _, err := fkc.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true) require.NoError(t, err) diff --git a/go/vt/vtgate/engine/fk_verify_test.go b/go/vt/vtgate/engine/fk_verify_test.go index 5c9ff83c2ec..465dd81d3b2 100644 --- a/go/vt/vtgate/engine/fk_verify_test.go +++ b/go/vt/vtgate/engine/fk_verify_test.go @@ -58,7 +58,7 @@ func TestFKVerifyUpdate(t *testing.T) { t.Run("foreign key verification success", func(t *testing.T) { fakeRes := sqltypes.MakeTestResult(sqltypes.MakeTestFields("1", "int64")) - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") vc.results = []*sqltypes.Result{fakeRes} _, err := fkc.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true) require.NoError(t, err) @@ -83,7 +83,7 @@ func TestFKVerifyUpdate(t *testing.T) { t.Run("parent foreign key verification failure", func(t *testing.T) { // No results from select, should cause the foreign key verification to fail. fakeRes := sqltypes.MakeTestResult(sqltypes.MakeTestFields("1", "int64"), "1", "1", "1") - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") vc.results = []*sqltypes.Result{fakeRes} _, err := fkc.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true) require.ErrorContains(t, err, "Cannot add or update a child row: a foreign key constraint fails") @@ -105,7 +105,7 @@ func TestFKVerifyUpdate(t *testing.T) { t.Run("child foreign key verification failure", func(t *testing.T) { // No results from select, should cause the foreign key verification to fail. fakeRes := sqltypes.MakeTestResult(sqltypes.MakeTestFields("1", "int64"), "1", "1", "1") - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") vc.results = []*sqltypes.Result{fakeRes} _, err := fkc.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true) require.ErrorContains(t, err, "Cannot delete or update a parent row: a foreign key constraint fails") diff --git a/go/vt/vtgate/engine/insert_test.go b/go/vt/vtgate/engine/insert_test.go index 2de95e5d186..5e66649f82e 100644 --- a/go/vt/vtgate/engine/insert_test.go +++ b/go/vt/vtgate/engine/insert_test.go @@ -42,7 +42,7 @@ func TestInsertUnsharded(t *testing.T) { "dummy_insert", ) - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") vc.results = []*sqltypes.Result{{ InsertID: 4, }} @@ -91,7 +91,7 @@ func TestInsertUnshardedGenerate(t *testing.T) { ), } - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") vc.results = []*sqltypes.Result{ sqltypes.MakeTestResult( sqltypes.MakeTestFields( @@ -144,7 +144,7 @@ func TestInsertUnshardedGenerate_Zeros(t *testing.T) { ), } - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") vc.results = []*sqltypes.Result{ sqltypes.MakeTestResult( sqltypes.MakeTestFields( @@ -215,7 +215,7 @@ func TestInsertShardedSimple(t *testing.T) { }, nil, ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} _, err := ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -254,7 +254,7 @@ func TestInsertShardedSimple(t *testing.T) { }, nil, ) - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} _, err = ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -297,7 +297,7 @@ func TestInsertShardedSimple(t *testing.T) { ) ins.MultiShardAutocommit = true - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} _, err = ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -366,7 +366,7 @@ func TestInsertShardWithONDuplicateKey(t *testing.T) { }, }}}, ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} _, err := ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{ @@ -412,7 +412,7 @@ func TestInsertShardWithONDuplicateKey(t *testing.T) { &sqlparser.UpdateExpr{Name: sqlparser.NewColName("suffix"), Expr: &sqlparser.Argument{Name: "_id_0", Type: sqltypes.Int64}}, }, ) - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} _, err = ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -457,7 +457,7 @@ func TestInsertShardWithONDuplicateKey(t *testing.T) { ) ins.MultiShardAutocommit = true - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} _, err = ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -590,7 +590,7 @@ func TestInsertShardedGenerate(t *testing.T) { ), } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} vc.results = []*sqltypes.Result{ sqltypes.MakeTestResult( @@ -715,7 +715,7 @@ func TestInsertShardedOwned(t *testing.T) { nil, ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} _, err := ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -807,7 +807,7 @@ func TestInsertShardedOwnedWithNull(t *testing.T) { nil, ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} _, err := ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -893,7 +893,7 @@ func TestInsertShardedGeo(t *testing.T) { nil, ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20"} _, err := ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -1029,7 +1029,7 @@ func TestInsertShardedIgnoreOwned(t *testing.T) { "\x00", ) noresult := &sqltypes.Result{} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20"} vc.results = []*sqltypes.Result{ // primary vindex lookups: fail row 2. @@ -1147,7 +1147,7 @@ func TestInsertShardedIgnoreOwnedWithNull(t *testing.T) { ), "\x00", ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"-20", "20-"} vc.results = []*sqltypes.Result{ ksid0, @@ -1267,7 +1267,7 @@ func TestInsertShardedUnownedVerify(t *testing.T) { "1", ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} vc.results = []*sqltypes.Result{ nonemptyResult, @@ -1381,7 +1381,7 @@ func TestInsertShardedIgnoreUnownedVerify(t *testing.T) { "1", ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20"} vc.results = []*sqltypes.Result{ nonemptyResult, @@ -1472,7 +1472,7 @@ func TestInsertShardedIgnoreUnownedVerifyFail(t *testing.T) { nil, ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.EqualError(t, err, `values [[INT64(2)]] for column [c3] does not map to keyspace ids`) @@ -1578,7 +1578,7 @@ func TestInsertShardedUnownedReverseMap(t *testing.T) { "1", ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} vc.results = []*sqltypes.Result{ nonemptyResult, @@ -1663,7 +1663,7 @@ func TestInsertShardedUnownedReverseMapSuccess(t *testing.T) { nil, ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) @@ -1694,7 +1694,7 @@ func TestInsertSelectSimple(t *testing.T) { Keyspace: ks.Keyspace}} ins := newInsertSelect(false, ks.Keyspace, ks.Tables["t1"], "prefix ", nil, [][]int{{1}}, rb) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} vc.results = []*sqltypes.Result{ sqltypes.MakeTestResult( @@ -1787,7 +1787,7 @@ func TestInsertSelectOwned(t *testing.T) { rb, ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} vc.results = []*sqltypes.Result{ sqltypes.MakeTestResult( @@ -1894,7 +1894,7 @@ func TestInsertSelectGenerate(t *testing.T) { Offset: 1, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} vc.results = []*sqltypes.Result{ // This is the result from the input SELECT @@ -1987,7 +1987,7 @@ func TestStreamingInsertSelectGenerate(t *testing.T) { Offset: 1, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} vc.results = []*sqltypes.Result{ // This is the result from the input SELECT @@ -2082,7 +2082,7 @@ func TestInsertSelectGenerateNotProvided(t *testing.T) { Offset: 2, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} vc.results = []*sqltypes.Result{ // This is the result from the input SELECT @@ -2169,7 +2169,7 @@ func TestStreamingInsertSelectGenerateNotProvided(t *testing.T) { Offset: 2, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} vc.results = []*sqltypes.Result{ // This is the result from the input SELECT @@ -2258,7 +2258,7 @@ func TestInsertSelectUnowned(t *testing.T) { rb, ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} vc.results = []*sqltypes.Result{ sqltypes.MakeTestResult(sqltypes.MakeTestFields("id", "int64"), "1", "3", "2"), diff --git a/go/vt/vtgate/engine/routing.go b/go/vt/vtgate/engine/routing.go index 067278c1a93..47d4cea04a6 100644 --- a/go/vt/vtgate/engine/routing.go +++ b/go/vt/vtgate/engine/routing.go @@ -21,6 +21,9 @@ import ( "encoding/json" "strconv" + "golang.org/x/exp/maps" + + "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" @@ -74,6 +77,8 @@ const ( // Is used when the query explicitly sets a target destination: // in the clause e.g: UPDATE `keyspace[-]`.x1 SET foo=1 ByDestination + // Values // TODO + Values ) var opName = map[Opcode]string{ @@ -90,6 +95,7 @@ var opName = map[Opcode]string{ None: "None", ByDestination: "ByDestination", SubShard: "SubShard", + Values: "Values", } // MarshalJSON serializes the Opcode as a JSON string. @@ -176,6 +182,76 @@ func (rp *RoutingParameters) findRoute(ctx context.Context, vcursor VCursor, bin default: return rp.multiEqual(ctx, vcursor, bindVars) } + case Values: + switch rp.Vindex.(type) { + case vindexes.MultiColumn: + return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unsupported multi column vindex for values") + default: + if len(rp.Values) < 2 { + return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "values slice must at least be of length two for a values") + } + env := evalengine.NewExpressionEnv(ctx, bindVars, vcursor) + value, err := env.Evaluate(rp.Values[0]) + if err != nil { + return nil, nil, err + } + + rval, ok := rp.Values[0].(*evalengine.BindVariable) + if !ok { + return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "cannot transform evalengine expr to bind variable for values") + } + + tuple := value.TupleValues() + + type rssValue struct { + rss *srvtopo.ResolvedShard + vals []sqltypes.Value + } + r := map[string]rssValue{} + for _, row := range tuple { + env.Row = nil + err = row.ForEachValue(func(bv sqltypes.Value) { + env.Row = append(env.Row, bv) + }) + if err != nil { + return nil, nil, err + } + val, err := env.Evaluate(rp.Values[1]) + if err != nil { + return nil, nil, err + } + + rss, _, err := resolveShards(ctx, vcursor, rp.Vindex.(vindexes.SingleColumn), rp.Keyspace, []sqltypes.Value{val.Value(vcursor.ConnCollation())}) + if err != nil { + return nil, nil, err + } + if len(rss) > 1 { + return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "andres is confused") + } + r[rss[0].Target.String()] = rssValue{ + rss: rss[0], + vals: append(r[rss[0].Target.String()].vals, val.Value(collations.Unknown)), + } + } + var resultRss []*srvtopo.ResolvedShard + var resultBvs []map[string]*querypb.BindVariable + for _, rssVals := range r { + resultRss = append(resultRss, rssVals.rss) + + clonedBindVars := maps.Clone(bindVars) + + newBv := &querypb.BindVariable{ + Type: querypb.Type_TUPLE, + } + for _, s := range rssVals.vals { + newBv.Values = append(newBv.Values, sqltypes.ValueToProto(s)) + } + + clonedBindVars[rval.Key] = newBv + resultBvs = append(resultBvs, clonedBindVars) + } + return resultRss, resultBvs, nil + } default: // Unreachable. return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unsupported opcode: %v", rp.Opcode) @@ -480,7 +556,13 @@ func setReplaceSchemaName(bindVars map[string]*querypb.BindVariable) { bindVars[sqltypes.BvReplaceSchemaName] = sqltypes.Int64BindVariable(1) } -func resolveShards(ctx context.Context, vcursor VCursor, vindex vindexes.SingleColumn, keyspace *vindexes.Keyspace, vindexKeys []sqltypes.Value) ([]*srvtopo.ResolvedShard, [][]*querypb.Value, error) { +func resolveShards( + ctx context.Context, + vcursor VCursor, + vindex vindexes.SingleColumn, + keyspace *vindexes.Keyspace, + vindexKeys []sqltypes.Value, +) ([]*srvtopo.ResolvedShard, [][]*querypb.Value, error) { // Convert vindexKeys to []*querypb.Value ids := make([]*querypb.Value, len(vindexKeys)) for i, vik := range vindexKeys { diff --git a/go/vt/vtgate/engine/routing_parameter_test.go b/go/vt/vtgate/engine/routing_parameter_test.go new file mode 100644 index 00000000000..17f6b9d4eca --- /dev/null +++ b/go/vt/vtgate/engine/routing_parameter_test.go @@ -0,0 +1,65 @@ +package engine + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql/collations" + "vitess.io/vitess/go/sqltypes" + querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/vtgate/evalengine" + "vitess.io/vitess/go/vt/vtgate/vindexes" +) + +func TestFindRouteValuesJoin(t *testing.T) { + vindex, err := vindexes.CreateVindex("hash", "", nil) + require.NoError(t, err) + + const valueBvName = "v" + rp := &RoutingParameters{ + Opcode: Values, + + Keyspace: &vindexes.Keyspace{ + Name: "ks", + Sharded: true, + }, + + Vindex: vindex, + + Values: []evalengine.Expr{ + evalengine.NewBindVar(valueBvName, evalengine.NewType(sqltypes.Tuple, collations.Unknown)), + evalengine.NewColumn(0, evalengine.NewType(sqltypes.Int64, collations.Unknown), nil), + }, + } + + bv := &querypb.BindVariable{ + Type: querypb.Type_TUPLE, + } + bv.Values = append( + bv.Values, + sqltypes.TupleToProto([]sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewVarBinary("hello")}), + ) + bv.Values = append( + bv.Values, + sqltypes.TupleToProto([]sqltypes.Value{sqltypes.NewInt64(2), sqltypes.NewVarBinary("good morning")}), + ) + + vc := newTestVCursor("0") + rss, bvs, err := rp.findRoute(context.Background(), vc, map[string]*querypb.BindVariable{ + valueBvName: bv, + }) + require.NoError(t, err) + require.Len(t, rss, 1) + require.Len(t, bvs, 1) + var s []int64 + for _, value := range bvs[0][valueBvName].Values { + v := sqltypes.ProtoToValue(value) + require.Equal(t, sqltypes.Int64, v.Type()) + i, err := v.ToInt64() + require.NoError(t, err) + s = append(s, i) + } + require.Equal(t, []int64{1, 2}, s) +} diff --git a/go/vt/vtgate/engine/update_test.go b/go/vt/vtgate/engine/update_test.go index eb6af5a5299..e29ffeccd6f 100644 --- a/go/vt/vtgate/engine/update_test.go +++ b/go/vt/vtgate/engine/update_test.go @@ -50,7 +50,7 @@ func TestUpdateUnsharded(t *testing.T) { }, } - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -85,7 +85,7 @@ func TestUpdateEqual(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -116,7 +116,7 @@ func TestUpdateEqualMultiCol(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -142,7 +142,7 @@ func TestUpdateScatter(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) @@ -168,7 +168,7 @@ func TestUpdateScatter(t *testing.T) { }, } - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") _, err = upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) @@ -199,7 +199,7 @@ func TestUpdateEqualNoRoute(t *testing.T) { }, } - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -250,7 +250,7 @@ func TestUpdateEqualChangedVindex(t *testing.T) { ), "1|4|5|6|0|0", )} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.results = results _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -272,7 +272,7 @@ func TestUpdateEqualChangedVindex(t *testing.T) { }) // No rows changing - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") _, err = upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) @@ -294,7 +294,7 @@ func TestUpdateEqualChangedVindex(t *testing.T) { "1|4|5|6|0|0", "1|7|8|9|0|0", )} - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") vc.results = results _, err = upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -330,7 +330,7 @@ func TestUpdateEqualChangedVindex(t *testing.T) { "1|4|5|6|0|1", // twocol changes "1|7|8|9|1|0", // onecol changes )} - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") vc.results = results _, err = upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -387,7 +387,7 @@ func TestUpdateEqualMultiColChangedVindex(t *testing.T) { ), "1|2|4|0", )} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.results = results _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -514,7 +514,7 @@ func TestUpdateScatterChangedVindex(t *testing.T) { ), "1|4|5|6|0|0", )} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.results = results _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -534,7 +534,7 @@ func TestUpdateScatterChangedVindex(t *testing.T) { }) // No rows changing - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") _, err = upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) if err != nil { @@ -558,7 +558,7 @@ func TestUpdateScatterChangedVindex(t *testing.T) { "1|4|5|6|0|0", "1|7|8|9|0|0", )} - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") vc.results = results _, err = upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -604,7 +604,7 @@ func TestUpdateIn(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -628,7 +628,7 @@ func TestUpdateInStreamExecute(t *testing.T) { Query: "dummy_update", }} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") err := upd.TryStreamExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false, func(result *sqltypes.Result) error { return nil }) @@ -655,7 +655,7 @@ func TestUpdateInMultiCol(t *testing.T) { Query: "dummy_update", }} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -710,7 +710,7 @@ func TestUpdateInChangedVindex(t *testing.T) { "1|4|5|6|0|0", "2|21|22|23|0|0", )} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.results = results _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -738,7 +738,7 @@ func TestUpdateInChangedVindex(t *testing.T) { }) // No rows changing - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") _, err = upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) @@ -761,7 +761,7 @@ func TestUpdateInChangedVindex(t *testing.T) { "1|7|8|9|0|0", "2|21|22|23|0|0", )} - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") vc.results = results _, err = upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -835,7 +835,7 @@ func TestUpdateInChangedVindexMultiCol(t *testing.T) { "1|3|6|0", "2|3|7|0", )} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.results = results _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -874,7 +874,7 @@ func TestUpdateEqualSubshard(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"-20", "20-"} _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) @@ -911,7 +911,7 @@ func TestUpdateMultiEqual(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"-20", "20-"} _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) @@ -944,7 +944,7 @@ func TestUpdateInUnique(t *testing.T) { Type: querypb.Type_TUPLE, Values: append([]*querypb.Value{sqltypes.ValueToProto(sqltypes.NewInt64(1))}, sqltypes.ValueToProto(sqltypes.NewInt64(2)), sqltypes.ValueToProto(sqltypes.NewInt64(4))), } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"-20", "20-"} _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{"__vals": tupleBV}, false) require.NoError(t, err) @@ -1033,6 +1033,6 @@ func buildTestVSchema() *vindexes.VSchema { return vs } -func newDMLTestVCursor(shards ...string) *loggingVCursor { +func newTestVCursor(shards ...string) *loggingVCursor { return &loggingVCursor{shards: shards, resolvedTargetTabletType: topodatapb.TabletType_PRIMARY} } diff --git a/go/vt/vtgate/evalengine/eval.go b/go/vt/vtgate/evalengine/eval.go index 916c5e200f4..f75ac0f8202 100644 --- a/go/vt/vtgate/evalengine/eval.go +++ b/go/vt/vtgate/evalengine/eval.go @@ -378,6 +378,16 @@ func valueToEval(value sqltypes.Value, collation collations.TypedCollation, valu } switch tt := value.Type(); { + case tt == sqltypes.Tuple: + t := &evalTuple{} + err := value.ForEachValue(func(bv sqltypes.Value) { + e, err := valueToEval(bv, collation, values) + if err != nil { + return + } + t.t = append(t.t, e) + }) + return t, wrap(err) case sqltypes.IsSigned(tt): ival, err := value.ToInt64() return newEvalInt64(ival), wrap(err) diff --git a/go/vt/vtgate/evalengine/eval_tuple.go b/go/vt/vtgate/evalengine/eval_tuple.go index 81fa3317977..1faff68e155 100644 --- a/go/vt/vtgate/evalengine/eval_tuple.go +++ b/go/vt/vtgate/evalengine/eval_tuple.go @@ -27,7 +27,15 @@ type evalTuple struct { var _ eval = (*evalTuple)(nil) func (e *evalTuple) ToRawBytes() []byte { - return nil + var vals []sqltypes.Value + for _, e2 := range e.t { + v, err := sqltypes.NewValue(e2.SQLType(), e2.ToRawBytes()) + if err != nil { + panic(err) + } + vals = append(vals, v) + } + return sqltypes.TupleToProto(vals).Value } func (e *evalTuple) SQLType() sqltypes.Type {