Skip to content

Commit

Permalink
Update docs and examples to use new kafkaproducer API (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
jnadler authored Feb 12, 2020
1 parent 5c930cb commit 61b4be5
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 16 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,12 @@ Some of the concerns Firebolt addresses include:

## Developing

Firebolt depends on [librdkafka](https://github.com/edenhill/librdkafka) v1.1.0 or later. To get started building a
Firebolt depends on [librdkafka](https://github.com/edenhill/librdkafka) v1.3.0 or later. To get started building a
firebolt app (or working on firebolt itself), install it following the
[instructions here](https://github.com/edenhill/librdkafka#installation).

An example for debian-based distros:
```
```
sudo wget -qO - https://packages.confluent.io/deb/5.4/archive.key | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.4 stable main"
sudo apt-get update
Expand Down
6 changes: 4 additions & 2 deletions docs/node-kafkaproducer.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

||||
|--------------|:--------:|--------|
| **Accepts:** | `[]byte` | Parent node should serialize to the byte representation that will be put on the kafka topic |
| **Accepts:** | `*firebolt.ProduceRequest` | The ProduceRequest interface requires a `Message()` method that returns the raw byte representation that will be put on the kafka topic. The `Topic()` method can optionally return a topic name that overrides the static topic provided in the configuration (see below). |
| **Returns:** | n/a | All events are filtered; this node acts as a sink |


The `kafkaproducer` is a built-in node type for producing events onto a kafka topic. Any encoding can be used, but you
must perform the encoding and convert to `[]byte` in the parent node.
must perform the encoding and convert to `[]byte` in the parent node, then use that byte array to build a `*firebolt.ProduceRequest`.

A trivial implementation of the `ProduceRequest` interface is provided, `firebolt.SimpleProduceRequest`, that can be used in many cases.

Internally, `kafkaproducer` uses an async producer based on the `confluent-kafka-go` client.

Expand Down
7 changes: 5 additions & 2 deletions examples/kafkatokafka/jsonbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,12 @@ func (j *JSONBuilder) Process(event *firebolt.Event) (*firebolt.Event, error) {
if err != nil {
return nil, err
}
//println("built JSON: " + string(jsonBytes))

return event.WithPayload(jsonBytes), nil
produceRequest := &firebolt.SimpleProduceRequest{
MessageBytes: jsonBytes,
}

return event.WithPayload(produceRequest), nil
}

// Shutdown is a no-op in jsonbuilder. This is where you'd clean up any resources on application shutdown.
Expand Down
12 changes: 5 additions & 7 deletions examples/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,16 @@ import (
"os"
"reflect"

"github.com/digitalocean/firebolt/testutil"

"github.com/digitalocean/firebolt/examples/shared"

"github.com/digitalocean/firebolt/node/elasticsearch"

"github.com/digitalocean/captainslog"
"github.com/digitalocean/firebolt"
"github.com/digitalocean/firebolt/examples/kafkatokafka"
"github.com/digitalocean/firebolt/examples/logging"
"github.com/digitalocean/firebolt/examples/shared"
"github.com/digitalocean/firebolt/executor"
"github.com/digitalocean/firebolt/metrics"
"github.com/digitalocean/firebolt/node"
"github.com/digitalocean/firebolt/node/elasticsearch"
"github.com/digitalocean/firebolt/testutil"
)

// choose an example to run; see 'runKafkaToKafka()' or 'runLogging()' for the actual example code
Expand Down Expand Up @@ -60,7 +58,7 @@ func runKafkaToKafka() {
// register the jsonbuilder node
node.GetRegistry().RegisterNodeType("jsonbuilder", func() node.Node {
return &kafkatokafka.JSONBuilder{}
}, reflect.TypeOf(captainslog.SyslogMsg{}), reflect.TypeOf([]byte(nil)))
}, reflect.TypeOf(captainslog.SyslogMsg{}), reflect.TypeOf((*firebolt.ProduceRequest)(nil)).Elem())

// read the config file and start the firebolt executor
ex, err := executor.New(executor.WithConfigFile("kafkatokafka/firebolt.yaml"))
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/tidwall/gjson v1.2.1 // indirect
github.com/tidwall/match v1.0.1 // indirect
github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51 // indirect
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5 // indirect
golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4 // indirect
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v2 v2.2.8
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5 h1:LfCXLvNmTYH9kEmVgqbnsWfruoXZIrh4YBgqVHtDvw0=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4 h1:sfkvUWPNGwSV+8/fNqctR5lS2AqCSqYwXdrjCxp/dXo=
golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ=
Expand Down

0 comments on commit 61b4be5

Please sign in to comment.