-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
64 lines (53 loc) · 1.04 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package main
import (
"fmt"
"os"
"time"
"github.com/joho/godotenv"
)
// sleep parks the calling goroutine permanently; it never returns.
func sleep() {
	// An empty select has no case that can ever become ready, so it blocks
	// forever.
	select {}
}
// main reads the Kafka host and port from the environment (optionally seeded
// from a .env file), starts one goroutine that prints every message handed to
// it by the reader and another that writes a numbered greeting every two
// seconds, and then blocks forever.
func main() {
	// A failed .env load is reported but not fatal: the variables may already
	// be present in the process environment.
	if err := godotenv.Load(); err != nil {
		fmt.Fprintf(os.Stderr, "warning: could not load .env file: %v\n", err)
	}

	kafkaPort := os.Getenv("KAFKA_PORT")
	kafkaHost := os.Getenv("KAFKA_HOST")

	fmt.Println("Connecting to Kafka...")
	fmt.Printf("Host: %s\n", kafkaHost)
	fmt.Printf("Port: %s\n", kafkaPort)

	config := KConfig{
		host:     kafkaHost,
		port:     kafkaPort,
		deadline: 10, // NOTE(review): unit is defined by KConfig — confirm
		topic:    "log-streaming",
	}

	kf := Kf{}
	kf.Init(config)
	// NOTE(review): sleep() below never returns, so this deferred Close (and
	// the ticker Stop further down) does not run; add signal handling if a
	// clean shutdown is required.
	defer kf.Close()

	reader := kf.Reader(0) // NOTE(review): 0 is presumably the partition — confirm
	writer := kf.Writer()

	// Consumer: print each key/message pair passed to the callback.
	go func() {
		reader.Read(func(key string, msg []byte) {
			fmt.Println("received ->", key, ":", string(msg))
		})
	}()

	everyTwoSeconds := time.NewTicker(2 * time.Second)
	defer everyTwoSeconds.Stop()

	// Producer: on every tick write one message keyed by a counter that
	// cycles through 0..9.
	go func() {
		i := 0
		for range everyTwoSeconds.C {
			key := fmt.Sprintf("%d", i)
			body := "HEYOO! person number " + key
			fmt.Println("Writing to Kafka: " + body)
			// NOTE(review): any result returned by Write is not inspected
			// here — confirm whether it reports errors.
			writer.Write(key, []byte(body))
			i = (i + 1) % 10
		}
	}()

	// Keep the process alive so the two goroutines can keep running.
	sleep()
}