Skip to content

Commit

Permalink
feat: implements delay feature to SendMessage() API
Browse files Browse the repository at this point in the history
  • Loading branch information
vvatanabe committed Dec 5, 2023
1 parent 434ea87 commit e32d814
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 7 deletions.
9 changes: 7 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"sort"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
Expand Down Expand Up @@ -142,8 +143,9 @@ type client[T any] struct {
}

type SendMessageInput[T any] struct {
ID string
Data T
ID string
Data T
DelaySeconds int
}

// SendMessageOutput represents the result for the SendMessage() API call.
Expand All @@ -167,6 +169,9 @@ func (c *client[T]) SendMessage(ctx context.Context, params *SendMessageInput[T]
}
now := c.clock.Now()
message := NewMessage(params.ID, params.Data, now)
if params.DelaySeconds > 0 {
message.delayToAddQueueTimestamp(time.Duration(params.DelaySeconds) * time.Second)
}
err = c.put(ctx, message)
if err != nil {
return &SendMessageOutput[T]{}, err
Expand Down
25 changes: 25 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,31 @@ func TestDynamoMQClientSendMessage(t *testing.T) {
}(),
},
},
{
name: "should delay add to queue when delay is set",
setup: NewSetupFunc(),
sdkClock: mock.Clock{
T: test.DefaultTestDate,
},
args: &dynamomq.SendMessageInput[test.MessageData]{
ID: "A-101",
Data: test.NewMessageData("A-101"),
DelaySeconds: 10,
},
want: &dynamomq.SendMessageOutput[test.MessageData]{
Result: &dynamomq.Result{
ID: "A-101",
Status: dynamomq.StatusReady,
LastUpdatedTimestamp: clock.FormatRFC3339Nano(test.DefaultTestDate),
Version: 1,
},
Message: func() *dynamomq.Message[test.MessageData] {
s := NewTestMessageItemAsReady("A-101", test.DefaultTestDate)
s.AddToQueueTimestamp = clock.FormatRFC3339Nano(test.DefaultTestDate.Add(10 * time.Second))
return s
}(),
},
},
}
runTestsParallel[*dynamomq.SendMessageInput[test.MessageData], *dynamomq.SendMessageOutput[test.MessageData]](t, "SendMessage()", tests,
func(client dynamomq.Client[test.MessageData], args *dynamomq.SendMessageInput[test.MessageData]) (*dynamomq.SendMessageOutput[test.MessageData], error) {
Expand Down
5 changes: 5 additions & 0 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ func (m *Message[T]) changeVisibilityTimeout(now time.Time, visibilityTimeout in
m.VisibilityTimeout = visibilityTimeout
}

func (m *Message[T]) delayToAddQueueTimestamp(delay time.Duration) {
delayed := clock.RFC3339NanoToTime(m.AddToQueueTimestamp).Add(delay)
m.AddToQueueTimestamp = clock.FormatRFC3339Nano(delayed)
}

func (m *Message[T]) markAsProcessing(now time.Time, visibilityTimeout int) error {
status := m.GetStatus(now)
if status == StatusProcessing {
Expand Down
8 changes: 5 additions & 3 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ type Producer[T any] struct {
}

type ProduceInput[T any] struct {
Data T
Data T
DelaySeconds int
}

type ProduceOutput[T any] struct {
Expand All @@ -47,8 +48,9 @@ func (c *Producer[T]) Produce(ctx context.Context, params *ProduceInput[T]) (*Pr
params = &ProduceInput[T]{}
}
out, err := c.client.SendMessage(ctx, &SendMessageInput[T]{
ID: c.idGenerator(),
Data: params.Data,
ID: c.idGenerator(),
Data: params.Data,
DelaySeconds: params.DelaySeconds,
})
if err != nil {
return &ProduceOutput[T]{}, err
Expand Down
5 changes: 3 additions & 2 deletions producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ func TestProducerProduce(t *testing.T) {
c: defaultTestProducer,
args: args[test.MessageData]{
params: &dynamomq.ProduceInput[test.MessageData]{
Data: test.NewMessageData("A-101"),
Data: test.NewMessageData("A-101"),
DelaySeconds: 10,
},
},
want: &dynamomq.ProduceOutput[test.MessageData]{
Expand All @@ -67,7 +68,7 @@ func TestProducerProduce(t *testing.T) {
wantErr: false,
},
{
name: "should re to produce a message when params is nil",
name: "should fail to produce a message when client.SendMessage returns error",
c: dynamomq.NewProducer[test.MessageData](&mock.Client[test.MessageData]{
SendMessageFunc: func(ctx context.Context,
params *dynamomq.SendMessageInput[test.MessageData]) (*dynamomq.SendMessageOutput[test.MessageData], error) {
Expand Down

0 comments on commit e32d814

Please sign in to comment.