Skip to content

Commit

Permalink
Merge branch 'master' into feat.fetchRemoteSchemaAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 committed Jan 10, 2025
2 parents 6521210 + f182738 commit dbca693
Show file tree
Hide file tree
Showing 13 changed files with 837 additions and 209 deletions.
347 changes: 274 additions & 73 deletions jobsdb/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,92 +1024,293 @@ func requireSequential(t *testing.T, jobs []*JobT) {
}
}

func TestJobsDB_SanitizeJSON(t *testing.T) {
_ = startPostgres(t)
jobDB := Handle{config: config.New()}
func TestJobsdbSanitizeJSON(t *testing.T) {
type testCase struct {
in, out string
err error
}
ch := func(n int) string {
return strings.Repeat("�", n)
}
toValidUTF8Tests := []struct {
in string
out string
err error
byteaInvalidInputSyntaxError := errors.New("pq: invalid input syntax for type bytea")
UTF8Tests := []struct {
payloadColumnType string
cases []testCase
}{
{`\u0000`, "", nil},
{`\u0000☺\u0000b☺`, "☺b☺", nil},
// NOTE: we are not handling the following:
// {"\u0000", ""},
// {"\u0000☺\u0000b☺", "☺b☺"},

{"", "", nil},
{"abc", "abc", nil},
{"\uFDDD", "\uFDDD", nil},
{"a\xffb", "a" + ch(1) + "b", nil},
{"a\xffb\uFFFD", "a" + ch(1) + "b\uFFFD", nil},
{"a☺\xffb☺\xC0\xAFc☺\xff", "a☺" + ch(1) + "b☺" + ch(2) + "c☺" + ch(1), nil},
{"\xC0\xAF", ch(2), nil},
{"\xE0\x80\xAF", ch(3), nil},
{"\xed\xa0\x80", ch(3), nil},
{"\xed\xbf\xbf", ch(3), nil},
{"\xF0\x80\x80\xaf", ch(4), nil},
{"\xF8\x80\x80\x80\xAF", ch(5), nil},
{"\xFC\x80\x80\x80\x80\xAF", ch(6), nil},

// {"\ud800", ""},
{`\ud800`, ch(1), nil},
{`\uDEAD`, ch(1), nil},

{`\uD83D\ub000`, string([]byte{239, 191, 189, 235, 128, 128}), nil},
{`\uD83D\ude04`, "😄", nil},

{`\u4e2d\u6587`, "中文", nil},
{`\ud83d\udc4a`, "\xf0\x9f\x91\x8a", nil},

{`\U0001f64f`, ch(1), errors.New(`readEscapedChar: invalid escape char after`)},
{`\uD83D\u00`, ch(1), errors.New(`readU4: expects 0~9 or a~f, but found`)},
{
string(JSONB),
[]testCase{
{`\u0000`, "", nil},
{`\u0000☺\u0000b☺`, "☺b☺", nil},
// NOTE: we are not handling the following:
// {"\u0000", ""},
// {"\u0000☺\u0000b☺", "☺b☺"},

{"", "", nil},
{"abc", "abc", nil},
{"\uFDDD", "\uFDDD", nil},
{"a\xffb", "a" + ch(1) + "b", nil},
{"a\xffb\uFFFD", "a" + ch(1) + "b\uFFFD", nil},
{"a☺\xffb☺\xC0\xAFc☺\xff", "a☺" + ch(1) + "b☺" + ch(2) + "c☺" + ch(1), nil},
{"\xC0\xAF", ch(2), nil},
{"\xE0\x80\xAF", ch(3), nil},
{"\xed\xa0\x80", ch(3), nil},
{"\xed\xbf\xbf", ch(3), nil},
{"\xF0\x80\x80\xaf", ch(4), nil},
{"\xF8\x80\x80\x80\xAF", ch(5), nil},
{"\xFC\x80\x80\x80\x80\xAF", ch(6), nil},

// {"\ud800", ""},
{`\ud800`, ch(1), nil},
{`\uDEAD`, ch(1), nil},

{`\uD83D\ub000`, string([]byte{239, 191, 189, 235, 128, 128}), nil},
{`\uD83D\ude04`, "😄", nil},

{`\u4e2d\u6587`, "中文", nil},
{`\ud83d\udc4a`, "\xf0\x9f\x91\x8a", nil},

{`\U0001f64f`, ch(1), errors.New(`readEscapedChar: invalid escape char after`)},
{`\uD83D\u00`, ch(1), errors.New(`readU4: expects 0~9 or a~f, but found`)},
},
},
{
string(TEXT),
[]testCase{
{`\u0000`, `\u0000`, nil},
{`\u0000☺\u0000b☺`, `\u0000☺\u0000b☺`, nil},

{"", "", nil},
{"abc", "abc", nil},
{"\uFDDD", "\uFDDD", nil},
{"a\xffb", `a\ufffdb`, nil},
{"a\xffb\uFFFD", `a\ufffdb�`, nil},
{"a☺\xffb☺\xC0\xAFc☺\xff", `a☺\ufffdb☺\ufffd\ufffdc☺\ufffd`, nil},
{"\xC0\xAF", `\ufffd\ufffd`, nil},
{"\xE0\x80\xAF", `\ufffd\ufffd\ufffd`, nil},
{"\xed\xa0\x80", `\ufffd\ufffd\ufffd`, nil},
{"\xed\xbf\xbf", `\ufffd\ufffd\ufffd`, nil},
{"\xF0\x80\x80\xaf", `\ufffd\ufffd\ufffd\ufffd`, nil},
{"\xF8\x80\x80\x80\xAF", `\ufffd\ufffd\ufffd\ufffd\ufffd`, nil},
{"\xFC\x80\x80\x80\x80\xAF", `\ufffd\ufffd\ufffd\ufffd\ufffd\ufffd`, nil},

// {"\ud800", ""},
// 15
{`\ud800`, `\ud800`, nil},
{`\uDEAD`, `\uDEAD`, nil},

{`\uD83D\ub000`, `\uD83D\ub000`, nil},
{`\uD83D\ude04`, `\uD83D\ude04`, nil},

{`\u4e2d\u6587`, `\u4e2d\u6587`, nil},
{`\ud83d\udc4a`, `\ud83d\udc4a`, nil},

// 21
{`\U0001f64f`, `\U0001f64f`, nil},
{`\uD83D\u00`, `\uD83D\u00`, nil},
},
},
{
string(BYTEA),
[]testCase{
{`\u0000`, "", nil},
{`\u0000☺\u0000b☺`, "☺b☺", nil},
// NOTE: we are not handling the following:
// {"\u0000", ""},
// {"\u0000☺\u0000b☺", "☺b☺"},

{"", "", nil},
{"abc", "abc", nil},
{"\uFDDD", "\uFDDD", nil},
{"a\xffb", "a" + ch(1) + "b", byteaInvalidInputSyntaxError},
{"a\xffb\uFFFD", "a" + ch(1) + "b\uFFFD", byteaInvalidInputSyntaxError},
{"a☺\xffb☺\xC0\xAFc☺\xff", "a☺" + ch(1) + "b☺" + ch(2) + "c☺" + ch(1), byteaInvalidInputSyntaxError},
{"\xC0\xAF", ch(2), byteaInvalidInputSyntaxError},
{"\xE0\x80\xAF", ch(3), byteaInvalidInputSyntaxError},
{"\xed\xa0\x80", ch(3), byteaInvalidInputSyntaxError},
{"\xed\xbf\xbf", ch(3), byteaInvalidInputSyntaxError},
{"\xF0\x80\x80\xaf", ch(4), byteaInvalidInputSyntaxError},
{"\xF8\x80\x80\x80\xAF", ch(5), byteaInvalidInputSyntaxError},
{"\xFC\x80\x80\x80\x80\xAF", ch(6), byteaInvalidInputSyntaxError},

// {"\ud800", ""},
// 15
{`\ud800`, ch(1), nil},
{`\uDEAD`, ch(1), nil},

{`\uD83D\ub000`, string([]byte{239, 191, 189, 235, 128, 128}), nil},
{`\uD83D\ude04`, "😄", nil},

{`\u4e2d\u6587`, "中文", nil},
{`\ud83d\udc4a`, "\xf0\x9f\x91\x8a", nil},

{`\U0001f64f`, ch(1), errors.New(`readEscapedChar: invalid escape char after`)},
{`\uD83D\u00`, ch(1), errors.New(`readU4: expects 0~9 or a~f, but found`)},
},
},
}

err := jobDB.Setup(ReadWrite, false, strings.ToLower(rand.String(5)))
require.NoError(t, err)
defer jobDB.TearDown()
for _, tCase := range UTF8Tests {
t.Run(tCase.payloadColumnType, func(t *testing.T) {
_ = startPostgres(t)
conf := config.New()
conf.Set("JobsDB.payloadColumnType", tCase.payloadColumnType)
jobDB := Handle{config: conf}
err := jobDB.Setup(ReadWrite, true, tCase.payloadColumnType+"_"+strings.ToLower(rand.String(5)))
require.NoError(t, err, tCase.payloadColumnType)
eventPayload := []byte(`{"batch":[{"anonymousId":"anon_id","sentAt":"2019-08-12T05:08:30.909Z","type":"track"}]}`)
for i, tt := range tCase.cases {
customVal := fmt.Sprintf("TEST_%d", i)
jobs := []*JobT{{
Parameters: []byte(`{"batch_id":1,"source_id":"sourceID","source_job_run_id":""}`),
EventPayload: bytes.Replace(eventPayload, []byte("track"), []byte(tt.in), 1),
UserID: uuid.New().String(),
UUID: uuid.New(),
CustomVal: customVal,
WorkspaceId: defaultWorkspaceID,
EventCount: 1,
}}
err := jobDB.Store(context.Background(), jobs)
if tt.err != nil {
require.Error(t, err, "should error", tCase.payloadColumnType, i)
require.Contains(t, err.Error(), tt.err.Error(), "should contain error", tCase.payloadColumnType, i)
continue
}

eventPayload := []byte(`{"batch": [{"anonymousId":"anon_id","sentAt":"2019-08-12T05:08:30.909Z","type":"track"}]}`)
for i, tt := range toValidUTF8Tests {
require.NoError(t, err, tCase.payloadColumnType, i)

unprocessedJob, err := jobDB.GetUnprocessed(context.Background(), GetQueryParams{
CustomValFilters: []string{customVal},
JobsLimit: 10,
ParameterFilters: []ParameterFilterT{},
})
require.NoError(t, err, "should not error")

require.Len(t, unprocessedJob.Jobs, 1, tCase.payloadColumnType, i)

if tCase.payloadColumnType == string(TEXT) { // some can't be valid json
require.Equal(t,
string(bytes.Replace(eventPayload, []byte("track"), []byte(tt.out), 1)),
string(unprocessedJob.Jobs[0].EventPayload),
tCase.payloadColumnType, i,
)
} else {
require.JSONEq(t,
string(bytes.Replace(eventPayload, []byte("track"), []byte(tt.out), 1)),
string(unprocessedJob.Jobs[0].EventPayload),
tCase.payloadColumnType, i,
)
}
}
jobDB.TearDown()
})
}
}

customVal := fmt.Sprintf("TEST_%d", i)
// Fuzzer accumulates string test inputs for the store tests.
type Fuzzer struct {
// testData holds the inputs in the order they were added.
testData []string
}

jobs := []*JobT{{
Parameters: []byte(`{"batch_id":1,"source_id":"sourceID","source_job_run_id":""}`),
EventPayload: bytes.Replace(eventPayload, []byte("track"), []byte(tt.in), 1),
UserID: uuid.New().String(),
UUID: uuid.New(),
CustomVal: customVal,
WorkspaceId: defaultWorkspaceID,
EventCount: 1,
}}

err := jobDB.Store(context.Background(), jobs)
if tt.err != nil {
require.Error(t, err, "should error")
require.Contains(t, err.Error(), tt.err.Error(), "should contain error")
continue
}
// NewFuzzer returns a Fuzzer whose testData starts as an empty, non-nil slice.
func NewFuzzer() *Fuzzer {
	fz := new(Fuzzer)
	fz.testData = []string{}
	return fz
}

require.NoError(t, err)
// Add appends data to the end of the fuzzer's collected test inputs.
func (f *Fuzzer) Add(data string) {
	extended := append(f.testData, data)
	f.testData = extended
}

unprocessedJob, err := jobDB.GetUnprocessed(context.Background(), GetQueryParams{
CustomValFilters: []string{customVal},
JobsLimit: 10,
ParameterFilters: []ParameterFilterT{},
})
require.NoError(t, err, "should not error")
// GenerateFuzzCorpus returns a fixed list of JSON event payloads, in the
// order they are added below: alias, page, screen, group and track events,
// track events with unusual property (column) names, track events with
// unusual event names, merge events, an identify event and an extract event.
//
// NOTE(review): the "[email protected]" values look like obfuscated email
// addresses; confirm against the upstream file before relying on them.
func GenerateFuzzCorpus() []string {
	f := NewFuzzer()

	// Basic event types
	f.Add(`{"type":"alias","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","previousId":"previousId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","context":{"traits":{"email":"[email protected]","logins":2},"ip":"1.2.3.4"}}`)
	f.Add(`{"type":"alias","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","previousId":"previousId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","traits":{"title":"Home | RudderStack","url":"https://www.rudderstack.com"}}`)

	// Add page views
	f.Add(`{"type":"page","name":"Home","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","context":{"traits":{"name":"Richard Hendricks","email":"[email protected]","logins":2},"ip":"1.2.3.4"}}`)

	// Add screen views
	f.Add(`{"type":"screen","name":"Main","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","context":{"traits":{"name":"Richard Hendricks","email":"[email protected]","logins":2},"ip":"1.2.3.4"}}`)

	// Add group events
	f.Add(`{"type":"group","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","groupId":"groupId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","context":{"traits":{"email":"[email protected]","logins":2},"ip":"1.2.3.4"}}`)

	// Add track events
	f.Add(`{"type":"track","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","event":"event","request_ip":"5.6.7.8","userProperties":{"rating":3.0,"review_body":"OK for the price. It works but the material feels flimsy."},"context":{"traits":{"name":"Richard Hendricks","email":"[email protected]","logins":2},"ip":"1.2.3.4"}}`)

	// Test column names with special characters and edge cases
	columnNames := []string{
		// SQL keywords
		"select", "from", "where", "and", "or", "not", "insert", "update", "delete",
		// Special characters
		"column name", "column-name", "column.name", "column@name", "column#name",
		// Unicode characters
		"columnñame", "colûmnname", "columnнаме", "列名", "カラム名",
		// Very long names
		"this_is_a_very_long_column_name_that_exceeds_the_maximum_allowed_length",
	}

	// Each column name becomes the single key of the event's "properties" object.
	for _, columnName := range columnNames {
		f.Add(fmt.Sprintf(`{"type":"track","messageId":"messageId","userId":"userId","event":"test","properties":{"%s":"test_value"}}`, columnName))
	}

	// Test event names with special cases
	eventNames := []string{
		"omega", "omega v2 ", "9mega", "mega&", "ome$ga",
		"select", "drop", "create", "alter", "index",
		"name with spaces", "name@with@special@chars",
		"序列化", "テーブル", "таблица",
	}

	// Each event name becomes the "event" value of an otherwise fixed track event.
	for _, eventName := range eventNames {
		f.Add(fmt.Sprintf(`{"type":"track","messageId":"messageId","userId":"userId","event":"%s","properties":{"test":"value"}}`, eventName))
	}

	// Add merge events
	f.Add(`{"type":"merge","mergeProperties":[{"type":"email","value":"[email protected]"},{"type":"mobile","value":"+1-202-555-0146"}]}`)
	f.Add(`{"type":"merge"}`)
	f.Add(`{"type":"merge", "mergeProperties": "invalid"}`)

	// Add identify events
	f.Add(`{"type":"identify","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","traits":{"name":"Richard Hendricks","email":"[email protected]","logins":2}}`)

	// Add extract events
	f.Add(`{"type":"extract","recordId":"recordID","event":"event","receivedAt":"2021-09-01T00:00:00.000Z","context":{"traits":{"name":"Richard Hendricks","email":"[email protected]","logins":2},"ip":"1.2.3.4"}}`)
	return f.testData
}

// Test_FuzzTestStore stores every payload returned by GenerateFuzzCorpus once
// per payload column type ("jsonb", "text", "bytea") and requires that each
// Store call succeeds. It does not read the stored jobs back.
func Test_FuzzTestStore(t *testing.T) {
	_ = startPostgres(t)
	conf := config.New()
	// The corpus is the same for every column type, so build it once.
	testPayloads := GenerateFuzzCorpus()
	columnTypes := []string{"jsonb", "text", "bytea"}
	for _, column := range columnTypes {
		t.Run(fmt.Sprintf("Store with %s column type", column), func(t *testing.T) {
			conf.Set("JobsDB.payloadColumnType", column)
			jobDB := Handle{config: conf}
			err := jobDB.Setup(ReadWrite, true, column+"_"+strings.ToLower(rand.String(5)))
			require.NoError(t, err)
			// Deferred so the handle is torn down even when a require below
			// aborts the subtest.
			defer jobDB.TearDown()

			for i, payload := range testPayloads {
				jobs := []*JobT{{
					Parameters:   []byte(`{"batch_id":1,"source_id":"sourceID","source_job_run_id":""}`),
					EventPayload: []byte(payload),
					UserID:       uuid.New().String(),
					UUID:         uuid.New(),
					CustomVal:    fmt.Sprintf("TEST_%d", i),
					WorkspaceId:  defaultWorkspaceID,
					EventCount:   1,
				}}
				err := jobDB.Store(context.Background(), jobs)
				// Include the payload so a failure identifies the offending input.
				require.NoError(t, err, "payload %d: %s", i, payload)
			}
		})
	}
}

Expand Down
Loading

0 comments on commit dbca693

Please sign in to comment.