Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Rabbitmq support #284

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ Supported Sinks:
| <a href="docs/en/honeycomb-sink.md">honeycomb</a> | sink to honeycomb |
| <a href="docs/en/influxdb-sink.md">influxdb</a> | sink to influxdb |
| <a href="docs/en/kafka-sink.md">kafka</a> | sink to kafka |
| <a href="docs/en/rabbitmq-sink.md">rabbitMQ</a> | sink to rabbitMQ |
| <a href="docs/en/mysql-sink.md">mysql</a> | sink to mysql database |
| <a href="docs/en/wechat-sink.md">wechat</a> | sink to wechat |
| <a href="docs/en/webhook-sink.md">webhook</a> | sink to webhook |
Expand Down
6 changes: 6 additions & 0 deletions common/rabbitmq/OWNERS
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
reviewers:
- amirmhp
- sina_mehrabi
approvers:
- amirmhp
- sina_mehrabi
27 changes: 27 additions & 0 deletions common/rabbitmq/glogadapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package rabbitmq

import (
"k8s.io/klog"
_ "k8s.io/klog"
)

type Logging interface {
Print(v ...interface{})
Printf(format string, v ...interface{})
Println( v ...interface{})
}

type GologAdapterLogger struct {
}

func (l GologAdapterLogger) Print(v ...interface{}) {
klog.Info(v...)
}

func (l GologAdapterLogger) Printf(format string, v ...interface{}) {
klog.Infof(format, v...)
}

func (l GologAdapterLogger) Println(v ...interface{}) {
klog.Infoln(v...)
}
239 changes: 239 additions & 0 deletions common/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rabbitmq

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
"io/ioutil"
"k8s.io/klog"
"net"
"net/url"
"strconv"
"time"
)

const (
brokerDialTimeout = 10 * time.Second
brokerDialRetryLimit = 1
brokerDialRetryWait = 0
metricsTopic = "heapster-metrics"
eventsTopic = "heapster-events"
)

const (
TimeSeriesTopic = "timeseriestopic"
EventsTopic = "eventstopic"
)

type AmqpClient interface {
Name() string
Stop()
ProduceAmqpMessage(msgData interface{}) error
}

type amqpSink struct {
producer *amqp.Channel
dataTopic string
}

func (sink *amqpSink) ProduceAmqpMessage(msgData interface{}) error {
start := time.Now()
msgJson, err := json.Marshal(msgData)
if err != nil {
return fmt.Errorf("failed to transform the items to json : %s", err)
}

ctx := context.Background()

err = sink.producer.PublishWithContext(
ctx,
sink.dataTopic,
"#",
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: msgJson,
},
)
if err != nil {
return err
}

end := time.Now()
klog.V(4).Infof("Exported %d data to rabbitmq in %s", len(msgJson), end.Sub(start))

return nil
}

func (sink *amqpSink) Name() string {
return "Apache Amqp Sink"
}

func (sink *amqpSink) Stop() {
err := sink.producer.Close()
if err != nil {
return
}
}

func getTopic(opts map[string][]string, topicType string) (string, error) {
var topic string
switch topicType {
case TimeSeriesTopic:
topic = metricsTopic
case EventsTopic:
topic = eventsTopic
default:
return "", fmt.Errorf("Topic type '%s' is illegal.", topicType)
}

if len(opts[topicType]) > 0 {
topic = opts[topicType][0]
}

return topic, nil
}

func getOptionsWithoutSecrets(values url.Values) string {
var password []string
if len(values["password"]) != 0 {
password = values["password"]
values["password"] = []string{"***"}
defer func() { values["password"] = password }()
}
options := fmt.Sprintf("amqp sink option: %v", values)
return options
}

func CustomDialer(brokerDialTimeout time.Duration, brokerDialRetryLimit int, brokerDialRetryWait time.Duration) func(network, addr string) (net.Conn, error) {
return func(network, addr string) (net.Conn, error) {
var conn net.Conn
var err error
for i := 0; i <= brokerDialRetryLimit; i++ {
conn, err = net.DialTimeout(network, addr, brokerDialTimeout)
if err == nil {
return conn, nil
}
if i < brokerDialRetryLimit {
time.Sleep(brokerDialRetryWait)
}
}
return nil, fmt.Errorf("failed to dial: %v", err)
}
}

func getTlsConfiguration(opts url.Values) (*tls.Config, bool, error) {
if len(opts["cacert"]) == 0 &&
(len(opts["cert"]) == 0 || len(opts["key"]) == 0) {
return nil, false, nil
}

t := &tls.Config{}
if len(opts["cacert"]) != 0 {
caFile := opts["cacert"][0]
caCert, err := ioutil.ReadFile(caFile)
if err != nil {
return nil, false, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
t.RootCAs = caCertPool
}

if len(opts["cert"]) != 0 && len(opts["key"]) != 0 {
certFile := opts["cert"][0]
keyFile := opts["key"][0]
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return nil, false, err
}
t.Certificates = []tls.Certificate{cert}
}

if len(opts["insecuressl"]) != 0 {
insecuressl := opts["insecuressl"][0]
insecure, err := strconv.ParseBool(insecuressl)
if err != nil {
return nil, false, err
}
t.InsecureSkipVerify = insecure
}

return t, true, nil
}

func GetRabbitMQURL(values url.Values) string {
return fmt.Sprintf("amqp://%s:%s@%s:%s", values.Get("username"), values.Get("password"), values.Get("host"), values.Get("port"))
}

func NewAmqpClient(uri *url.URL, topicType string) (AmqpClient, error) {
opts, err := url.ParseQuery(uri.RawQuery)
if err != nil {
return nil, fmt.Errorf("failed to parse url's query string: %s", err)
}
klog.V(3).Info(getOptionsWithoutSecrets(opts))

topic, err := getTopic(opts, topicType)
if err != nil {
return nil, err
}

amqp.Logger = GologAdapterLogger{}

TLSClientConfig, TLSClientConfigEnable, err := getTlsConfiguration(opts)

var config = amqp.Config{}

if TLSClientConfigEnable {
config.TLSClientConfig = TLSClientConfig
}

config.Dial = CustomDialer(brokerDialTimeout, brokerDialRetryLimit, brokerDialRetryWait)

conn, err := amqp.DialConfig(GetRabbitMQURL(opts), config)
if err != nil {
return nil, err
}

klog.V(3).Infof("attempting to setup amqp sink")
sinkProducer, err := conn.Channel()
if err != nil {
return nil, fmt.Errorf("Failed to setup Producer: - %v", err)
}
err = sinkProducer.ExchangeDeclare(
topic,
amqp.ExchangeTopic,
true,
false,
false,
false,
nil,
)
if err != nil {
return nil, fmt.Errorf("failed to declare exchange: %v", err)
}
klog.V(3).Infof("amqp sink setup successfully")

return &amqpSink{
producer: sinkProducer,
dataTopic: topic,
}, nil
}
23 changes: 23 additions & 0 deletions docs/en/rabbitmq-sink.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
### RabbitMQ sink

To use the rabbitMQ sink add the following flag:

--sink=rabbitmq:<?<OPTIONS>>

The RabbitMQ sink is used to configure the integration with RabbitMQ for event streaming.
This sink allows you to specify RabbitMQ server details,
such as host, port, username, password, and event topic.

To use the RabbitMQ sink, add the following flag to your command line or configuration file:

* `host`: RabbitMQ server host address. (e.g., localhost)
* `port`: RabbitMQ server port. (e.g., 5672)
* `username`: RabbitMQ username.
* `password`: RabbitMQ password.
* `eventtopic`: RabbitMQ's topic for events.

For example,

--sink=rabbitmq:?host=localhost&port=5672&username=username&password=password&eventtopic=testtopic
or
--sink=rabbitmq:?host=localhost&port=5672&username=username&password=password&metricsTopic=testtopic
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ require (
github.com/olivere/elastic/v7 v7.0.6
github.com/pborman/uuid v1.2.0
github.com/prometheus/client_golang v1.11.1
github.com/rabbitmq/amqp091-go v1.9.0 // indirect
github.com/riemann/riemann-go-client v0.4.0
github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9
github.com/smartystreets/gunit v1.0.0 // indirect
github.com/stretchr/testify v1.6.1
github.com/stretchr/testify v1.8.0
go.mongodb.org/mongo-driver v1.5.1
golang.org/x/sys v0.2.0 // indirect
gopkg.in/olivere/elastic.v3 v3.0.75
Expand Down
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo=
github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/riemann/riemann-go-client v0.4.0 h1:F6NvNnB9v5aBGPXV8/uFiyUfnDv886p+vkwccx2ryV4=
Expand Down Expand Up @@ -311,13 +313,17 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v0.0.0-20151208002404-e3a8ff8ce365/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
Expand All @@ -336,6 +342,7 @@ go.mongodb.org/mongo-driver v1.5.1 h1:9nOVLGDfOaZ9R0tBumx/BcuqkbFpyTCU2r/Po7A2az
go.mongodb.org/mongo-driver v1.5.1/go.mod h1:gRXCHX4Jo7J0IJ1oDQyUxF7jfy19UfxniMS4xxMmUqw=
go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand Down Expand Up @@ -481,6 +488,8 @@ gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
3 changes: 3 additions & 0 deletions sinks/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/AliyunContainerService/kube-eventer/sinks/log"
"github.com/AliyunContainerService/kube-eventer/sinks/mongo"
"github.com/AliyunContainerService/kube-eventer/sinks/mysql"
"github.com/AliyunContainerService/kube-eventer/sinks/rabbitmq"
"github.com/AliyunContainerService/kube-eventer/sinks/riemann"
"github.com/AliyunContainerService/kube-eventer/sinks/sls"
"github.com/AliyunContainerService/kube-eventer/sinks/webhook"
Expand All @@ -49,6 +50,8 @@ func (this *SinkFactory) Build(uri flags.Uri) (core.EventSink, error) {
return elasticsearch.NewElasticSearchSink(&uri.Val)
case "kafka":
return kafka.NewKafkaSink(&uri.Val)
case "rabbitmq":
return rabbitmq.NewRabbitmqSink(&uri.Val)
case "riemann":
return riemann.CreateRiemannSink(&uri.Val)
case "honeycomb":
Expand Down
Loading