Skip to content

Commit

Permalink
Adding wasm.
Browse files Browse the repository at this point in the history
Note: cannot name a file XXX_wasm.go.  Go build will treat this file as
a wasm target.  Lots of mystic failures ...

With this, we allow user to run WebAssembly, which essentially allow
user to run a UDF written in any language.   This indeed make the
dev flow a little bit longer/harder, but it is significantly easier
than write plsql and debug plsql.

Wasm function only takes string args.  One common way is to encode
the args into a json string.  json_encode, concat_ws, group_concat
is your friend.

Return is a string as well.  jq and unnest can be used to decode
json.
  • Loading branch information
fengttt committed Jul 8, 2024
1 parent 850b992 commit 1af2b50
Show file tree
Hide file tree
Showing 12 changed files with 368 additions and 77 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/docker/go-units v0.5.0
github.com/dolthub/maphash v0.1.0
github.com/elastic/gosigar v0.14.2
github.com/extism/go-sdk v1.3.0
github.com/fagongzi/goetty/v2 v2.0.3-0.20230628075727-26c9a2fd5fb8
github.com/fagongzi/util v0.0.0-20210923134909-bccc37b5040d
github.com/felixge/fgprof v0.9.3
Expand Down Expand Up @@ -85,6 +86,7 @@ require (
github.com/containerd/cgroups/v3 v3.0.1 // indirect
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/gobwas/glob v0.2.3 // indirect
github.com/godbus/dbus/v5 v5.0.4 // indirect
github.com/gosimple/slug v1.13.1 // indirect
github.com/gosimple/unidecode v1.0.1 // indirect
Expand All @@ -104,6 +106,7 @@ require (
github.com/rs/xid v1.5.0 // indirect
github.com/segmentio/encoding v0.3.6 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/tetratelabs/wazero v1.7.3 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw=
github.com/extism/go-sdk v1.3.0 h1:DBd4FzDBUAL3P01MNqUD2+x8G7qyYdJ7pV96NIrfWXA=
github.com/extism/go-sdk v1.3.0/go.mod h1:tPMWfCSOThie3LSTSZKbrQjRm2oAXxUUjSE4HJWjYQM=
github.com/fagongzi/util v0.0.0-20210923134909-bccc37b5040d h1:1pILVCatHj3eVo9i52dZyY4BwjTmSIeN+/hoJh8rD0Y=
github.com/fagongzi/util v0.0.0-20210923134909-bccc37b5040d/go.mod h1:5cqSns2zMRcJeVGvAqeTrbXFqh5AqBFr5uVKP9T2kiE=
github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072/go.mod h1:duJ4Jxv5lDcvg4QuQr0oowTf7dz4/CR8NtyCooz9HL8=
Expand Down Expand Up @@ -266,6 +268,8 @@ github.com/go-openapi/swag v0.19.14 h1:gm3vOOXfiuw5i9p5N9xJvfjvuofpyvLA9Wr6QfK5F
github.com/go-openapi/swag v0.19.14/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ=
github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y=
github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y=
github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo=
github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=
Expand Down Expand Up @@ -722,6 +726,8 @@ github.com/testcontainers/testcontainers-go v0.29.1 h1:z8kxdFlovA2y97RWx98v/TQ+t
github.com/testcontainers/testcontainers-go v0.29.1/go.mod h1:SnKnKQav8UcgtKqjp/AD8bE1MqZm+3TDb/B8crE3XnI=
github.com/testcontainers/testcontainers-go/modules/compose v0.29.1 h1:47ipPM+s+ltCDOP3Sa1j95AkNb+z+WGiHLDbLU8ixuc=
github.com/testcontainers/testcontainers-go/modules/compose v0.29.1/go.mod h1:Sqh+Ef2ESdbJQjTJl57UOkEHkOc7gXvQLg1b5xh6f1Y=
github.com/tetratelabs/wazero v1.7.3 h1:PBH5KVahrt3S2AHgEjKu4u+LlDbbk+nsGE3KLucy6Rw=
github.com/tetratelabs/wazero v1.7.3/go.mod h1:ytl6Zuh20R/eROuyDaGPkp82O9C/DJfXAwJfQ3X6/7Y=
github.com/theupdateframework/notary v0.7.0 h1:QyagRZ7wlSpjT5N2qQAh/pN+DVqgekv4DzbAiAiEL3c=
github.com/theupdateframework/notary v0.7.0/go.mod h1:c9DRxcmhHmVLDay4/2fUYdISnHqbFDGRSlXPO0AhYWw=
github.com/tidwall/btree v1.6.0 h1:LDZfKfQIBHGHWSwckhXI0RPSXzlo+KYdjK7FWSqOzzg=
Expand Down
56 changes: 30 additions & 26 deletions pkg/frontend/authenticate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2348,38 +2348,42 @@ func Test_determineRevokePrivilege(t *testing.T) {
convey.Convey("revoke privilege [ObjectType: Table] AdminRole succ", t, func() {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
var stmts []*tree.RevokePrivilege
// var stmts []*tree.RevokePrivilege

for _, stmt := range stmts {
priv := determinePrivilegeSetOfStatement(stmt)
ses := newSes(priv, ctrl)
// XXX FIXME: Go compiler is correct -- this test is busted.
// we are looping over nil.
// for _, stmt := range stmts {
// priv := determinePrivilegeSetOfStatement(stmt)
// ses := newSes(priv, ctrl)

ok, err := authenticateUserCanExecuteStatementWithObjectTypeNone(ses.GetTxnHandler().GetTxnCtx(), ses, stmt)
convey.So(err, convey.ShouldBeNil)
convey.So(ok, convey.ShouldBeTrue)
}
// ok, err := authenticateUserCanExecuteStatementWithObjectTypeNone(ses.GetTxnHandler().GetTxnCtx(), ses, stmt)
// convey.So(err, convey.ShouldBeNil)
// convey.So(ok, convey.ShouldBeTrue)
// }
})
convey.Convey("revoke privilege [ObjectType: Table] not AdminRole fail", t, func() {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
var stmts []*tree.RevokePrivilege

for _, stmt := range stmts {
priv := determinePrivilegeSetOfStatement(stmt)
ses := newSes(priv, ctrl)
ses.tenant = &TenantInfo{
Tenant: "xxx",
User: "xxx",
DefaultRole: "xxx",
TenantID: 1001,
UserID: 1001,
DefaultRoleID: 1001,
}

ok, err := authenticateUserCanExecuteStatementWithObjectTypeNone(ses.GetTxnHandler().GetTxnCtx(), ses, stmt)
convey.So(err, convey.ShouldBeNil)
convey.So(ok, convey.ShouldBeFalse)
}
// var stmts []*tree.RevokePrivilege

// XXX FIXME: Go compiler is correct -- this test is busted.
// we are looping over nil.
// for _, stmt := range stmts {
// priv := determinePrivilegeSetOfStatement(stmt)
// ses := newSes(priv, ctrl)
// ses.tenant = &TenantInfo{
// Tenant: "xxx",
// User: "xxx",
// DefaultRole: "xxx",
// TenantID: 1001,
// UserID: 1001,
// DefaultRoleID: 1001,
// }

// ok, err := authenticateUserCanExecuteStatementWithObjectTypeNone(ses.GetTxnHandler().GetTxnCtx(), ses, stmt)
// convey.So(err, convey.ShouldBeNil)
// convey.So(ok, convey.ShouldBeFalse)
// }
})
}

Expand Down
3 changes: 0 additions & 3 deletions pkg/frontend/plsql_interpreter.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,6 @@ func (interpreter *Interpreter) GetSimpleExprValueWithSpVar(e tree.Expr) (interf
return nil, err
}
retExpr := retStmt.(*tree.Select).Select.(*tree.SelectClause).Exprs[0].Expr
if err != nil {
return nil, err
}
return GetSimpleExprValue(interpreter.ctx, retExpr, interpreter.ses)
}

Expand Down
36 changes: 18 additions & 18 deletions pkg/frontend/system_initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,15 +193,15 @@ func checkSysExistsOrNotWithTxn(ctx context.Context, txn executor.TxnExecutor) (
// GenSQLForInsertUpgradeAccountPrivilege generates SQL statements for inserting upgrade account permissions
func GenSQLForInsertUpgradeAccountPrivilege() string {
entry := privilegeEntry{
privilegeId: PrivilegeTypeUpgradeAccount,
privilegeLevel: privilegeLevelStar,
objType: objectTypeAccount,
objId: objectIDAll,
withGrantOption: false,
databaseName: "",
tableName: "",
privilegeEntryTyp: privilegeEntryTypeGeneral,
compound: nil,
privilegeId: PrivilegeTypeUpgradeAccount,
privilegeLevel: privilegeLevelStar,
objType: objectTypeAccount,
objId: objectIDAll,
withGrantOption: false,
// databaseName: "",
// tableName: "",
// privilegeEntryTyp: privilegeEntryTypeGeneral,
// compound: nil,
}
return fmt.Sprintf(initMoRolePrivFormat,
moAdminRoleID, moAdminRoleName,
Expand All @@ -214,15 +214,15 @@ func GenSQLForInsertUpgradeAccountPrivilege() string {
// GenSQLForCheckUpgradeAccountPrivilegeExist generates an SQL statement to check for the existence of upgrade account permissions.
func GenSQLForCheckUpgradeAccountPrivilegeExist() string {
entry := privilegeEntry{
privilegeId: PrivilegeTypeUpgradeAccount,
privilegeLevel: privilegeLevelStar,
objType: objectTypeAccount,
objId: objectIDAll,
withGrantOption: false,
databaseName: "",
tableName: "",
privilegeEntryTyp: privilegeEntryTypeGeneral,
compound: nil,
privilegeId: PrivilegeTypeUpgradeAccount,
privilegeLevel: privilegeLevelStar,
objType: objectTypeAccount,
objId: objectIDAll,
// withGrantOption: false,
// databaseName: "",
// tableName: "",
// privilegeEntryTyp: privilegeEntryTypeGeneral,
// compound: nil,
}

sql := fmt.Sprintf("select * from mo_catalog.mo_role_privs where role_id = %d and obj_type = '%s' and obj_id = %d and privilege_id = %d and privilege_level = '%s'",
Expand Down
49 changes: 24 additions & 25 deletions pkg/sql/plan/function/func_builtin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,19 +331,19 @@ func Test_BuiltIn_Serial(t *testing.T) {
p1 := vector.GenerateFunctionStrParameter(vec)
{
v, null := p1.GetStrValue(0)
require.False(t, null)
require.False(t, null, tc.info)
tuple, err := types.Unpack(v)
require.NoError(t, err)
require.Equal(t, input1[0], tuple[0])
require.Equal(t, input2[0], tuple[1])
require.NoError(t, err, tc.info)
require.Equal(t, input1[0], tuple[0], tc.info)
require.Equal(t, input2[0], tuple[1], tc.info)
}
{
v, null := p1.GetStrValue(1)
require.False(t, null)
require.False(t, null, tc.info)
tuple, err := types.Unpack(v)
require.NoError(t, err)
require.Equal(t, input1[1], tuple[0])
require.Equal(t, input2[1], tuple[1])
require.NoError(t, err, tc.info)
require.Equal(t, input1[1], tuple[0], tc.info)
require.Equal(t, input2[1], tuple[1], tc.info)
}
}

Expand Down Expand Up @@ -372,37 +372,36 @@ func Test_BuiltIn_SerialFull(t *testing.T) {
p1 := vector.GenerateFunctionStrParameter(vec)
{
v, null := p1.GetStrValue(0)
require.False(t, null)
require.False(t, null, tc.info)
tuple, err := types.Unpack(v)
require.NoError(t, err)
require.Equal(t, nil, tuple[0]) // note: nulls are preserved
require.Equal(t, input2[0], tuple[1])
require.NoError(t, err, tc.info)
require.Equal(t, nil, tuple[0], tc.info) // note: nulls are preserved
require.Equal(t, input2[0], tuple[1], tc.info)
}
{
v, null := p1.GetStrValue(1)
require.False(t, null)
require.False(t, null, tc.info)
tuple, err := types.Unpack(v)
require.NoError(t, err)
require.Equal(t, input1[1], tuple[0])
require.Equal(t, nil, tuple[1]) // note: nulls are preserved
require.NoError(t, err, tc.info)
require.Equal(t, input1[1], tuple[0], tc.info)
require.Equal(t, nil, tuple[1], tc.info) // note: nulls are preserved
}
{
v, null := p1.GetStrValue(2)
require.False(t, null)
require.False(t, null, tc.info)
tuple, err := types.Unpack(v)
require.NoError(t, err)
require.Equal(t, nil, tuple[0]) // note: nulls are preserved
require.Equal(t, input2[2], tuple[1])
require.NoError(t, err, tc.info)
require.Equal(t, nil, tuple[0], tc.info) // note: nulls are preserved
require.Equal(t, input2[2], tuple[1], tc.info)
}
{
v, null := p1.GetStrValue(3)
require.False(t, null)
require.False(t, null, tc.info)
tuple, err := types.Unpack(v)
require.NoError(t, err)
require.Equal(t, nil, tuple[0]) // note: nulls are preserved
require.Equal(t, nil, tuple[1]) // note: nulls are preserved
require.NoError(t, err, tc.info)
require.Equal(t, nil, tuple[0], tc.info) // note: nulls are preserved
require.Equal(t, nil, tuple[1], tc.info) // note: nulls are preserved
}

}

func initSerialExtractTestCase() []tcTemp {
Expand Down
130 changes: 130 additions & 0 deletions pkg/sql/plan/function/func_builtin_w.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright 2021 - 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package function

import (
"context"

extism "github.com/extism/go-sdk"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/vm/process"
)

// see https://extism.org for golang,
// see https://github.com/extism/go-sdk and https://github.com/extism/go-pdk
//
// wasm(wasmurl, fn, arg) runs the wasm (as extism plugin), call function fn with
// the given arg. The wasmurl must be a url to a wasm file that can be accessed by CN.
// The fn must be a valid function name in the plugin. Arg is passed in as string and
// the result is also returned as string. For other types (and multiple args),
// user must econde the args into string -- usually using json.
//
// try_wasm is the same as startlark, but it will error if there is error
// when running wasm. Instead, it will just return NULL.

type opBuiltInWasm struct {
// do we need to call plugin.Close()?
plugin *extism.Plugin
}

func newOpBuiltInWasm() *opBuiltInWasm {
return &opBuiltInWasm{}
}

func (op *opBuiltInWasm) buildWasm(ctx context.Context, url string) error {
var err error

// manifest is created from wasm url.
// We will need to handle stage in the future.
manifest := extism.Manifest{
Wasm: []extism.Wasm{
extism.WasmUrl{
Url: url,
},
},
}
// enable wasi: tinygo build wasm need wasi.
config := extism.PluginConfig{
EnableWasi: true,
}
op.plugin, err = extism.NewPlugin(ctx, manifest, config, []extism.HostFunction{})
return err
}

func (op *opBuiltInWasm) runWasm(fn string, arg []byte) ([]byte, error) {
_, out, err := op.plugin.Call(fn, arg)
return out, err
}

func (op *opBuiltInWasm) wasm(params []*vector.Vector, result vector.FunctionResultWrapper,
proc *process.Process, length int, selectList *FunctionSelectList) error {
return op.tryWasmImpl(params, result, proc, length, selectList, false)
}

func (op *opBuiltInWasm) tryWasm(params []*vector.Vector, result vector.FunctionResultWrapper,
proc *process.Process, length int, selectList *FunctionSelectList) error {
return op.tryWasmImpl(params, result, proc, length, selectList, true)
}

func (op *opBuiltInWasm) tryWasmImpl(params []*vector.Vector, result vector.FunctionResultWrapper,
proc *process.Process, length int, selectList *FunctionSelectList, isTry bool) error {
p1 := vector.GenerateFunctionStrParameter(params[0])
if !params[0].IsConst() {
return moerr.NewInvalidInput(proc.Ctx, "wasm url must be constant.")
}
url, isnull := p1.GetStrValue(0)
if isnull {
return moerr.NewInvalidInput(proc.Ctx, "wasm url cannot be null.")
}
if err := op.buildWasm(proc.Ctx, string(url)); err != nil {
return err
}

rs := vector.MustFunctionResult[types.Varlena](result)
p2 := vector.GenerateFunctionStrParameter(params[1])
p3 := vector.GenerateFunctionStrParameter(params[2])

if selectList.IgnoreAllRow() {
rs.AddNullRange(0, uint64(length))
return nil
}

for i := uint64(0); i < uint64(length); i++ {
fn, isnull := p2.GetStrValue(i)
if isnull {
rs.AppendBytes(nil, true)
continue
}
arg, isnull := p3.GetStrValue(i)
if isnull {
rs.AppendBytes(nil, true)
continue
}

res, err := op.runWasm(string(fn), arg)
if err != nil {
if isTry {
rs.AppendBytes(nil, true)
} else {
return err
}
} else {
rs.AppendBytes(res, false)
}
}
return nil
}
Loading

0 comments on commit 1af2b50

Please sign in to comment.