Skip to content

Commit

Permalink
Add NewPool, fix some typos and add clarification on Bindings not bei…
Browse files Browse the repository at this point in the history
…ng supported by AWS Neptune
  • Loading branch information
schwartzmx committed Oct 11, 2019
1 parent a6aabc9 commit 49bad8d
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 19 deletions.
85 changes: 70 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,26 @@ gremtune is a fast, efficient, and easy-to-use client for the TinkerPop graph da

**Modifications were made to `gremgo` in order to "support" AWS Neptune's lack of Gremlin-specific features, like no support query bindings among others. See differences in Gremlin support here: [AWS Neptune Gremlin Implementation Differences](https://docs.aws.amazon.com/neptune/latest/userguide/access-graph-gremlin-differences.html)**

Installation
==========
**NOTE**: In order allow continued use for other Graph databases supporting Gremlin outside of only AWS Neptune, this client still supports querying with Bindings via `ExecuteWithBindings` or `ExecuteFileWithBindings` however this **WILL NOT** work with AWS Neptune as Neptune does not support variables or bindings,

>Variables
>Neptune does not support Gremlin variables and does not support the bindings property.
src: https://docs.aws.amazon.com/neptune/latest/userguide/access-graph-gremlin-differences.html

## Installation
```
go get github.com/schwartzmx/gremtune
dep ensure
```

Documentation
==========
## Documentation
[GoDoc](https://godoc.org/github.com/schwartzmx/gremtune)

* [GoDoc](https://godoc.org/github.com/schwartzmx/gremtune)
## Examples

Example
==========
#### Single Client
```go
package main

Expand Down Expand Up @@ -60,11 +66,62 @@ func main() {
fmt.Printf("%s", j)
}
```
#### Pool Client
```go
package main

import (
"fmt"
"log"

"github.com/schwartzmx/gremtune"
)

func main() {
var gp *Pool
var gperrs = make(chan error)

go func(chan error) {
err := <-gperrs
log.Fatal("Lost connection to the database: " + err.Error())
}(gperrs)

dialFn := func() (*Client, error) {
dialer := gremtune.NewDialer("ws://127.0.0.1:8182")
c, err := gremtune.Dial(dialer, gperrs)
if err != nil {
log.Fatal(err)
}
return &c, err
}
/* Pool object can be initialized directly
pool := gremtune.Pool{
Dial: dialFn,
MaxActive: 10,
IdleTimeout: time.Duration(10 * time.Second),
}
or via NewPool() with a PoolConfig */
pool := gremtune.NewPool(gremtune.PoolConfig{
Dial: dialFn,
MaxActive: 10,
IdleTimeout: time.Duration(10 * time.Second),
})

res, err := pool.Execute( // Sends a query to Gremlin Server via the Pool
"g.V('1234')"
)
if err != nil {
fmt.Println(err)
return
}
```
Example for streaming the result
==========
Neptune provides 64 values per Response that is why Execute at present provides a [] of Response since it waits for all the responses to be retrieved and then provides it.In ExecuteAsync method it takes a channel to provide the Response as request parameter and provides the Response as and when it is provided by Neptune. The Response are streamed to the caller and once all the Responses are provided the channel is closed.
go test -v -run ExecuteBulkDataAsync is the cmd to run the testcase)
#### Example Streaming Query Results
Neptune provides 64 values per Response that is why Execute at present provides a slice of `[]Response` s since it waits for all the responses to be retrieved and then provides it when all have returned. The `ExecuteAsync` method takes a channel to provide the `Response` as a request parameter and returns the `Response` as and when it is provided by Neptune. The `Response` are streamed to the caller and once all the `Response`s are provided the channel is closed.
```go
package main

Expand Down Expand Up @@ -120,8 +177,7 @@ func main() {
}
```
Authentication
==========
## Authentication
The plugin accepts authentication creating a secure dialer where credentials are setted.
If the server where are you trying to connect needs authentication and you do not provide the
credentials the complement will panic.
Expand Down Expand Up @@ -165,6 +221,5 @@ func main() {
}
```
License
==========
## License
See [LICENSE](LICENSE.md)
6 changes: 3 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (c *Client) authenticate(requestID string) (err error) {
return
}

// ExecuteWithBindings formats a raw Gremlin query, sends it to Gremlin Server, and returns the result.
// ExecuteWithBindings formats a raw Gremlin query, sends it to Gremlin Server, and returns the result. NOTE: AWS Neptune does not currently support bindings or rebindings see: https://docs.aws.amazon.com/neptune/latest/userguide/access-graph-gremlin-differences.html
func (c *Client) ExecuteWithBindings(query string, bindings, rebindings map[string]string) (resp []Response, err error) {
if c.conn.IsDisposed() {
return resp, errors.New("you cannot write on disposed connection")
Expand All @@ -155,7 +155,7 @@ func (c *Client) Execute(query string) (resp []Response, err error) {
return
}

// Execute formats a raw Gremlin query, sends it to Gremlin Server, and the results are streamed to channel provided in method paramater.
// ExecuteAsync formats a raw Gremlin query, sends it to Gremlin Server, and the results are streamed to channel provided in method parameter.
func (c *Client) ExecuteAsync(query string, responseChannel chan AsyncResponse) (err error) {
if c.conn.IsDisposed() {
return errors.New("you cannot write on disposed connection")
Expand All @@ -164,7 +164,7 @@ func (c *Client) ExecuteAsync(query string, responseChannel chan AsyncResponse)
return
}

// ExecuteFileWithBindings takes a file path to a Gremlin script, sends it to Gremlin Server with bindings, and returns the result.
// ExecuteFileWithBindings takes a file path to a Gremlin script, sends it to Gremlin Server with bindings, and returns the result. NOTE: AWS Neptune does not currently support bindings or rebindings see: https://docs.aws.amazon.com/neptune/latest/userguide/access-graph-gremlin-differences.html
func (c *Client) ExecuteFileWithBindings(path string, bindings, rebindings map[string]string) (resp []Response, err error) {
if c.conn.IsDisposed() {
return resp, errors.New("you cannot write on disposed connection")
Expand Down
25 changes: 24 additions & 1 deletion gremtune_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func InitGremlinClients() {
log.Fatal("Lost connection to the database: " + err.Error())
}(gperrs)
initClient()
initPool()
initNewPool()
}

func initClient() {
Expand Down Expand Up @@ -57,3 +57,26 @@ func initPool() {
}
gp = &pool
}

func initNewPool() {
if gp != nil {
return
}
dialFn := func() (*Client, error) {
dialer := NewDialer("ws://127.0.0.1:8182")
c, err := Dial(dialer, gperrs)
if err != nil {
log.Fatal(err)
}
return &c, err
}
pool, err := NewPool(PoolConfig{
Dial: dialFn,
MaxActive: 10,
IdleTimeout: time.Duration(10 * time.Second),
})
if err != nil {
log.Fatalf("Error intializing Pool, %s", err)
}
gp = pool
}
22 changes: 22 additions & 0 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gremtune

import (
"fmt"
"log"
"sync"
"time"
)
Expand All @@ -18,6 +19,13 @@ type Pool struct {
closed bool
}

// PoolConfig represents a new Pool configuration, used in NewPool for initializing a connection pool
type PoolConfig struct {
Dial func() (*Client, error)
MaxActive int
IdleTimeout time.Duration
}

// PooledConnection represents a shared and reusable connection.
type PooledConnection struct {
Pool *Pool
Expand All @@ -30,6 +38,20 @@ type idleConnection struct {
t time.Time
}

// NewPool intializes a new connection Pool
func NewPool(pc PoolConfig) (*Pool, error) {
p := new(Pool)
p.Dial = pc.Dial
p.MaxActive = pc.MaxActive
p.IdleTimeout = pc.IdleTimeout
_, err := p.Get()
if err != nil {
log.Printf("Error intializing Pool: %s", err)
return nil, err
}
return p, nil
}

// Get will return an available pooled connection. Either an idle connection or
// by dialing a new one if the pool does not currently have a maximum number
// of active connections.
Expand Down

0 comments on commit 49bad8d

Please sign in to comment.