diff --git a/README.md b/README.md index c87afa5..8ec602c 100644 --- a/README.md +++ b/README.md @@ -17,14 +17,12 @@ Project status: 1. Develop "[Cache Configuration](https://apacheignite.readme.io/docs/binary-client-protocol-cache-configuration-operations)" methods (Completed) 1. Develop "[Key-Value Queries](https://apacheignite.readme.io/docs/binary-client-protocol-key-value-operations)" methods (Completed*) -1. Develop "[SQL and Scan Queries](https://apacheignite.readme.io/docs/binary-client-protocol-sql-operations)" methods (Completed**) +1. Develop "[SQL and Scan Queries](https://apacheignite.readme.io/docs/binary-client-protocol-sql-operations)" methods (Completed) 1. Develop SQL driver (Completed) 1. Develop "[Binary Types](https://apacheignite.readme.io/docs/binary-client-protocol-binary-type-operations)" methods (Not started) *Not all types are supported. See **[type mapping](#type-mapping)** for detail. -**Not all operations are supported. See **[table](#sql-and-scan-queries-supported-operations)** for detail. - ### How to install ```shell @@ -260,8 +258,8 @@ log.Printf("key=\"%s\", value=%t", "field3", v) | OP_QUERY_SQL_CURSOR_GET_PAGE | Done. | | OP_QUERY_SQL_FIELDS | Done. | | OP_QUERY_SQL_FIELDS_CURSOR_GET_PAGE | Done. | -| OP_QUERY_SCAN | Not started. Need help. | -| OP_QUERY_SCAN_CURSOR_GET_PAGE | Not started. Need help. | +| OP_QUERY_SCAN | Done (without filter object support). | +| OP_QUERY_SCAN_CURSOR_GET_PAGE | Done (without filter object support). | | OP_RESOURCE_CLOSE | Done. | ### Error handling diff --git a/binary/v1/client-sql-and-scan-queries.go b/binary/v1/client-sql-and-scan-queries.go index 2d8c119..131bc8d 100644 --- a/binary/v1/client-sql-and-scan-queries.go +++ b/binary/v1/client-sql-and-scan-queries.go @@ -135,6 +135,37 @@ type QuerySQLFieldsResult struct { QuerySQLFieldsPage } +// QueryScanData input parameter for QueryScan func +type QueryScanData struct { + // Cursor page size. + PageSize int + + // Number of partitions to query (negative to query entire cache). + Partitions int + + // Local flag - whether this query should be executed on local node only. + LocalQuery bool +} + +// QueryScanPage is query result page +type QueryScanPage struct { + // Key -> Values + Rows map[interface{}]interface{} + + // Indicates whether more results are available to be fetched with QueryScanCursorGetPage. + // When true, query cursor is closed automatically. + HasMore bool +} + +// QueryScanResult output from QueryScan func +type QueryScanResult struct { + // Cursor id. Can be closed with ResourceClose func. + ID int64 + + // Query result first page + QueryScanPage +} + func (c *client) QuerySQL(cache string, binary bool, data QuerySQLData) (QuerySQLResult, error) { // request and response req := NewRequestOperation(OpQuerySQL) @@ -462,6 +493,116 @@ func (c *client) QuerySQLFieldsCursorGetPage(id int64, fieldCount int) (QuerySQL return r, nil } +func (c *client) QueryScan(cache string, binary bool, data QueryScanData) (QueryScanResult, error) { + // request and response + req := NewRequestOperation(OpQueryScan) + res := NewResponseOperation(req.UID) + + r := QueryScanResult{QueryScanPage: QueryScanPage{Rows: map[interface{}]interface{}{}}} + var err error + + // set parameters + if err = WriteInt(req, HashCode(cache)); err != nil { + return r, errors.Wrapf(err, "failed to write cache name") + } + if err = WriteBool(req, binary); err != nil { + return r, errors.Wrapf(err, "failed to write binary flag") + } + // filtering is not supported + if err = WriteNull(req); err != nil { + return r, errors.Wrapf(err, "failed to write null as filter object") + } + + if err = WriteInt(req, int32(data.PageSize)); err != nil { + return r, errors.Wrapf(err, "failed to write page size") + } + if err = WriteInt(req, int32(data.Partitions)); err != nil { + return r, errors.Wrapf(err, "failed to write number of partitions to query") + } + if err = WriteBool(req, data.LocalQuery); err != nil { + return r, errors.Wrapf(err, "failed to write local query flag") + } + + // execute operation + if err = c.Do(req, res); err != nil { + return r, errors.Wrapf(err, "failed to execute OP_QUERY_SCAN operation") + } + if err = res.CheckStatus(); err != nil { + return r, err + } + + // process result + if r.ID, err = ReadLong(res); err != nil { + return r, errors.Wrapf(err, "failed to read cursor ID") + } + count, err := ReadInt(res) + if err != nil { + return r, errors.Wrapf(err, "failed to read row count") + } + // read data + for i := 0; i < int(count); i++ { + key, err := ReadObject(res) + if err != nil { + return r, errors.Wrapf(err, "failed to read key with index %d", i) + } + value, err := ReadObject(res) + if err != nil { + return r, errors.Wrapf(err, "failed to read value with index %d", i) + } + r.Rows[key] = value + } + if r.HasMore, err = ReadBool(res); err != nil { + return r, errors.Wrapf(err, "failed to read has more flag") + } + return r, nil +} + +// QueryScanCursorGetPage fetches the next SQL query cursor page by cursor id that is obtained from OP_QUERY_SCAN. +func (c *client) QueryScanCursorGetPage(id int64) (QueryScanPage, error) { + // request and response + req := NewRequestOperation(OpQueryScanCursorGetPage) + res := NewResponseOperation(req.UID) + + r := QueryScanPage{Rows: map[interface{}]interface{}{}} + var err error + + // set parameters + if err = WriteLong(req, id); err != nil { + return r, errors.Wrapf(err, "failed to write cursor id") + } + + // execute operation + if err = c.Do(req, res); err != nil { + return r, errors.Wrapf(err, "failed to execute OP_QUERY_SCAN_CURSOR_GET_PAGE operation") + } + if err = res.CheckStatus(); err != nil { + return r, err + } + + // process result + count, err := ReadInt(res) + if err != nil { + return r, errors.Wrapf(err, "failed to read row count") + } + // read data + for i := 0; i < int(count); i++ { + key, err := ReadObject(res) + if err != nil { + return r, errors.Wrapf(err, "failed to read key with index %d", i) + } + value, err := ReadObject(res) + if err != nil { + return r, errors.Wrapf(err, "failed to read value with index %d", i) + } + r.Rows[key] = value + } + if r.HasMore, err = ReadBool(res); err != nil { + return r, errors.Wrapf(err, "failed to read has more flag") + } + + return r, nil +} + // ResourceClose closes a resource, such as query cursor. func (c *client) ResourceClose(id int64) error { // request and response diff --git a/binary/v1/client-sql-and-scan-queries_test.go b/binary/v1/client-sql-and-scan-queries_test.go index 2af917e..5466912 100644 --- a/binary/v1/client-sql-and-scan-queries_test.go +++ b/binary/v1/client-sql-and-scan-queries_test.go @@ -135,7 +135,7 @@ func Test_client_QuerySQLCursorGetPage(t *testing.T) { } row := got.Rows[int64(3)].(ComplexObject) if !reflect.DeepEqual(row.Fields[1], "Org 3") || !reflect.DeepEqual(row.Fields[2], tm) { - t.Errorf("client.QuerySQL() = %#v", got) + t.Errorf("client.QuerySQLCursorGetPage() = %#v", got) } }) } @@ -336,6 +336,139 @@ func Test_client_QuerySQLFieldsCursorGetPage(t *testing.T) { } } +func Test_client_QueryScan(t *testing.T) { + c, err := Connect(testConnInfo) + if err != nil { + t.Fatal(err) + } + defer c.Close() + // insert test values + tm := time.Date(2018, 4, 3, 14, 25, 32, int(time.Millisecond*123+time.Microsecond*456+789), time.UTC) + _, err = c.QuerySQLFields("QueryScan", false, QuerySQLFieldsData{ + PageSize: 10, + Query: "INSERT INTO Organization(_key, name, foundDateTime) VALUES" + + "(?, ?, ?)," + + "(?, ?, ?)," + + "(?, ?, ?)", + QueryArgs: []interface{}{ + int64(1), "Org 1", tm, + int64(2), "Org 2", tm, + int64(3), "Org 3", tm}, + }) + if err != nil { + t.Fatal(err) + } + + type args struct { + cache string + binary bool + data QueryScanData + } + tests := []struct { + name string + c Client + args args + want QueryScanResult + wantErr bool + }{ + { + name: "1", + c: c, + args: args{ + cache: "QueryScan", + data: QueryScanData{ + PageSize: 10, + Partitions: -1, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := tt.c.QueryScan(tt.args.cache, tt.args.binary, tt.args.data) + if (err != nil) != tt.wantErr { + t.Errorf("client.QueryScan() error = %v, wantErr %v", err, tt.wantErr) + return + } + + row := got.Rows[int64(1)].(ComplexObject) + if !reflect.DeepEqual(row.Fields[1], "Org 1") || !reflect.DeepEqual(row.Fields[2], tm) { + t.Errorf("client.QueryScan() = %#v", got) + } + row = got.Rows[int64(2)].(ComplexObject) + if !reflect.DeepEqual(row.Fields[1], "Org 2") || !reflect.DeepEqual(row.Fields[2], tm) { + t.Errorf("client.QueryScan() = %#v", got) + } + row = got.Rows[int64(3)].(ComplexObject) + if !reflect.DeepEqual(row.Fields[1], "Org 3") || !reflect.DeepEqual(row.Fields[2], tm) { + t.Errorf("client.QueryScan() = %#v", got) + } + }) + } +} + +func Test_client_QueryScanCursorGetPage(t *testing.T) { + c, err := Connect(testConnInfo) + if err != nil { + t.Fatal(err) + } + defer c.Close() + // insert test values + tm := time.Date(2018, 4, 3, 14, 25, 32, int(time.Millisecond*123+time.Microsecond*456+789), time.UTC) + _, err = c.QuerySQLFields("QueryScanCursorGetPage", false, QuerySQLFieldsData{ + PageSize: 10, + Query: "INSERT INTO Organization(_key, name, foundDateTime) VALUES" + + "(?, ?, ?)," + + "(?, ?, ?)," + + "(?, ?, ?)", + QueryArgs: []interface{}{ + int64(1), "Org 1", tm, + int64(2), "Org 2", tm, + int64(3), "Org 3", tm}, + }) + if err != nil { + t.Fatal(err) + } + r, err := c.QueryScan("QueryScanCursorGetPage", false, QueryScanData{ + PageSize: 2, + Partitions: -1, + }) + if err != nil { + t.Fatal(err) + } + type args struct { + id int64 + } + tests := []struct { + name string + c Client + args args + want QueryScanPage + wantErr bool + }{ + { + name: "1", + c: c, + args: args{ + id: r.ID, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := tt.c.QueryScanCursorGetPage(tt.args.id) + if (err != nil) != tt.wantErr { + t.Errorf("client.QueryScanCursorGetPage() error = %v, wantErr %v", err, tt.wantErr) + return + } + row := got.Rows[int64(3)].(ComplexObject) + if !reflect.DeepEqual(row.Fields[1], "Org 3") || !reflect.DeepEqual(row.Fields[2], tm) { + t.Errorf("client.QueryScanCursorGetPage() = %#v", got) + } + }) + } +} + func Test_client_ResourceClose(t *testing.T) { c, err := Connect(testConnInfo) if err != nil { diff --git a/binary/v1/client.go b/binary/v1/client.go index d014d2e..fbfb1f2 100644 --- a/binary/v1/client.go +++ b/binary/v1/client.go @@ -192,6 +192,14 @@ type Client interface { // https://apacheignite.readme.io/docs/binary-client-protocol-sql-operations#section-op_query_sql_fields_cursor_get_page QuerySQLFieldsCursorGetPage(id int64, fieldCount int) (QuerySQLFieldsPage, error) + // QueryScan performs scan query. + // https://apacheignite.readme.io/docs/binary-client-protocol-sql-operations#section-op_query_scan + QueryScan(cache string, binary bool, data QueryScanData) (QueryScanResult, error) + + // QueryScanCursorGetPage fetches the next SQL query cursor page by cursor id that is obtained from OP_QUERY_SCAN. + // https://apacheignite.readme.io/docs/binary-client-protocol-sql-operations#section-op_query_scan_cursor_get_page + QueryScanCursorGetPage(id int64) (QueryScanPage, error) + // ResourceClose closes a resource, such as query cursor. // https://apacheignite.readme.io/docs/binary-client-protocol-sql-operations#section-op_resource_close ResourceClose(id int64) error diff --git a/testdata/configuration-for-tests.xml b/testdata/configuration-for-tests.xml index cd25180..aedea89 100644 --- a/testdata/configuration-for-tests.xml +++ b/testdata/configuration-for-tests.xml @@ -317,6 +317,58 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +