Skip to content

Commit

Permalink
Merge pull request #30 from aliyun/evan_branch
Browse files Browse the repository at this point in the history
add go sdk producer
  • Loading branch information
xuxiaoahang2018 authored May 14, 2019
2 parents 22cda97 + fd23a0d commit 8de8107
Show file tree
Hide file tree
Showing 23 changed files with 1,782 additions and 4 deletions.
6 changes: 3 additions & 3 deletions consumer/consumer_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,14 @@ func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (gl *sls.Lo
slsError, ok := err.(sls.Error)
if ok {
if slsError.HTTPCode == 403 {
level.Info(consumer.logger).Log("msg", "shard Get checkpoint gets errors, starts to try again", "shard", shardId, "error", slsError)
level.Warn(consumer.logger).Log("msg", "shard Get checkpoint gets errors, starts to try again", "shard", shardId, "error", slsError)
time.Sleep(5 * time.Second)
} else {
level.Info(consumer.logger).Log("msg", "shard Get checkpoint gets errors, starts to try again", "shard", shardId, "error", slsError)
level.Warn(consumer.logger).Log("msg", "shard Get checkpoint gets errors, starts to try again", "shard", shardId, "error", slsError)
time.Sleep(200 * time.Millisecond)
}
} else {
level.Info(consumer.logger).Log("msg", "unknown error when pull log", "shardId", shardId, "cursor", cursor, "error", err)
level.Warn(consumer.logger).Log("msg", "unknown error when pull log", "shardId", shardId, "cursor", cursor, "error", err)
}
} else {
return gl, nextCursor, nil
Expand Down
2 changes: 1 addition & 1 deletion consumer/shard_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (consumer *ShardConsumerWorker) consume() {
if consumer.getConsumerStatus() == PULL_PROCESSING_DONE {
consumer.consumerCheckPointTracker.tempCheckPoint = consumer.tempCheckPoint
} else if consumer.getConsumerStatus() == CONSUME_PROCESSING {
level.Debug(consumer.logger).Log("msg", "Consumption is in progress, waiting for consumption to be completed")
level.Info(consumer.logger).Log("msg", "Consumption is in progress, waiting for consumption to be completed")
consumer.setIsFlushCheckpointDoneToTrue()
return
}
Expand Down
54 changes: 54 additions & 0 deletions example/producer/native_pb_demo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package main

import (
"fmt"
"github.com/aliyun/aliyun-log-go-sdk"
"github.com/aliyun/aliyun-log-go-sdk/producer"
"github.com/gogo/protobuf/proto"
"os"
"os/signal"
"sync"
"time"
)

// main demonstrates sending SLS logs built directly from the native
// protobuf types (sls.Log / sls.LogContent), which is the high-performance
// alternative to the producer.GenerateLog convenience helper.
func main() {
	producerConfig := producer.GetDefaultProducerConfig()
	producerConfig.Endpoint = os.Getenv("Endpoint")
	producerConfig.AccessKeyID = os.Getenv("AccessKeyID")
	producerConfig.AccessKeySecret = os.Getenv("AccessKeySecret")
	producerInstance := producer.InitProducer(producerConfig)
	// signal.Notify requires a buffered channel: with an unbuffered one a
	// signal delivered while main is busy is dropped (flagged by `go vet`).
	ch := make(chan os.Signal, 1)
	signal.Notify(ch)
	producerInstance.Start()
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				// Build the sls.Log directly; this native interface is the
				// fastest way to produce logs (GenerateLog is slower).
				content := []*sls.LogContent{}
				content = append(content, &sls.LogContent{
					Key:   proto.String("pb_test"),
					Value: proto.String("pb_value"),
				})
				log := &sls.Log{
					Time:     proto.Uint32(uint32(time.Now().Unix())),
					Contents: content,
				}

				err := producerInstance.SendLog("project", "logstore", "127.0.0.1", "topic", log)
				if err != nil {
					fmt.Println(err)
				}
			}
		}()
	}
	wg.Wait()
	fmt.Println("Send completion")
	if _, ok := <-ch; ok {
		fmt.Println("Get the shutdown signal and start to shut down")
		producerInstance.Close(60)
	}
}
102 changes: 102 additions & 0 deletions example/producer/performance_test_demo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package main

import (
"fmt"
"github.com/aliyun/aliyun-log-go-sdk"
"github.com/aliyun/aliyun-log-go-sdk/producer"
"github.com/gogo/protobuf/proto"
"math/rand"
"os"
"os/signal"
"runtime"
"time"
)

var valueList [][]*string

// main drives the producer performance benchmark: 10 goroutines send
// randomly suffixed ~550-byte logs as fast as possible until the process
// receives a signal, at which point the producer is drained and closed.
func main() {
	runtime.GOMAXPROCS(2)
	producerConfig := producer.GetDefaultProducerConfig()
	producerConfig.MaxBatchCount = 40960
	producerConfig.MaxBatchSize = 3 * 1024 * 1024
	producerConfig.Endpoint = os.Getenv("Endpoint")
	producerConfig.AccessKeyID = os.Getenv("AccessKeyID")
	producerConfig.AccessKeySecret = os.Getenv("AccessKeySecret")
	keys := getKeys()
	rand.Seed(time.Now().Unix())
	valueList = generateValuseList()

	producerInstance := producer.InitProducer(producerConfig)
	// signal.Notify requires a buffered channel; an unbuffered one can
	// drop the signal (flagged by `go vet`).
	ch := make(chan os.Signal, 1)
	signal.Notify(ch)
	producerInstance.Start()
	fmt.Println("start send logs")
	for i := 0; i < 10; i++ {
		// Pass the loop variable as an argument: before Go 1.22 all
		// goroutines would otherwise share a single `i`, so every worker
		// would report the same (final) id.
		go func(id int) {
			for n := 0; n < 200000000; n++ {
				r := rand.Intn(200000000)
				err := producerInstance.SendLog("project", "logstore", generateTopic(r), generateSource(r), getLog(keys))
				if err != nil {
					fmt.Println(err)
					break
				}
			}
			fmt.Println("All data in the queue has been sent, goroutine id:", id)
		}(i)
	}
	if _, ok := <-ch; ok {
		fmt.Println("Get the shutdown signal and start to shut down")
		producerInstance.SafeClose()
	}
}

// generateTopic maps a random draw onto one of five topic names
// ("topic-0" … "topic-4").
func generateTopic(r int) string {
	bucket := r % 5
	return fmt.Sprintf("topic-%d", bucket)
}

// generateSource maps a random draw onto one of ten source names
// ("source-0" … "source-9").
func generateSource(r int) string {
	bucket := r % 10
	return fmt.Sprintf("source-%d", bucket)
}

// getKeys builds the eight pointer-to-string content keys
// ("content_key_1" … "content_key_8") used by every generated log line.
func getKeys() (keys []*string) {
	for i := 1; i <= 8; i++ {
		k := fmt.Sprintf("content_key_%v", i)
		keys = append(keys, &k)
	}
	return keys
}

// getValues produces one row of eight content values; every value in the
// row shares the same random numeric suffix, and each value is prefixed
// with its 1-based position in the row.
func getValues() (values []*string) {
	suffix := rand.Intn(20000000)
	for i := 1; i <= 8; i++ {
		v := fmt.Sprintf("%vabcdefghijklmnopqrstuvwxyz0123456789!@#$^&*()_012345678-%v", i, suffix)
		values = append(values, &v)
	}
	return values
}

// getLog assembles one *sls.Log from the given keys and a randomly chosen
// pre-generated row of valueList; the timestamp is the current unix time.
func getLog(keys []*string) *sls.Log {
	row := rand.Intn(4096)
	contents := make([]*sls.LogContent, 0, 8)
	for i := 0; i < 8; i++ {
		contents = append(contents, &sls.LogContent{
			Key:   keys[i],
			Value: valueList[row][i],
		})
	}
	now := uint32(time.Now().Unix())
	return &sls.Log{
		Time:     &now,
		Contents: contents,
	}
}

func generateValuseList() [][]*string {
for i := 0; i < 4097; i++ {
v := getValues()
valueList = append(valueList, v)
}
return valueList
}
43 changes: 43 additions & 0 deletions example/producer/producer_simple_demo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package main

import (
"fmt"
"github.com/aliyun/aliyun-log-go-sdk/producer"
"os"
"os/signal"
"sync"
"time"
)

// main sends 10,000 logs (10 goroutines x 1000 each) built with the
// producer.GenerateLog convenience helper, waits for completion, then
// closes the producer when a signal arrives.
func main() {
	producerConfig := producer.GetDefaultProducerConfig()
	producerConfig.Endpoint = os.Getenv("Endpoint")
	producerConfig.AccessKeyID = os.Getenv("AccessKeyID")
	producerConfig.AccessKeySecret = os.Getenv("AccessKeySecret")
	producerInstance := producer.InitProducer(producerConfig)
	// signal.Notify requires a buffered channel: with an unbuffered one a
	// signal delivered while main is busy is dropped (flagged by `go vet`).
	ch := make(chan os.Signal, 1)
	signal.Notify(ch)
	producerInstance.Start()
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				// GenerateLog builds an SLS-format log; it is convenient but
				// slower than constructing the native sls.Log directly.
				log := producer.GenerateLog(uint32(time.Now().Unix()), map[string]string{"content": "test", "content2": fmt.Sprintf("%v", i)})
				err := producerInstance.SendLog("project", "logstore", "127.0.0.1", "topic", log)
				if err != nil {
					fmt.Println(err)
				}
			}
		}()
	}
	wg.Wait()
	fmt.Println("Send completion")
	if _, ok := <-ch; ok {
		fmt.Println("Get the shutdown signal and start to shut down")
		producerInstance.Close(60)
	}
}
63 changes: 63 additions & 0 deletions example/producer/simple_callback_demo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package main

import (
"fmt"
"github.com/aliyun/aliyun-log-go-sdk/producer"
"os"
"os/signal"
"sync"
"time"
)

// Callback is a producer callback: the producer invokes Success when a
// batch is delivered and Fail when a batch is abandoned after retries.
type Callback struct {
}

// Success prints every reserved attempt recorded for the delivered result.
func (callback *Callback) Success(result *producer.Result) {
	for _, attempt := range result.GetReservedAttempts() {
		fmt.Println(attempt)
	}
}

// Fail prints the diagnostic fields of the failed result, one per line.
func (callback *Callback) Fail(result *producer.Result) {
	for _, field := range []interface{}{
		result.IsSuccessful(),
		result.GetErrorCode(),
		result.GetErrorMessage(),
		result.GetReservedAttempts(),
		result.GetRequestId(),
		result.GetTimeStampMs(),
	} {
		fmt.Println(field)
	}
}

// main sends 10,000 logs (10 goroutines x 1000 each) through
// SendLogWithCallBack so that the Callback above is notified of every
// batch's success or failure, then closes the producer on a signal.
func main() {
	producerConfig := producer.GetDefaultProducerConfig()
	producerConfig.Endpoint = os.Getenv("Endpoint")
	producerConfig.AccessKeyID = os.Getenv("AccessKeyID")
	producerConfig.AccessKeySecret = os.Getenv("AccessKeySecret")
	producerInstance := producer.InitProducer(producerConfig)
	// signal.Notify requires a buffered channel: with an unbuffered one a
	// signal delivered while main is busy is dropped (flagged by `go vet`).
	ch := make(chan os.Signal, 1)
	signal.Notify(ch)
	producerInstance.Start()
	var wg sync.WaitGroup
	callBack := &Callback{}
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				// GenerateLog builds an SLS-format log; it is convenient but
				// slower than constructing the native sls.Log directly.
				log := producer.GenerateLog(uint32(time.Now().Unix()), map[string]string{"content": "test", "content2": fmt.Sprintf("%v", i)})
				err := producerInstance.SendLogWithCallBack("project", "logstore", "127.0.0.1", "topic", log, callBack)
				if err != nil {
					fmt.Println(err)
				}
			}
		}()
	}
	wg.Wait()
	fmt.Println("Send completion")
	if _, ok := <-ch; ok {
		fmt.Println("Get the shutdown signal and start to shut down")
		producerInstance.Close(60)
	}
}
86 changes: 86 additions & 0 deletions producer/PERFORMANCE_TEST.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
## 测试环境:

### ECS虚拟机

实例环境: ecs.c5.xlarge
cpu: 4core
内存: 8Gib
操作系统: CentOS 7.6 64位

## GOLANG 版本:

go version go1.12.4 linux/amd64

## 日志样例

测试中使用的日志包含 8 个键值对以及 topic、source 两个字段。为了模拟数据的随机性,我们给每个字段值追加了一个随机后缀。其中,topic 后缀取值范围是 [0, 5),source 后缀取值范围是 [0, 10),其余 8 个键值对后缀取值范围是 [0, 单线程发送次数)。单条日志大小约为 550 字节,格式如下:

```
__topic__: topic-<suffix>
__source__: source-<suffix>
content_key_1: 1abcdefghijklmnopqrstuvwxyz!@#$%^&*()_0123456789-<suffix>
content_key_2: 2abcdefghijklmnopqrstuvwxyz!@#$%^&*()_0123456789-<suffix>
content_key_3: 3abcdefghijklmnopqrstuvwxyz!@#$%^&*()_0123456789-<suffix>
content_key_4: 4abcdefghijklmnopqrstuvwxyz!@#$%^&*()_0123456789-<suffix>
content_key_5: 5abcdefghijklmnopqrstuvwxyz!@#$%^&*()_0123456789-<suffix>
content_key_6: 6abcdefghijklmnopqrstuvwxyz!@#$%^&*()_0123456789-<suffix>
content_key_7: 7abcdefghijklmnopqrstuvwxyz!@#$%^&*()_0123456789-<suffix>
content_key_8: 8abcdefghijklmnopqrstuvwxyz!@#$%^&*()_0123456789-<suffix>
```

### Project & Logstore

- Project:在 ECS 所在 region 创建目标 project 并通过 VPC 网络服务入口进行访问。
- Logstore:在该 project 下创建一个分区数为 10 的 logstore(未开启索引),该 logstore 的写入流量最大为 50 MB/s,参阅[数据读写](https://help.aliyun.com/document_detail/92571.html)

## 测试用例

### 测试程序说明

- ProducerConfig.totalSizeInBytes: 具体用例中调整
- ProducerConfig.maxBatchSizeInBytes: 3 * 1024 * 1024
- ProducerConfig.maxBatchCount:40960
- ProducerConfig.lingerMs:2000
- 调用`Producer.send()`方法的goroutine数量:10
- 每个线程发送日志条数:20,000,000
- 发送日志总大小:约 115 GB
- 客户端压缩后大小:约 12 GB
- 发送日志总条数:200,000,000

### 调整使用cpu数量

将 ProducerConfig.totalSizeInBytes 设置为默认值 104,857,600(即 100 MB),设置发送任务使用的goroutine数量为默认值50个,通过在程序开始时设置程序使用cpu核心数:runtime.GOMAXPROCS(1),来观察程序性能。

| cpu数量 | 线程池goroutine数量 | 原始数据吞吐量 | 压缩后数据吞吐量 | cpu使用率 | 说明 |
| ------- | ------------------ | -------------- | ---------------- | --------- | --------------------------- |
| 1 | 50 | 73.386Mb/s | 7.099Mb/s | 24% | 未达到10个shard写入能力上限 |
| 2 | 50 | 136.533Mb/s | 13.141Mb/s | 50% | 未达到10个shard写入能力上限 |
| 4 | 50 | 163.84Mb/s | 15.701Mb/s | 84% | 未达到10个shard写入能力上限 |



## 调整MaxIoWorkerCount



| cpu数量 | 线程池goroutine数量 | 原始数据吞吐量 | 压缩后数据吞吐量 | cpu使用率 | 说明 |
| ------- | ------------------ | -------------- | ---------------- | --------- | --------------------------- |
| 1 | 100 | 69.97Mb/s | 6.656Mb/s | 23% | 未达到10个shard写入能力上限 |
| 2 | 100 | 131.41Mb/s | 12.62Mb/s | 49% | 未达到10个shard写入能力上限 |
| 4 | 100 | 162.133Mb/s | 15.701Mb/s | 84% | 未达到10个shard写入能力上限 |

## 调整totalSizeInBytes

将使用ProducerConfig.ioThreadCount设置为2 (注意这个配置为当前使用cpu核数量),通过调整 ProducerConfig.totalSizeInBytes 观察程序性能。

| TotalSizeInBytes | 线程池goroutine数量 | 原始数据吞吐量 | 压缩后数据吞吐量 | cpu使用率 | 说明 |
| ---------------- | ------------------ | -------------- | ---------------- | --------- | --------------------------- |
| 52,428,800 | 50 | 34.133Mb/s | 3.3792Mb/s | 15% | 未达到10个shard写入能力上限 |
| 209,715,200 | 50 | 136.53Mb/s | 13.141Mb/s | 49% | 未达到10个shard写入能力上限 |
| 419,430,400 | 50 | 133.12Mb/s | 12.8Mb/s | 50% | 未达到10个shard写入能力上限 |

## 总结

1. 增加程序使用cpu数目可以显著提高吞吐量。
2. 在使用cpu数目不变的情况下,增加线程池goroutine的数量并不一定会带来性能的提升,相反多开的goroutine会提高程序gc的cpu使用率,所以线程池开启goroutine的数量应该按照机器的性能去调整一个合适的数值,建议可以使用默认值。
3. 在cpu数目和线程池groutine数目不变的情况下,调整 totalSizeInBytes 对吞吐量影响不够显著,增加 totalSizeInBytes 会造成更多的 CPU 消耗,建议使用默认值。
Loading

0 comments on commit 8de8107

Please sign in to comment.