Skip to content

Commit

Permalink
refactor(examples.confluent): adapt to the new api
Browse files Browse the repository at this point in the history
  • Loading branch information
alebabai committed May 9, 2024
1 parent 4a68953 commit 992bff8
Show file tree
Hide file tree
Showing 10 changed files with 448 additions and 352 deletions.
90 changes: 29 additions & 61 deletions examples/confluent/cmd/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,16 @@ import (
"os/signal"
"syscall"

kafkatransport "github.com/alebabai/go-kit-kafka/v2/transport"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/go-kit/kit/transport"
"github.com/alebabai/go-kafka"
adapter "github.com/alebabai/go-kafka/adapter/confluent"
ckafka "github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/go-kit/log"
"github.com/go-kit/log/level"

"github.com/alebabai/go-kit-kafka/v2/examples/common"
"github.com/alebabai/go-kit-kafka/v2/examples/common/consumer"
"github.com/alebabai/go-kit-kafka/v2/examples/common/domain"

"github.com/alebabai/go-kit-kafka/v2/examples/confluent/pkg/consumer/kafka/adapter"
)

func fatal(logger log.Logger, err error) {
_ = level.Error(logger).Log("err", err)
os.Exit(1)
}

func main() {
var (
ctx context.Context
Expand All @@ -46,46 +39,26 @@ func main() {

_ = logger.Log("msg", "initialization of the application")

_ = logger.Log("msg", "initialize services")

var svc consumer.Service
{
var err error
svc, err = consumer.NewStorageService(
log.With(logger, "component", "storage-service"),
)
if err != nil {
fatal(logger, fmt.Errorf("failed to init storage: %w", err))
}
}

_ = logger.Log("msg", "initialize endpoints")

var endpoints consumer.Endpoints
{
endpoints = consumer.Endpoints{
CreateEventEndpoint: consumer.MakeCreateEventEndpoint(svc),
ListEventsEndpoint: consumer.MakeListEventsEndpoint(svc),
}
}

_ = logger.Log("msg", "initialize kafka handlers")
_ = logger.Log("msg", "initializing services")

kafkaHandler := consumer.NewKafkaHandler(endpoints.CreateEventEndpoint)
svc := consumer.NewService(
log.With(logger, "component", "consumer-service"),
)

_ = logger.Log("msg", "initialize kafka consumer")

var kafkaListener *adapter.Listener
var consumerListener *adapter.ConsumerListener
{
brokerAddr := domain.BrokerAddr
brokerAddr := common.BrokerAddr
if v, ok := os.LookupEnv("BROKER_ADDR"); ok {
brokerAddr = v
}

c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": brokerAddr,
"group.id": domain.GroupID,
"enable.auto.commit": true,
c, err := ckafka.NewConsumer(&ckafka.ConfigMap{
"bootstrap.servers": brokerAddr,
"group.id": consumer.KafkaGroupID,
"enable.auto.commit": true,
"allow.auto.create.topics": true,
})
if err != nil {
fatal(logger, fmt.Errorf("failed to init kafka consumer: %w", err))
Expand All @@ -98,28 +71,18 @@ func main() {
}()

// use a router in case if there are many topics
router := make(kafkatransport.Router)
router.AddHandler(domain.Topic, kafkaHandler)

topics := make([]string, 0)
for topic := range router {
topics = append(topics, topic)
router := kafka.StaticRouterByTopic{
common.KafkaTopic: consumer.NewKafkaHandler(
consumer.MakeCreateEventEndpoint(svc),
),
}

if err := c.SubscribeTopics(topics, nil); err != nil {
if err := c.SubscribeTopics([]string{common.KafkaTopic}, nil); err != nil {
fatal(logger, fmt.Errorf("failed to subscribe to topics: %w", err))
}

kafkaListener, err = adapter.NewListener(
consumerListener, err = adapter.NewConsumerListener(
c,
router,
adapter.ListenerErrorHandler(
transport.NewLogErrorHandler(
level.Error(
log.With(logger, "component", "listener"),
),
),
),
)
if err != nil {
fatal(logger, fmt.Errorf("failed to init kafka listener: %w", err))
Expand All @@ -131,8 +94,8 @@ func main() {
var httpServer *http.Server
{
httpServer = &http.Server{
Addr: ":8081",
Handler: consumer.NewHTTPHandler(endpoints),
Addr: consumer.HTTPServerAddr,
Handler: consumer.NewHTTPHandler(consumer.MakeListEventsEndpoint(svc)),
}

defer func() {
Expand All @@ -145,7 +108,7 @@ func main() {
errc := make(chan error, 1)

go func() {
if err := kafkaListener.Listen(ctx); err != nil {
if err := consumerListener.Listen(ctx, -1); err != nil {
errc <- err
}
}()
Expand All @@ -165,3 +128,8 @@ func main() {
_ = logger.Log("msg", "application started")
_ = logger.Log("msg", "application stopped", "exit", <-errc)
}

func fatal(logger log.Logger, err error) {
_ = level.Error(logger).Log("err", err)
os.Exit(1)
}
54 changes: 21 additions & 33 deletions examples/confluent/cmd/producer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,12 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"

"github.com/alebabai/go-kit-kafka/v2/examples/common/domain"
"github.com/alebabai/go-kit-kafka/v2/examples/common"
"github.com/alebabai/go-kit-kafka/v2/examples/common/producer"

"github.com/alebabai/go-kit-kafka/v2/examples/confluent/pkg/producer/kafka/adapter"
"github.com/alebabai/go-kit-kafka/v2/examples/confluent/pkg/kafka/adapter"
)

func fatal(logger log.Logger, err error) {
_ = logger.Log("err", err)
os.Exit(1)
}

func main() {
var (
ctx context.Context
Expand All @@ -49,22 +44,17 @@ func main() {

_ = logger.Log("msg", "initialization of the application")

_ = logger.Log("msg", "initialize services")
_ = logger.Log("msg", "initializing services")

var svc producer.Service
{
var err error
svc, err = producer.NewGeneratorService(logger)
if err != nil {
fatal(logger, fmt.Errorf("failed to create generator: %w", err))
}
}
svc := producer.NewService(
log.With(logger, "component", "producer-service"),
)

_ = logger.Log("msg", "initialize kafka producer")
_ = logger.Log("msg", "initializing kafka producer")

var producerMiddleware endpoint.Middleware
{
brokerAddr := domain.BrokerAddr
brokerAddr := common.BrokerAddr
if v, ok := os.LookupEnv("BROKER_ADDR"); ok {
brokerAddr = v
}
Expand All @@ -80,30 +70,23 @@ func main() {

e := producer.NewKafkaProducer(
adapter.NewProducer(p),
domain.Topic,
common.KafkaTopic,
).Endpoint()

producerMiddleware = producer.Middleware(e)
}

_ = logger.Log("msg", "initialize endpoints")

var endpoints producer.Endpoints
{
endpoints = producer.Endpoints{
GenerateEvent: producerMiddleware(
producer.MakeGenerateEventEndpoint(svc),
),
}
}

_ = logger.Log("msg", "initialize http server")
_ = logger.Log("msg", "initializing http server")

var httpServer *http.Server
{
httpServer = &http.Server{
Addr: ":8080",
Handler: producer.NewHTTPHandler(endpoints),
Addr: producer.HTTPServerAddr,
Handler: producer.NewHTTPHandler(
producerMiddleware(
producer.MakeGenerateEventEndpoint(svc),
),
),
}

defer func() {
Expand All @@ -130,3 +113,8 @@ func main() {
_ = logger.Log("msg", "application started")
_ = logger.Log("msg", "application stopped", "exit", <-errc)
}

func fatal(logger log.Logger, err error) {
_ = logger.Log("err", err)
os.Exit(1)
}
16 changes: 8 additions & 8 deletions examples/confluent/compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,30 @@ services:
- 9092
- 9094:9094

producer:
image: library/golang:1.16
restart: on-failure
consumer:
image: library/golang:1.22
restart: always
depends_on:
- kafka
environment:
BROKER_ADDR: kafka:9092
volumes:
- ../..:/app
working_dir: /app
command: sh -c "cd ./examples/confluent && go mod download && go run ./cmd/producer"
command: sh -c "cd ./examples/confluent && go mod download && go run ./cmd/consumer"
ports:
- 8080:8080

consumer:
image: library/golang:1.16
restart: on-failure
producer:
image: library/golang:1.22
restart: always
depends_on:
- kafka
environment:
BROKER_ADDR: kafka:9092
volumes:
- ../..:/app
working_dir: /app
command: sh -c "cd ./examples/confluent && go mod download && go run ./cmd/consumer"
command: sh -c "cd ./examples/confluent && go mod download && go run ./cmd/producer"
ports:
- 8081:8081
8 changes: 4 additions & 4 deletions examples/confluent/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ go 1.22

require (
github.com/alebabai/go-kafka v0.3.2
github.com/alebabai/go-kit-kafka/v2 v2.0.0
github.com/alebabai/go-kafka/adapter/confluent v0.3.2
github.com/alebabai/go-kit-kafka/v2/examples/common v0.0.0
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
github.com/confluentinc/confluent-kafka-go/v2 v2.4.0
github.com/go-kit/kit v0.13.0
github.com/go-kit/log v0.2.1
)

require (
github.com/alebabai/go-kit-kafka/v2 v2.0.0 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/google/uuid v1.6.0 // indirect
)

replace github.com/alebabai/go-kit-kafka/v2 => ../..
Expand Down
Loading

0 comments on commit 992bff8

Please sign in to comment.