From 953d9eab07948d94234c6a2e6b04f1ffeb8ff833 Mon Sep 17 00:00:00 2001 From: Eric Shen Date: Tue, 27 Aug 2024 14:32:09 +0800 Subject: [PATCH] feat: support the namespace offloadThresholdInSeconds API in pulsaradmin pkg (#1271) * feat: support offloadThresholdInSeconds API in pulsaradmin pkg Signed-off-by: ericsyh * Update pulsaradmin/pkg/admin/namespace.go Co-authored-by: Zike Yang * Update pulsaradmin/pkg/admin/namespace.go Co-authored-by: Zike Yang * add test Signed-off-by: ericsyh * fix test Signed-off-by: ericsyh --------- Signed-off-by: ericsyh Co-authored-by: Zike Yang --- pulsaradmin/pkg/admin/namespace.go | 18 +++++++ pulsaradmin/pkg/admin/namespace_test.go | 64 +++++++++++++++++++++++++ 2 files changed, 82 insertions(+) diff --git a/pulsaradmin/pkg/admin/namespace.go b/pulsaradmin/pkg/admin/namespace.go index 57c0297a26..9c3f3e3fef 100644 --- a/pulsaradmin/pkg/admin/namespace.go +++ b/pulsaradmin/pkg/admin/namespace.go @@ -114,6 +114,12 @@ type Namespaces interface { // GetOffloadThreshold returns the offloadThreshold for a namespace GetOffloadThreshold(namespace utils.NameSpaceName) (int64, error) + // SetOffloadThresholdInSeconds sets the offloadThresholdInSeconds for a namespace + SetOffloadThresholdInSeconds(namespace utils.NameSpaceName, threshold int64) error + + // GetOffloadThresholdInSeconds returns the offloadThresholdInSeconds for a namespace + GetOffloadThresholdInSeconds(namespace utils.NameSpaceName) (int64, error) + // SetCompactionThreshold sets the compactionThreshold for a namespace SetCompactionThreshold(namespace utils.NameSpaceName, threshold int64) error @@ -551,6 +557,18 @@ func (n *namespaces) GetOffloadThreshold(namespace utils.NameSpaceName) (int64, return result, err } +func (n *namespaces) SetOffloadThresholdInSeconds(namespace utils.NameSpaceName, threshold int64) error { + endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "offloadThresholdInSeconds") + return n.pulsar.Client.Put(endpoint, threshold) +} + +func (n *namespaces) GetOffloadThresholdInSeconds(namespace utils.NameSpaceName) (int64, error) { + var result int64 + endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "offloadThresholdInSeconds") + err := n.pulsar.Client.Get(endpoint, &result) + return result, err +} + func (n *namespaces) SetMaxConsumersPerTopic(namespace utils.NameSpaceName, max int) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "maxConsumersPerTopic") return n.pulsar.Client.Post(endpoint, max) diff --git a/pulsaradmin/pkg/admin/namespace_test.go b/pulsaradmin/pkg/admin/namespace_test.go index 0b4000ecdb..8fa687ff41 100644 --- a/pulsaradmin/pkg/admin/namespace_test.go +++ b/pulsaradmin/pkg/admin/namespace_test.go @@ -277,3 +277,67 @@ func TestNamespaces_GetSubscriptionExpirationTime(t *testing.T) { expected = -1 assert.Equal(t, expected, subscriptionExpirationTime) } + +func TestNamespaces_SetOffloadThresholdInSeconds(t *testing.T) { + config := &config.Config{} + admin, err := New(config) + require.NoError(t, err) + require.NotNil(t, admin) + + tests := []struct { + name string + namespace string + threshold int64 + errReason string + }{ + { + name: "Set valid offloadThresholdInSecond", + namespace: "public/default", + threshold: 60, + errReason: "", + }, + { + name: "Set invalid offloadThresholdInSecond", + namespace: "public/default", + threshold: -60, + errReason: "Invalid value for offloadThresholdInSecond", + }, + { + name: "Set valid offloadThresholdInSecond: 0", + namespace: "public/default", + threshold: 0, + errReason: "", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + namespace, _ := utils.GetNamespaceName(tt.namespace) + err := admin.Namespaces().SetOffloadThresholdInSeconds(*namespace, tt.threshold) + if tt.errReason == "" { + assert.Equal(t, nil, err) + } + if err != nil { + restError := err.(rest.Error) + assert.Equal(t, tt.errReason, restError.Reason) + } + }) + } +} + +func TestNamespaces_GetOffloadThresholdInSeconds(t *testing.T) { + config := &config.Config{} + admin, err := New(config) + require.NoError(t, err) + require.NotNil(t, admin) + + namespace, _ := utils.GetNamespaceName("public/default") + + // set the subscription expiration time and get it + err = admin.Namespaces().SetOffloadThresholdInSeconds(*namespace, + 60) + assert.Equal(t, nil, err) + offloadThresholdInSeconds, err := admin.Namespaces().GetOffloadThresholdInSeconds(*namespace) + assert.Equal(t, nil, err) + expected := int64(60) + assert.Equal(t, expected, offloadThresholdInSeconds) +}