Skip to content

Commit

Permalink
Add OP_QUERY_SCAN and OP_QUERY_SCAN_CURSOR_GET_PAGE support #7
Browse files Browse the repository at this point in the history
  • Loading branch information
Aleksandr Sokolovskii committed May 13, 2018
1 parent ea8f2b0 commit 0b5ac8b
Show file tree
Hide file tree
Showing 5 changed files with 338 additions and 6 deletions.
8 changes: 3 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@ Project status:

1. Develop "[Cache Configuration](https://apacheignite.readme.io/docs/binary-client-protocol-cache-configuration-operations)" methods (Completed)
1. Develop "[Key-Value Queries](https://apacheignite.readme.io/docs/binary-client-protocol-key-value-operations)" methods (Completed*)
1. Develop "[SQL and Scan Queries](https://apacheignite.readme.io/docs/binary-client-protocol-sql-operations)" methods (Completed**)
1. Develop "[SQL and Scan Queries](https://apacheignite.readme.io/docs/binary-client-protocol-sql-operations)" methods (Completed)
1. Develop SQL driver (Completed)
1. Develop "[Binary Types](https://apacheignite.readme.io/docs/binary-client-protocol-binary-type-operations)" methods (Not started)

*Not all types are supported. See **[type mapping](#type-mapping)** for detail.

**Not all operations are supported. See **[table](#sql-and-scan-queries-supported-operations)** for detail.

### How to install

```shell
Expand Down Expand Up @@ -260,8 +258,8 @@ log.Printf("key=\"%s\", value=%t", "field3", v)
| OP_QUERY_SQL_CURSOR_GET_PAGE | Done. |
| OP_QUERY_SQL_FIELDS | Done. |
| OP_QUERY_SQL_FIELDS_CURSOR_GET_PAGE | Done. |
| OP_QUERY_SCAN | Not started. Need help. |
| OP_QUERY_SCAN_CURSOR_GET_PAGE | Not started. Need help. |
| OP_QUERY_SCAN | Done (without filter object support). |
| OP_QUERY_SCAN_CURSOR_GET_PAGE | Done (without filter object support). |
| OP_RESOURCE_CLOSE | Done. |

### Error handling
Expand Down
141 changes: 141 additions & 0 deletions binary/v1/client-sql-and-scan-queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,37 @@ type QuerySQLFieldsResult struct {
QuerySQLFieldsPage
}

// QueryScanData input parameter for QueryScan func
type QueryScanData struct {
// Cursor page size.
PageSize int

// Number of partitions to query (negative to query entire cache).
Partitions int

// Local flag - whether this query should be executed on local node only.
LocalQuery bool
}

// QueryScanPage is query result page
type QueryScanPage struct {
// Key -> Values
Rows map[interface{}]interface{}

// Indicates whether more results are available to be fetched with QueryScanCursorGetPage.
// When true, query cursor is closed automatically.
HasMore bool
}

// QueryScanResult output from QueryScan func
type QueryScanResult struct {
// Cursor id. Can be closed with ResourceClose func.
ID int64

// Query result first page
QueryScanPage
}

func (c *client) QuerySQL(cache string, binary bool, data QuerySQLData) (QuerySQLResult, error) {
// request and response
req := NewRequestOperation(OpQuerySQL)
Expand Down Expand Up @@ -462,6 +493,116 @@ func (c *client) QuerySQLFieldsCursorGetPage(id int64, fieldCount int) (QuerySQL
return r, nil
}

func (c *client) QueryScan(cache string, binary bool, data QueryScanData) (QueryScanResult, error) {
// request and response
req := NewRequestOperation(OpQueryScan)
res := NewResponseOperation(req.UID)

r := QueryScanResult{QueryScanPage: QueryScanPage{Rows: map[interface{}]interface{}{}}}
var err error

// set parameters
if err = WriteInt(req, HashCode(cache)); err != nil {
return r, errors.Wrapf(err, "failed to write cache name")
}
if err = WriteBool(req, binary); err != nil {
return r, errors.Wrapf(err, "failed to write binary flag")
}
// filtering is not supported
if err = WriteNull(req); err != nil {
return r, errors.Wrapf(err, "failed to write null as filter object")
}

if err = WriteInt(req, int32(data.PageSize)); err != nil {
return r, errors.Wrapf(err, "failed to write page size")
}
if err = WriteInt(req, int32(data.Partitions)); err != nil {
return r, errors.Wrapf(err, "failed to write number of partitions to query")
}
if err = WriteBool(req, data.LocalQuery); err != nil {
return r, errors.Wrapf(err, "failed to write local query flag")
}

// execute operation
if err = c.Do(req, res); err != nil {
return r, errors.Wrapf(err, "failed to execute OP_QUERY_SCAN operation")
}
if err = res.CheckStatus(); err != nil {
return r, err
}

// process result
if r.ID, err = ReadLong(res); err != nil {
return r, errors.Wrapf(err, "failed to read cursor ID")
}
count, err := ReadInt(res)
if err != nil {
return r, errors.Wrapf(err, "failed to read row count")
}
// read data
for i := 0; i < int(count); i++ {
key, err := ReadObject(res)
if err != nil {
return r, errors.Wrapf(err, "failed to read key with index %d", i)
}
value, err := ReadObject(res)
if err != nil {
return r, errors.Wrapf(err, "failed to read value with index %d", i)
}
r.Rows[key] = value
}
if r.HasMore, err = ReadBool(res); err != nil {
return r, errors.Wrapf(err, "failed to read has more flag")
}
return r, nil
}

// QueryScanCursorGetPage fetches the next SQL query cursor page by cursor id that is obtained from OP_QUERY_SCAN.
func (c *client) QueryScanCursorGetPage(id int64) (QueryScanPage, error) {
// request and response
req := NewRequestOperation(OpQueryScanCursorGetPage)
res := NewResponseOperation(req.UID)

r := QueryScanPage{Rows: map[interface{}]interface{}{}}
var err error

// set parameters
if err = WriteLong(req, id); err != nil {
return r, errors.Wrapf(err, "failed to write cursor id")
}

// execute operation
if err = c.Do(req, res); err != nil {
return r, errors.Wrapf(err, "failed to execute OP_QUERY_SCAN_CURSOR_GET_PAGE operation")
}
if err = res.CheckStatus(); err != nil {
return r, err
}

// process result
count, err := ReadInt(res)
if err != nil {
return r, errors.Wrapf(err, "failed to read row count")
}
// read data
for i := 0; i < int(count); i++ {
key, err := ReadObject(res)
if err != nil {
return r, errors.Wrapf(err, "failed to read key with index %d", i)
}
value, err := ReadObject(res)
if err != nil {
return r, errors.Wrapf(err, "failed to read value with index %d", i)
}
r.Rows[key] = value
}
if r.HasMore, err = ReadBool(res); err != nil {
return r, errors.Wrapf(err, "failed to read has more flag")
}

return r, nil
}

// ResourceClose closes a resource, such as query cursor.
func (c *client) ResourceClose(id int64) error {
// request and response
Expand Down
135 changes: 134 additions & 1 deletion binary/v1/client-sql-and-scan-queries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func Test_client_QuerySQLCursorGetPage(t *testing.T) {
}
row := got.Rows[int64(3)].(ComplexObject)
if !reflect.DeepEqual(row.Fields[1], "Org 3") || !reflect.DeepEqual(row.Fields[2], tm) {
t.Errorf("client.QuerySQL() = %#v", got)
t.Errorf("client.QuerySQLCursorGetPage() = %#v", got)
}
})
}
Expand Down Expand Up @@ -336,6 +336,139 @@ func Test_client_QuerySQLFieldsCursorGetPage(t *testing.T) {
}
}

func Test_client_QueryScan(t *testing.T) {
c, err := Connect(testConnInfo)
if err != nil {
t.Fatal(err)
}
defer c.Close()
// insert test values
tm := time.Date(2018, 4, 3, 14, 25, 32, int(time.Millisecond*123+time.Microsecond*456+789), time.UTC)
_, err = c.QuerySQLFields("QueryScan", false, QuerySQLFieldsData{
PageSize: 10,
Query: "INSERT INTO Organization(_key, name, foundDateTime) VALUES" +
"(?, ?, ?)," +
"(?, ?, ?)," +
"(?, ?, ?)",
QueryArgs: []interface{}{
int64(1), "Org 1", tm,
int64(2), "Org 2", tm,
int64(3), "Org 3", tm},
})
if err != nil {
t.Fatal(err)
}

type args struct {
cache string
binary bool
data QueryScanData
}
tests := []struct {
name string
c Client
args args
want QueryScanResult
wantErr bool
}{
{
name: "1",
c: c,
args: args{
cache: "QueryScan",
data: QueryScanData{
PageSize: 10,
Partitions: -1,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := tt.c.QueryScan(tt.args.cache, tt.args.binary, tt.args.data)
if (err != nil) != tt.wantErr {
t.Errorf("client.QueryScan() error = %v, wantErr %v", err, tt.wantErr)
return
}

row := got.Rows[int64(1)].(ComplexObject)
if !reflect.DeepEqual(row.Fields[1], "Org 1") || !reflect.DeepEqual(row.Fields[2], tm) {
t.Errorf("client.QueryScan() = %#v", got)
}
row = got.Rows[int64(2)].(ComplexObject)
if !reflect.DeepEqual(row.Fields[1], "Org 2") || !reflect.DeepEqual(row.Fields[2], tm) {
t.Errorf("client.QueryScan() = %#v", got)
}
row = got.Rows[int64(3)].(ComplexObject)
if !reflect.DeepEqual(row.Fields[1], "Org 3") || !reflect.DeepEqual(row.Fields[2], tm) {
t.Errorf("client.QueryScan() = %#v", got)
}
})
}
}

func Test_client_QueryScanCursorGetPage(t *testing.T) {
c, err := Connect(testConnInfo)
if err != nil {
t.Fatal(err)
}
defer c.Close()
// insert test values
tm := time.Date(2018, 4, 3, 14, 25, 32, int(time.Millisecond*123+time.Microsecond*456+789), time.UTC)
_, err = c.QuerySQLFields("QueryScanCursorGetPage", false, QuerySQLFieldsData{
PageSize: 10,
Query: "INSERT INTO Organization(_key, name, foundDateTime) VALUES" +
"(?, ?, ?)," +
"(?, ?, ?)," +
"(?, ?, ?)",
QueryArgs: []interface{}{
int64(1), "Org 1", tm,
int64(2), "Org 2", tm,
int64(3), "Org 3", tm},
})
if err != nil {
t.Fatal(err)
}
r, err := c.QueryScan("QueryScanCursorGetPage", false, QueryScanData{
PageSize: 2,
Partitions: -1,
})
if err != nil {
t.Fatal(err)
}
type args struct {
id int64
}
tests := []struct {
name string
c Client
args args
want QueryScanPage
wantErr bool
}{
{
name: "1",
c: c,
args: args{
id: r.ID,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := tt.c.QueryScanCursorGetPage(tt.args.id)
if (err != nil) != tt.wantErr {
t.Errorf("client.QueryScanCursorGetPage() error = %v, wantErr %v", err, tt.wantErr)
return
}
row := got.Rows[int64(3)].(ComplexObject)
if !reflect.DeepEqual(row.Fields[1], "Org 3") || !reflect.DeepEqual(row.Fields[2], tm) {
t.Errorf("client.QueryScanCursorGetPage() = %#v", got)
}
})
}
}

func Test_client_ResourceClose(t *testing.T) {
c, err := Connect(testConnInfo)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions binary/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,14 @@ type Client interface {
// https://apacheignite.readme.io/docs/binary-client-protocol-sql-operations#section-op_query_sql_fields_cursor_get_page
QuerySQLFieldsCursorGetPage(id int64, fieldCount int) (QuerySQLFieldsPage, error)

// QueryScan performs scan query.
// https://apacheignite.readme.io/docs/binary-client-protocol-sql-operations#section-op_query_scan
QueryScan(cache string, binary bool, data QueryScanData) (QueryScanResult, error)

// QueryScanCursorGetPage fetches the next SQL query cursor page by cursor id that is obtained from OP_QUERY_SCAN.
// https://apacheignite.readme.io/docs/binary-client-protocol-sql-operations#section-op_query_scan_cursor_get_page
QueryScanCursorGetPage(id int64) (QueryScanPage, error)

// ResourceClose closes a resource, such as query cursor.
// https://apacheignite.readme.io/docs/binary-client-protocol-sql-operations#section-op_resource_close
ResourceClose(id int64) error
Expand Down
Loading

0 comments on commit 0b5ac8b

Please sign in to comment.