Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Pulsar #268

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,18 +126,19 @@ metadata:
### Sink Configure
Supported Sinks:

| Sink Name | Description |
| ---------------------------- | :-------------------------------- |
| <a href="docs/en/dingtalk-sink.md">dingtalk</a> | sink to dingtalk bot |
| <a href="docs/en/sls-sink.md">sls</a> | sink to alibaba cloud sls service |
| Sink Name | Description |
|-----------------------------------------------------------|:----------------------------------|
| <a href="docs/en/dingtalk-sink.md">dingtalk</a> | sink to dingtalk bot |
| <a href="docs/en/sls-sink.md">sls</a> | sink to alibaba cloud sls service |
| <a href="docs/en/elasticsearch-sink.md">elasticsearch</a> | sink to elasticsearch |
| <a href="docs/en/honeycomb-sink.md">honeycomb</a> | sink to honeycomb |
| <a href="docs/en/influxdb-sink.md">influxdb</a> | sink to influxdb |
| <a href="docs/en/kafka-sink.md">kafka</a> | sink to kafka |
| <a href="docs/en/mysql-sink.md">mysql</a> | sink to mysql database |
| <a href="docs/en/wechat-sink.md">wechat</a> | sink to wechat |
| <a href="docs/en/webhook-sink.md">webhook</a> | sink to webhook |
| <a href="docs/en/mongodb-sink.md">mongodb</a> | sink to mongodb |
| <a href="docs/en/honeycomb-sink.md">honeycomb</a> | sink to honeycomb |
| <a href="docs/en/influxdb-sink.md">influxdb</a> | sink to influxdb |
| <a href="docs/en/kafka-sink.md">kafka</a> | sink to kafka |
| <a href="docs/en/mysql-sink.md">mysql</a> | sink to mysql database |
| <a href="docs/en/wechat-sink.md">wechat</a> | sink to wechat |
| <a href="docs/en/webhook-sink.md">webhook</a> | sink to webhook |
| <a href="docs/en/mongodb-sink.md">mongodb</a> | sink to mongodb |
| <a href="docs/en/pulsar-sink.md">pulsar</a> | sink to pulsar |

### Contributing
Please check <a href="docs/en/CONTRIBUTING.md" target="_blank">CONTRIBUTING.md</a>
Expand Down
116 changes: 116 additions & 0 deletions common/pulsar/pulsar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pulsar

import (
"context"
"encoding/json"
"fmt"
"github.com/apache/pulsar-client-go/pulsar"
"k8s.io/klog"
"net/url"
"time"
)

type PulsarClient interface {
Name() string
Stop()
ProducePulsarMessage(msgData interface{}) error
}

type pulsarSink struct {
topic string
producer pulsar.Producer
}

func (p *pulsarSink) Name() string {
return "Apache Pulsar Sink"
}

func (p *pulsarSink) Stop() {
p.producer.Close()

}

func (p *pulsarSink) ProducePulsarMessage(msgData interface{}) error {
start := time.Now()
msgJson, err := json.Marshal(msgData)
if err != nil {
return fmt.Errorf("failed to transform the items to json : %s", err)
}
send, err := p.producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: msgJson,
Properties: nil,
})
if err != nil {
return fmt.Errorf("failed to produce message to Pulsar: %s", err)
}
end := time.Now()
klog.V(4).Infof("Exported %d data to Pulsar in %s, messageID: %s", len(msgJson), end.Sub(start), send.String())
return nil
}

func NewPulsarClient(uri *url.URL) (PulsarClient, error) {
opts, err := url.ParseQuery(uri.RawQuery)
if err != nil {
return nil, fmt.Errorf("failed to parse url's query string: %s", err)
}
klog.V(3).Infof("Pulsar opts: %v", opts)

var (
serviceURL []string
token, topic string
client pulsar.Client
)
if len(opts["serviceurl"]) < 1 {
return nil, fmt.Errorf("there is no broker assigned for connecting Pulsar")
}
serviceURL = append(serviceURL, opts["serviceurl"]...)

if len(opts["eventstopic"]) != 1 {
return nil, fmt.Errorf("there is no topic assigned for connecting Pulsar")
}
topic = opts["eventstopic"][0]

if len(opts["token"]) > 0 {
token = opts["token"][0]
}

if len(token) == 0 {
client, err = pulsar.NewClient(pulsar.ClientOptions{
URL: serviceURL[0],
})
if err != nil {
return nil, fmt.Errorf("failed to create Pulsar client: %v", err)
}
} else {
client, err = pulsar.NewClient(pulsar.ClientOptions{
URL: serviceURL[0],
Authentication: pulsar.NewAuthenticationToken(token),
})
if err != nil {
return nil, fmt.Errorf("failed to create Pulsar client: %v", err)
}
}

producer, err := client.CreateProducer(pulsar.ProducerOptions{
Name: "kube-eventer",
Topic: topic,
})

return &pulsarSink{
producer: producer,
}, nil
}
15 changes: 15 additions & 0 deletions docs/en/pulsar-sink.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
### Pulsar sink

To use the Pulsar sink add the following flag:

--sink=Pulsar:<?<OPTIONS>>

* `serviceurl` - Pulsar's broker or proxy.
* `eventstopic` - Pulsar's topic for events.
* `token` - Pulsar's JWT token, If you enable [JWT](https://pulsar.apache.org/docs/next/security-jwt/).

For example,

--sink=pulsar:?serviceurl=pulsar://127.0.0.1:6650&eventstopic=persistent://public/default/event&token=eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9
or
--sink=pulsar:?serviceurl=pulsar://127.0.0.1:6650&eventstopic=persistent://public/default/event
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ require (
github.com/Shopify/sarama v1.22.1
github.com/alibabacloud-go/eventbridge-sdk v1.2.6
github.com/alibabacloud-go/tea-utils v1.3.7
github.com/apache/pulsar-client-go v0.10.0
github.com/aws/aws-sdk-go v1.34.28
github.com/denverdino/aliyungo v0.0.0-20190410085603-611ead8a6fed
github.com/go-sql-driver/mysql v1.5.0
github.com/golang/protobuf v1.4.3
github.com/golang/protobuf v1.5.2
github.com/google/cadvisor v0.33.1
github.com/google/uuid v1.1.1
github.com/google/uuid v1.1.2
github.com/imdario/mergo v0.3.7 // indirect
github.com/influxdata/influxdb v1.7.7
github.com/mailru/easyjson v0.7.0 // indirect
Expand All @@ -22,7 +23,7 @@ require (
github.com/riemann/riemann-go-client v0.4.0
github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9
github.com/smartystreets/gunit v1.0.0 // indirect
github.com/stretchr/testify v1.6.1
github.com/stretchr/testify v1.8.0
go.mongodb.org/mongo-driver v1.5.1
golang.org/x/sys v0.2.0 // indirect
gopkg.in/olivere/elastic.v3 v3.0.75
Expand Down
Loading