Skip to content

Commit

Permalink
feat: support the namespace offloadThresholdInSeconds API in pulsarad…
Browse files Browse the repository at this point in the history
…min pkg (#1271)

* feat: support offloadThresholdInSeconds API in pulsaradmin pkg

Signed-off-by: ericsyh <[email protected]>

* Update pulsaradmin/pkg/admin/namespace.go

Co-authored-by: Zike Yang <[email protected]>

* Update pulsaradmin/pkg/admin/namespace.go

Co-authored-by: Zike Yang <[email protected]>

* add test

Signed-off-by: ericsyh <[email protected]>

* fix test

Signed-off-by: ericsyh <[email protected]>

---------

Signed-off-by: ericsyh <[email protected]>
Co-authored-by: Zike Yang <[email protected]>
  • Loading branch information
ericsyh and RobertIndie authored Aug 27, 2024
1 parent 01e32e7 commit 953d9ea
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 0 deletions.
18 changes: 18 additions & 0 deletions pulsaradmin/pkg/admin/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ type Namespaces interface {
// GetOffloadThreshold returns the offloadThreshold for a namespace
GetOffloadThreshold(namespace utils.NameSpaceName) (int64, error)

// SetOffloadThresholdInSeconds sets the offloadThresholdInSeconds for a namespace
SetOffloadThresholdInSeconds(namespace utils.NameSpaceName, threshold int64) error

// GetOffloadThresholdInSeconds returns the offloadThresholdInSeconds for a namespace
GetOffloadThresholdInSeconds(namespace utils.NameSpaceName) (int64, error)

// SetCompactionThreshold sets the compactionThreshold for a namespace
SetCompactionThreshold(namespace utils.NameSpaceName, threshold int64) error

Expand Down Expand Up @@ -551,6 +557,18 @@ func (n *namespaces) GetOffloadThreshold(namespace utils.NameSpaceName) (int64,
return result, err
}

func (n *namespaces) SetOffloadThresholdInSeconds(namespace utils.NameSpaceName, threshold int64) error {
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "offloadThresholdInSeconds")
return n.pulsar.Client.Put(endpoint, threshold)
}

func (n *namespaces) GetOffloadThresholdInSeconds(namespace utils.NameSpaceName) (int64, error) {
var result int64
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "offloadThresholdInSeconds")
err := n.pulsar.Client.Get(endpoint, &result)
return result, err
}

func (n *namespaces) SetMaxConsumersPerTopic(namespace utils.NameSpaceName, max int) error {
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "maxConsumersPerTopic")
return n.pulsar.Client.Post(endpoint, max)
Expand Down
64 changes: 64 additions & 0 deletions pulsaradmin/pkg/admin/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,3 +277,67 @@ func TestNamespaces_GetSubscriptionExpirationTime(t *testing.T) {
expected = -1
assert.Equal(t, expected, subscriptionExpirationTime)
}

func TestNamespaces_SetOffloadThresholdInSeconds(t *testing.T) {
config := &config.Config{}
admin, err := New(config)
require.NoError(t, err)
require.NotNil(t, admin)

tests := []struct {
name string
namespace string
threshold int64
errReason string
}{
{
name: "Set valid offloadThresholdInSecond",
namespace: "public/default",
threshold: 60,
errReason: "",
},
{
name: "Set invalid offloadThresholdInSecond",
namespace: "public/default",
threshold: -60,
errReason: "Invalid value for offloadThresholdInSecond",
},
{
name: "Set valid offloadThresholdInSecond: 0",
namespace: "public/default",
threshold: 0,
errReason: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
namespace, _ := utils.GetNamespaceName(tt.namespace)
err := admin.Namespaces().SetOffloadThresholdInSeconds(*namespace, tt.threshold)
if tt.errReason == "" {
assert.Equal(t, nil, err)
}
if err != nil {
restError := err.(rest.Error)
assert.Equal(t, tt.errReason, restError.Reason)
}
})
}
}

func TestNamespaces_GetOffloadThresholdInSeconds(t *testing.T) {
config := &config.Config{}
admin, err := New(config)
require.NoError(t, err)
require.NotNil(t, admin)

namespace, _ := utils.GetNamespaceName("public/default")

// set the subscription expiration time and get it
err = admin.Namespaces().SetOffloadThresholdInSeconds(*namespace,
60)
assert.Equal(t, nil, err)
offloadThresholdInSeconds, err := admin.Namespaces().GetOffloadThresholdInSeconds(*namespace)
assert.Equal(t, nil, err)
expected := int64(60)
assert.Equal(t, expected, offloadThresholdInSeconds)
}

0 comments on commit 953d9ea

Please sign in to comment.